1"""
2Zeromq transport classes
3"""
4import errno
5import hashlib
6import logging
7import os
8import signal
9import sys
10import threading
11import uuid
12from random import randint
13
14import salt.auth
15import salt.crypt
16import salt.ext.tornado
17import salt.ext.tornado.concurrent
18import salt.ext.tornado.gen
19import salt.ext.tornado.ioloop
20import salt.log.setup
21import salt.payload
22import salt.transport.client
23import salt.transport.mixins.auth
24import salt.transport.server
25import salt.utils.event
26import salt.utils.files
27import salt.utils.minions
28import salt.utils.process
29import salt.utils.stringutils
30import salt.utils.verify
31import salt.utils.versions
32import salt.utils.zeromq
33import zmq.error
34import zmq.eventloop.ioloop
35import zmq.eventloop.zmqstream
36from salt._compat import ipaddress
37from salt.exceptions import SaltException, SaltReqTimeoutError
38from salt.utils.zeromq import LIBZMQ_VERSION_INFO, ZMQ_VERSION_INFO, zmq
39
# The pyzmq monitor helpers are optional. Record whether they could be
# imported so that socket monitoring is only attempted when they exist.
try:
    import zmq.utils.monitor

    HAS_ZMQ_MONITOR = True
except ImportError:
    HAS_ZMQ_MONITOR = False


# Prefer M2Crypto's RSA; when it is missing, fall back to the PKCS1_OAEP
# cipher from Cryptodome or, failing that, from Crypto. HAS_M2 tells the
# decryption code which of the two APIs is in scope.
try:
    from M2Crypto import RSA

    HAS_M2 = True
except ImportError:
    HAS_M2 = False
    try:
        from Cryptodome.Cipher import PKCS1_OAEP
    except ImportError:
        from Crypto.Cipher import PKCS1_OAEP  # nosec


# Module-level logger shared by everything in this file.
log = logging.getLogger(__name__)
61
62
def _get_master_uri(master_ip, master_port, source_ip=None, source_port=None):
    """
    Build the ZeroMQ URI the Minion uses to connect to the Master.

    Without a source IP / port the result is ``tcp://<master>:<port>``.
    With one, the ZeroMQ ``tcp://<source>;<destination>`` syntax is used, e.g.
    ``tcp://192.168.1.17:5555;192.168.1.1:5555``
    (source: http://api.zeromq.org/4-1:zmq-tcp).

    :param master_ip: Address of the master (bracketed via ``ip_bracket``)
    :param master_port: Port of the master
    :param source_ip: Optional local address to connect from
    :param source_port: Optional local port to connect from
    :return: The URI string; the plain master URI (plus logged warnings) when
             a source was requested but the installed libzmq / pyzmq are too
             old for the source syntax
    """

    from salt.utils.zeromq import ip_bracket

    destination = "{}:{}".format(ip_bracket(master_ip), master_port)
    plain_uri = "tcp://" + destination

    # Nothing special requested: the plain URI is all we need.
    if not (source_ip or source_port):
        return plain_uri

    # The source:port syntax for ZeroMQ has been added in libzmq 4.1.6
    # which is included in the pyzmq wheels starting with 16.0.1.
    source_syntax_supported = LIBZMQ_VERSION_INFO >= (
        4,
        1,
        6,
    ) and ZMQ_VERSION_INFO >= (16, 0, 1)
    if not source_syntax_supported:
        for warning_text in (
            "Unable to connect to the Master using a specific source IP / port",
            "Consider upgrading to pyzmq >= 16.0.1 and libzmq >= 4.1.6",
            "Specific source IP / port for connecting to master returner port:"
            " configuraion ignored",
        ):
            log.warning(warning_text)
        return plain_uri

    if source_ip:
        local_host = ip_bracket(source_ip)
        # No source port requested: the URI carries port 0 instead.
        local_port = source_port if source_port else 0
    else:
        # Only a source port was given: bind on the wildcard address of the
        # same IP family as the master.
        if ipaddress.ip_address(master_ip).version == 4:
            local_host = "0.0.0.0"
        else:
            local_host = ip_bracket("::")
        local_port = source_port

    return "tcp://{}:{};{}".format(local_host, local_port, destination)
121
122
class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel):
    """
    Encapsulate sending routines to ZeroMQ.

    ZMQ Channels default to 'crypt=aes'
    """

    async_methods = [
        "crypted_transfer_decode_dictentry",
        "_crypted_transfer",
        "_do_transfer",
        "_uncrypted_transfer",
        "send",
    ]
    close_methods = [
        "close",
    ]

    def __init__(self, opts, **kwargs):
        """
        Set up the request channel and its pool of message clients.

        :param dict opts: Salt options. A shallow copy is stored, so a
                          ``master_uri`` override is written to the copy only
        :param kwargs: Optional ``crypt`` (defaults to ``"aes"``),
                       ``master_uri`` (overrides ``opts["master_uri"]``) and
                       ``io_loop`` (defaults to the current Tornado IOLoop)
        """
        # Set the flag before anything that can raise: __del__ calls close(),
        # which reads it, even when __init__ did not run to completion.
        self._closing = False

        self.opts = dict(opts)
        self.ttype = "zeromq"

        # crypt defaults to 'aes'
        self.crypt = kwargs.get("crypt", "aes")

        if "master_uri" in kwargs:
            self.opts["master_uri"] = kwargs["master_uri"]

        self._io_loop = kwargs.get("io_loop")
        if self._io_loop is None:
            self._io_loop = salt.ext.tornado.ioloop.IOLoop.current()

        if self.crypt != "clear":
            # we don't need to worry about auth as a kwarg, since its a singleton
            self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self._io_loop)
        log.debug(
            "Connecting the Minion to the Master URI (for the return server): %s",
            self.master_uri,
        )
        self.message_client = AsyncReqMessageClientPool(
            self.opts,
            args=(
                self.opts,
                self.master_uri,
            ),
            kwargs={"io_loop": self._io_loop},
        )

    def close(self):
        """
        Since the message_client creates sockets and assigns them to the IOLoop we have to
        specifically destroy them, since we aren't the only ones with references to the FDs

        Calling this more than once is harmless; only the first call closes
        the message client.
        """
        if self._closing:
            return
        log.debug("Closing %s instance", self.__class__.__name__)
        self._closing = True
        # message_client is missing when __init__ failed before creating it.
        if hasattr(self, "message_client"):
            self.message_client.close()

    # pylint: disable=W1701
    def __del__(self):
        """
        Close the channel on garbage collection, tolerating an already-closed
        file descriptor.
        """
        try:
            self.close()
        except OSError as exc:
            if exc.errno != errno.EBADF:
                # If its not a bad file descriptor error, raise
                raise

    # pylint: enable=W1701

    @property
    def master_uri(self):
        """
        Return the URI of the master's return server.

        Uses ``opts["master_uri"]`` when present, otherwise builds the URI
        from ``master_ip`` / ``master_port`` and the optional source settings.

        :raises SaltException: if neither ``master_uri`` nor ``master_ip`` is
                               available in the options
        """
        if "master_uri" in self.opts:
            return self.opts["master_uri"]

        # if by chance master_uri is not there..
        if "master_ip" in self.opts:
            return _get_master_uri(
                self.opts["master_ip"],
                self.opts["master_port"],
                source_ip=self.opts.get("source_ip"),
                source_port=self.opts.get("source_ret_port"),
            )

        # if we've reached here something is very abnormal
        raise SaltException("ReqChannel: missing master_uri/master_ip in self.opts")

    def _package_load(self, load):
        """
        Wrap a load in the envelope sent over the wire.

        :param load: The (possibly already encrypted) load
        :return: dict with the channel's ``enc`` mode, the load and version 2
        """
        return {
            "enc": self.crypt,
            "load": load,
            "version": 2,
        }

    @salt.ext.tornado.gen.coroutine
    def crypted_transfer_decode_dictentry(
        self, load, dictkey=None, tries=3, timeout=60
    ):
        """
        Send an encrypted load and return the verified ``pillar`` entry of the
        signed reply.

        A fresh nonce is stored in ``load["nonce"]`` (the caller's dict is
        modified). The reply's ``key`` is decrypted with the minion's private
        key, the entry ``ret[dictkey]`` is decrypted with the recovered key,
        and the signature, key and nonce of the result are checked.

        :param dict load: A load to send across the wire
        :param dictkey: Key of the reply entry that holds the signed message
        :param int tries: The number of times to make before failure
        :param int timeout: The number of seconds on a response before failing
        :raises salt.crypt.AuthenticationError: if the signature, key or nonce
                                                check fails
        """
        nonce = uuid.uuid4().hex
        load["nonce"] = nonce
        if not self.auth.authenticated:
            # Return control back to the caller, continue when authentication succeeds
            yield self.auth.authenticate()

        # Return control to the caller. When send() completes, resume by
        # populating ret with the Future.result
        ret = yield self.message_client.send(
            self._package_load(self.auth.crypticle.dumps(load)),
            timeout=timeout,
            tries=tries,
        )

        if "key" not in ret:
            # Reauth in the case our key is deleted on the master side.
            yield self.auth.authenticate()
            ret = yield self.message_client.send(
                self._package_load(self.auth.crypticle.dumps(load)),
                timeout=timeout,
                tries=tries,
            )

        # Recover the key for this reply with the minion's private key.
        key = self.auth.get_keys()
        if HAS_M2:
            aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
        else:
            cipher = PKCS1_OAEP.new(key)
            aes = cipher.decrypt(ret["key"])

        # Decrypt the requested entry with the key recovered above.
        pcrypt = salt.crypt.Crypticle(self.opts, aes)
        signed_msg = pcrypt.loads(ret[dictkey])

        # Validate the master's signature.
        master_pubkey_path = os.path.join(self.opts["pki_dir"], "minion_master.pub")
        if not salt.crypt.verify_signature(
            master_pubkey_path, signed_msg["data"], signed_msg["sig"]
        ):
            raise salt.crypt.AuthenticationError(
                "Pillar payload signature failed to validate."
            )

        # Make sure the signed key matches the key we used to decrypt the data.
        data = salt.payload.loads(signed_msg["data"])
        if data["key"] != ret["key"]:
            raise salt.crypt.AuthenticationError("Key verification failed.")

        # Validate the nonce.
        if data["nonce"] != nonce:
            raise salt.crypt.AuthenticationError("Pillar nonce verification failed.")
        raise salt.ext.tornado.gen.Return(data["pillar"])

    @salt.ext.tornado.gen.coroutine
    def _crypted_transfer(self, load, tries=3, timeout=60, raw=False):
        """
        Send a load across the wire, with encryption

        In case of authentication errors, try to renegotiate authentication
        and retry the method.

        Indeed, we can fail too early in case of a master restart during a
        minion state execution call

        :param dict load: A load to send across the wire; when it is a
                          non-empty dict a fresh nonce is stored in it
        :param int tries: The number of times to make before failure
        :param int timeout: The number of seconds on a response before failing
        :param bool raw: Passed to the crypticle when decrypting; when false
                         the reply additionally goes through
                         ``decode_embedded_strs``
        """
        nonce = uuid.uuid4().hex
        if load and isinstance(load, dict):
            load["nonce"] = nonce

        @salt.ext.tornado.gen.coroutine
        def _do_transfer():
            # Yield control to the caller. When send() completes, resume by populating data with the Future.result
            data = yield self.message_client.send(
                self._package_load(self.auth.crypticle.dumps(load)),
                timeout=timeout,
                tries=tries,
            )
            # we may not have always data
            # as for example for saltcall ret submission, this is a blind
            # communication, we do not subscribe to return events, we just
            # upload the results to the master
            if data:
                data = self.auth.crypticle.loads(data, raw, nonce)
            if not raw:
                data = salt.transport.frame.decode_embedded_strs(data)
            raise salt.ext.tornado.gen.Return(data)

        if not self.auth.authenticated:
            # Return control back to the caller, resume when authentication succeeds
            yield self.auth.authenticate()
        try:
            # First attempt with the current credentials.
            ret = yield _do_transfer()
        except salt.crypt.AuthenticationError:
            # If auth error, return control back to the caller, continue when
            # authentication succeeds, then retry the transfer once.
            yield self.auth.authenticate()
            ret = yield _do_transfer()
        raise salt.ext.tornado.gen.Return(ret)

    @salt.ext.tornado.gen.coroutine
    def _uncrypted_transfer(self, load, tries=3, timeout=60):
        """
        Send a load across the wire in cleartext

        :param dict load: A load to send across the wire
        :param int tries: The number of times to make before failure
        :param int timeout: The number of seconds on a response before failing
        """
        ret = yield self.message_client.send(
            self._package_load(load),
            timeout=timeout,
            tries=tries,
        )

        raise salt.ext.tornado.gen.Return(ret)

    @salt.ext.tornado.gen.coroutine
    def send(self, load, tries=3, timeout=60, raw=False):
        """
        Send a request, return a future which will complete when we send the message

        Uses the cleartext path when the channel's crypt mode is ``"clear"``
        and the encrypted path otherwise; ``raw`` only applies to the latter.
        """
        if self.crypt == "clear":
            ret = yield self._uncrypted_transfer(load, tries=tries, timeout=timeout)
        else:
            ret = yield self._crypted_transfer(
                load, tries=tries, timeout=timeout, raw=raw
            )
        raise salt.ext.tornado.gen.Return(ret)
354
355
class AsyncZeroMQPubChannel(
    salt.transport.mixins.auth.AESPubClientMixin, salt.transport.client.AsyncPubChannel
):
    """
    A transport channel backed by a ZeroMQ SUB socket that connects to the
    master's publish port and hands decoded publications to a callback
    """

    async_methods = [
        "connect",
        "_decode_messages",
    ]
    close_methods = [
        "close",
    ]

    def __init__(self, opts, **kwargs):
        """
        Create the SUB socket and apply subscription, identity, keepalive and
        reconnect options. The socket is not connected until ``connect()``.

        :param dict opts: Salt options (stored by reference, not copied)
        :param kwargs: Optional ``io_loop``; defaults to the current Tornado
                       IOLoop
        """
        self.opts = opts
        self.ttype = "zeromq"
        self.io_loop = kwargs.get("io_loop")
        self._closing = False

        if self.io_loop is None:
            self.io_loop = salt.ext.tornado.ioloop.IOLoop.current()

        # Hex digest of the minion id; used as the subscription topic when
        # zmq_filtering is enabled and compared against message targets.
        self.hexid = hashlib.sha1(
            salt.utils.stringutils.to_bytes(self.opts["id"])
        ).hexdigest()
        self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop)
        self.context = zmq.Context()
        self._socket = self.context.socket(zmq.SUB)

        if self.opts["zmq_filtering"]:
            # TODO: constants file for "broadcast"
            self._socket.setsockopt(zmq.SUBSCRIBE, b"broadcast")
            if self.opts.get("__role") == "syndic":
                self._socket.setsockopt(zmq.SUBSCRIBE, b"syndic")
            else:
                self._socket.setsockopt(
                    zmq.SUBSCRIBE, salt.utils.stringutils.to_bytes(self.hexid)
                )
        else:
            # Empty prefix: subscribe to everything.
            self._socket.setsockopt(zmq.SUBSCRIBE, b"")

        self._socket.setsockopt(
            zmq.IDENTITY, salt.utils.stringutils.to_bytes(self.opts["id"])
        )

        # TODO: cleanup all the socket opts stuff
        if hasattr(zmq, "TCP_KEEPALIVE"):
            self._socket.setsockopt(zmq.TCP_KEEPALIVE, self.opts["tcp_keepalive"])
            self._socket.setsockopt(
                zmq.TCP_KEEPALIVE_IDLE, self.opts["tcp_keepalive_idle"]
            )
            self._socket.setsockopt(
                zmq.TCP_KEEPALIVE_CNT, self.opts["tcp_keepalive_cnt"]
            )
            self._socket.setsockopt(
                zmq.TCP_KEEPALIVE_INTVL, self.opts["tcp_keepalive_intvl"]
            )

        recon_delay = self.opts["recon_default"]

        if self.opts["recon_randomize"]:
            recon_delay = randint(
                self.opts["recon_default"],
                self.opts["recon_default"] + self.opts["recon_max"],
            )

            log.debug(
                "Generated random reconnect delay between '%sms' and '%sms' (%s)",
                self.opts["recon_default"],
                self.opts["recon_default"] + self.opts["recon_max"],
                recon_delay,
            )

        log.debug("Setting zmq_reconnect_ivl to '%sms'", recon_delay)
        self._socket.setsockopt(zmq.RECONNECT_IVL, recon_delay)

        if hasattr(zmq, "RECONNECT_IVL_MAX"):
            # Log the value that is actually applied to the socket.
            recon_ivl_max = self.opts["recon_max"]
            log.debug("Setting zmq_reconnect_ivl_max to '%sms'", recon_ivl_max)

            self._socket.setsockopt(zmq.RECONNECT_IVL_MAX, recon_ivl_max)

        if (self.opts["ipv6"] is True or ":" in self.opts["master_ip"]) and hasattr(
            zmq, "IPV4ONLY"
        ):
            # IPv6 sockets work for both IPv6 and IPv4 addresses
            self._socket.setsockopt(zmq.IPV4ONLY, 0)

        if HAS_ZMQ_MONITOR and self.opts["zmq_monitor"]:
            self._monitor = ZeroMQSocketMonitor(self._socket)
            self._monitor.start_io_loop(self.io_loop)

    def close(self):
        """
        Stop the monitor, close the stream (or the bare socket when no stream
        was created) and terminate the context. Repeated calls are no-ops.
        """
        if self._closing is True:
            return

        self._closing = True

        if hasattr(self, "_monitor") and self._monitor is not None:
            self._monitor.stop()
            self._monitor = None
        if hasattr(self, "_stream"):
            self._stream.close(0)
        elif hasattr(self, "_socket"):
            self._socket.close(0)
        if hasattr(self, "context") and self.context.closed is False:
            self.context.term()

    # pylint: disable=W1701
    def __del__(self):
        """Close the channel when it is garbage collected."""
        self.close()

    # pylint: enable=W1701
    def __enter__(self):
        """Return the channel itself for use in a ``with`` block."""
        return self

    def __exit__(self, *args):
        """Close the channel when the ``with`` block ends."""
        self.close()

    # TODO: this is the time to see if we are connected, maybe use the req channel to guess?
    @salt.ext.tornado.gen.coroutine
    def connect(self):
        """
        Authenticate if needed, settle on the publish port and connect the
        SUB socket to the master's publish URI.
        """
        if not self.auth.authenticated:
            yield self.auth.authenticate()

        # if this is changed from the default, we assume it was intentional
        if int(self.opts.get("publish_port", 4506)) != 4506:
            self.publish_port = self.opts.get("publish_port")
        # else take the relayed publish_port master reports
        else:
            self.publish_port = self.auth.creds["publish_port"]

        log.debug(
            "Connecting the Minion to the Master publish port, using the URI: %s",
            self.master_pub,
        )
        self._socket.connect(self.master_pub)

    @property
    def master_pub(self):
        """
        Return the URI of the master publish port

        Only usable once ``connect()`` has set ``self.publish_port``.
        """
        return _get_master_uri(
            self.opts["master_ip"],
            self.publish_port,
            source_ip=self.opts.get("source_ip"),
            source_port=self.opts.get("source_publish_port"),
        )

    @salt.ext.tornado.gen.coroutine
    def _decode_messages(self, messages):
        """
        Take the zmq messages, decrypt/decode them into a payload

        :param list messages: A list of messages to be decoded
        :return: The decoded payload, or ``None`` when a two-part message is
                 addressed to a different target
        :raises Exception: if the message has neither one nor two parts
        """
        messages_len = len(messages)
        # if it was one message, then its old style
        if messages_len == 1:
            payload = salt.payload.loads(messages[0])
        # 2 includes a header which says who should do it
        elif messages_len == 2:
            message_target = salt.utils.stringutils.to_str(messages[0])
            if (
                self.opts.get("__role") != "syndic"
                and message_target not in ("broadcast", self.hexid)
            ) or (
                self.opts.get("__role") == "syndic"
                and message_target not in ("broadcast", "syndic")
            ):
                log.debug("Publish received for not this minion: %s", message_target)
                raise salt.ext.tornado.gen.Return(None)
            payload = salt.payload.loads(messages[1])
        else:
            # messages_len is already the count; calling len() on it would
            # raise TypeError and hide this error.
            raise Exception(
                "Invalid number of messages ({}) in zeromq pubmessage from master".format(
                    messages_len
                )
            )
        # Yield control back to the caller. When the payload has been decoded, assign
        # the decoded payload to 'ret' and resume operation
        ret = yield self._decode_payload(payload)
        raise salt.ext.tornado.gen.Return(ret)

    @property
    def stream(self):
        """
        Return the current zmqstream, creating one if necessary
        """
        if not hasattr(self, "_stream"):
            self._stream = zmq.eventloop.zmqstream.ZMQStream(
                self._socket, io_loop=self.io_loop
            )
        return self._stream

    def on_recv(self, callback):
        """
        Register a callback for received messages (that we didn't initiate)

        :param func callback: A function which should be called when data is received;
                              ``None`` is passed straight through to the stream
        """
        if callback is None:
            return self.stream.on_recv(None)

        @salt.ext.tornado.gen.coroutine
        def wrap_callback(messages):
            payload = yield self._decode_messages(messages)
            # None means the message was not addressed to this minion.
            if payload is not None:
                callback(payload)

        return self.stream.on_recv(wrap_callback)
573
574
class ZeroMQReqServerChannel(
    salt.transport.mixins.auth.AESReqServerMixin, salt.transport.server.ReqServerChannel
):
    """
    ZeroMQ request server channel: a ROUTER/DEALER queue device in one
    process (``zmq_device``) and REP sockets that connect to it after forking
    (``post_fork``).
    """

    def __init__(self, opts):
        """
        :param dict opts: Salt options, handed to the base channel
        """
        salt.transport.server.ReqServerChannel.__init__(self, opts)
        self._closing = False
        self._monitor = None
        self._w_monitor = None

    def zmq_device(self):
        """
        Multiprocessing target for the zmq queue device

        Binds a ROUTER socket on ``interface:ret_port`` and a DEALER socket on
        the worker URI, then runs the queue device until a socket is closed
        or the process is interrupted.
        """
        self.__setup_signals()
        salt.utils.process.appendproctitle("MWorkerQueue")
        self.context = zmq.Context(self.opts["worker_threads"])
        # Prepare the zeromq sockets
        self.uri = "tcp://{interface}:{ret_port}".format(**self.opts)
        self.clients = self.context.socket(zmq.ROUTER)
        if self.opts["ipv6"] is True and hasattr(zmq, "IPV4ONLY"):
            # IPv6 sockets work for both IPv6 and IPv4 addresses
            self.clients.setsockopt(zmq.IPV4ONLY, 0)
        self.clients.setsockopt(zmq.BACKLOG, self.opts.get("zmq_backlog", 1000))
        # There is no self._socket in this process; monitor the ROUTER socket.
        self._start_zmq_monitor(self.clients)
        self.workers = self.context.socket(zmq.DEALER)

        if self.opts["mworker_queue_niceness"] and not salt.utils.platform.is_windows():
            log.info(
                "setting mworker_queue niceness to %d",
                self.opts["mworker_queue_niceness"],
            )
            os.nice(self.opts["mworker_queue_niceness"])

        if self.opts.get("ipc_mode", "") == "tcp":
            self.w_uri = "tcp://127.0.0.1:{}".format(
                self.opts.get("tcp_master_workers", 4515)
            )
        else:
            self.w_uri = "ipc://{}".format(
                os.path.join(self.opts["sock_dir"], "workers.ipc")
            )

        log.info("Setting up the master communication server")
        self.clients.bind(self.uri)
        self.workers.bind(self.w_uri)

        while True:
            if self.clients.closed or self.workers.closed:
                break
            try:
                zmq.device(zmq.QUEUE, self.clients, self.workers)
            except zmq.ZMQError as exc:
                # An interrupted call is not fatal; restart the device.
                if exc.errno == errno.EINTR:
                    continue
                raise
            except (KeyboardInterrupt, SystemExit):
                break

    def close(self):
        """
        Cleanly shutdown the router socket

        Also stops any monitors and closes whichever of the worker socket,
        stream, REP socket and context exist in this process. Repeated calls
        are no-ops.
        """
        if self._closing:
            return
        log.info("MWorkerQueue under PID %s is closing", os.getpid())
        self._closing = True
        if getattr(self, "_monitor", None) is not None:
            self._monitor.stop()
            self._monitor = None
        if getattr(self, "_w_monitor", None) is not None:
            self._w_monitor.stop()
            self._w_monitor = None
        if hasattr(self, "clients") and self.clients.closed is False:
            self.clients.close()
        if hasattr(self, "workers") and self.workers.closed is False:
            self.workers.close()
        if hasattr(self, "stream"):
            self.stream.close()
        if hasattr(self, "_socket") and self._socket.closed is False:
            self._socket.close()
        if hasattr(self, "context") and self.context.closed is False:
            self.context.term()

    def pre_fork(self, process_manager):
        """
        Pre-fork we need to create the zmq router device

        :param func process_manager: An instance of salt.utils.process.ProcessManager
        """
        salt.transport.mixins.auth.AESReqServerMixin.pre_fork(self, process_manager)
        process_manager.add_process(self.zmq_device)

    def _start_zmq_monitor(self, socket=None):
        """
        Starts ZMQ monitor for debugging purposes.

        Does nothing unless the monitor helpers are importable and the
        ``zmq_monitor`` option is set.

        :param socket: The socket to monitor; defaults to ``self._socket``
        :return:
        """
        # Socket monitor shall be used the only for debug
        # purposes so using threading doesn't look too bad here

        if HAS_ZMQ_MONITOR and self.opts["zmq_monitor"]:
            log.debug("Starting ZMQ monitor")
            if socket is None:
                socket = self._socket

            self._w_monitor = ZeroMQSocketMonitor(socket)
            threading.Thread(target=self._w_monitor.start_poll).start()
            log.debug("ZMQ monitor has been started started")

    def post_fork(self, payload_handler, io_loop):
        """
        After forking we need to create all of the local sockets to listen to the
        router

        :param func payload_handler: A function to called to handle incoming payloads as
                                     they are picked up off the wire
        :param IOLoop io_loop: An instance of a Tornado IOLoop, to handle event scheduling
        """
        self.payload_handler = payload_handler
        self.io_loop = io_loop

        self.context = zmq.Context(1)
        self._socket = self.context.socket(zmq.REP)
        self._start_zmq_monitor()

        if self.opts.get("ipc_mode", "") == "tcp":
            self.w_uri = "tcp://127.0.0.1:{}".format(
                self.opts.get("tcp_master_workers", 4515)
            )
        else:
            self.w_uri = "ipc://{}".format(
                os.path.join(self.opts["sock_dir"], "workers.ipc")
            )
        log.info("Worker binding to socket %s", self.w_uri)
        self._socket.connect(self.w_uri)

        salt.transport.mixins.auth.AESReqServerMixin.post_fork(
            self, payload_handler, io_loop
        )

        self.stream = zmq.eventloop.zmqstream.ZMQStream(
            self._socket, io_loop=self.io_loop
        )
        self.stream.on_recv_stream(self.handle_message)

    @salt.ext.tornado.gen.coroutine
    def handle_message(self, stream, payload):
        """
        Handle incoming messages from underlying TCP streams

        Every reply, including error replies, is serialized with
        ``salt.payload.dumps`` before it is sent.

        :param ZMQStream stream: A ZeroMQ stream.
        See http://zeromq.github.io/pyzmq/api/generated/zmq.eventloop.zmqstream.html

        :param dict payload: A payload to process
        """
        try:
            payload = salt.payload.loads(payload[0])
            payload = self._decode_payload(payload)
        except Exception as exc:  # pylint: disable=broad-except
            exc_type = type(exc).__name__
            if exc_type == "AuthenticationError":
                log.debug(
                    "Minion failed to auth to master. Since the payload is "
                    "encrypted, it is not known which minion failed to "
                    "authenticate. It is likely that this is a transient "
                    "failure due to the master rotating its public key."
                )
            else:
                log.error("Bad load from minion: %s: %s", exc_type, exc)
            stream.send(salt.payload.dumps("bad load"))
            raise salt.ext.tornado.gen.Return()

        # TODO helper functions to normalize payload?
        # Only look up "load" when payload is a dict, so that logging a
        # malformed payload cannot itself raise.
        load = payload.get("load") if isinstance(payload, dict) else None
        if not isinstance(load, dict):
            log.error(
                "payload and load must be a dict. Payload was: %s and load was %s",
                payload,
                load,
            )
            stream.send(salt.payload.dumps("payload and load must be a dict"))
            raise salt.ext.tornado.gen.Return()

        try:
            id_ = payload["load"].get("id", "")
            # The membership test raises TypeError for a non-string id.
            if "\0" in id_:
                log.error("Payload contains an id with a null byte: %s", payload)
                stream.send(salt.payload.dumps("bad load: id contains a null byte"))
                raise salt.ext.tornado.gen.Return()
        except TypeError:
            log.error("Payload contains non-string id: %s", payload)
            stream.send(
                salt.payload.dumps("bad load: id {} is not a string".format(id_))
            )
            raise salt.ext.tornado.gen.Return()

        version = 0
        if "version" in payload:
            version = payload["version"]

        # Replies are signed and nonces honoured only for version > 1.
        sign_messages = False
        if version > 1:
            sign_messages = True

        # intercept the "_auth" commands, since the main daemon shouldn't know
        # anything about our key auth
        if payload["enc"] == "clear" and payload.get("load", {}).get("cmd") == "_auth":
            stream.send(salt.payload.dumps(self._auth(payload["load"], sign_messages)))
            raise salt.ext.tornado.gen.Return()

        nonce = None
        if version > 1:
            nonce = payload["load"].pop("nonce", None)

        # TODO: test
        try:
            # Take the payload_handler function that was registered when we created the channel
            # and call it, returning control to the caller until it completes
            ret, req_opts = yield self.payload_handler(payload)
        except Exception:  # pylint: disable=broad-except
            # always attempt to return an error to the minion
            stream.send(salt.payload.dumps("Some exception handling minion payload"))
            log.error("Some exception handling a payload from minion", exc_info=True)
            raise salt.ext.tornado.gen.Return()

        req_fun = req_opts.get("fun", "send")
        if req_fun == "send_clear":
            stream.send(salt.payload.dumps(ret))
        elif req_fun == "send":
            stream.send(salt.payload.dumps(self.crypticle.dumps(ret, nonce)))
        elif req_fun == "send_private":
            stream.send(
                salt.payload.dumps(
                    self._encrypt_private(
                        ret,
                        req_opts["key"],
                        req_opts["tgt"],
                        nonce,
                        sign_messages,
                    )
                )
            )
        else:
            log.error("Unknown req_fun %s", req_fun)
            # always attempt to return an error to the minion
            stream.send(salt.payload.dumps("Server-side exception handling payload"))
        raise salt.ext.tornado.gen.Return()

    def __setup_signals(self):
        """Route SIGINT and SIGTERM to ``_handle_signals``."""
        signal.signal(signal.SIGINT, self._handle_signals)
        signal.signal(signal.SIGTERM, self._handle_signals)

    def _handle_signals(self, signum, sigframe):
        """
        Log the received signal, close the channel and exit the process.

        :param int signum: The signal number
        :param sigframe: The current stack frame (unused)
        """
        msg = "{} received a ".format(self.__class__.__name__)
        if signum == signal.SIGINT:
            msg += "SIGINT"
        elif signum == signal.SIGTERM:
            msg += "SIGTERM"
        msg += ". Exiting"
        log.debug(msg)
        self.close()
        sys.exit(salt.defaults.exitcodes.EX_OK)
835
836
def _set_tcp_keepalive(zmq_socket, opts):
    """
    Apply the TCP keepalive settings found in "opts" to a ZeroMQ socket.

    Nothing is done when "opts" is empty or when the installed zmq does not
    expose ``TCP_KEEPALIVE``. Only the keepalive keys actually present in
    "opts" are applied.

    Warning: Failure to set TCP keepalives on the salt-master can result in
    not detecting the loss of a minion when the connection is lost or when
    its host has been terminated without first closing the socket.
    Salt's Presence System depends on this connection status to know if a minion
    is "present".

    Warning: Failure to set TCP keepalives on minions can result in frequent or
    unexpected disconnects!

    :param zmq_socket: The ZeroMQ socket to configure
    :param dict opts: The salt opts dictionary
    """
    if not hasattr(zmq, "TCP_KEEPALIVE") or not opts:
        return

    # (key in opts, name of the matching zmq socket option)
    keepalive_options = (
        ("tcp_keepalive", "TCP_KEEPALIVE"),
        ("tcp_keepalive_idle", "TCP_KEEPALIVE_IDLE"),
        ("tcp_keepalive_cnt", "TCP_KEEPALIVE_CNT"),
        ("tcp_keepalive_intvl", "TCP_KEEPALIVE_INTVL"),
    )
    for opt_key, zmq_option in keepalive_options:
        if opt_key in opts:
            # The zmq constant is only resolved for keys that are configured
            zmq_socket.setsockopt(getattr(zmq, zmq_option), opts[opt_key])
859
860
class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel):
    """
    Encapsulate synchronous operations for a publisher channel

    Two roles live in this class:

    * ``_publish_daemon`` runs in its own process (see ``pre_fork``). It pulls
      loads from a local PULL socket, packs them and sends them out on the
      PUB socket.
    * ``publish`` pushes loads to that daemon over a per-thread PUSH socket.
    """

    # Thread-local holder for the PUSH socket ("sock") and its zmq context
    # ("_ctx"). It is a class attribute, so all instances used from the same
    # thread share a single socket.
    _sock_data = threading.local()

    def __init__(self, opts):
        """
        :param dict opts: The salt opts dictionary
        """
        self.opts = opts
        self.ckminions = salt.utils.minions.CkMinions(self.opts)

    def connect(self):
        """
        Return the result of ``salt.ext.tornado.gen.sleep(5)``; no socket is
        connected here.
        """
        return salt.ext.tornado.gen.sleep(5)

    def _get_pull_uri(self):
        """
        Return the URI of the local pull socket that feeds the publish daemon.

        A loopback TCP address is used when ``ipc_mode`` is "tcp", otherwise an
        IPC socket file inside ``sock_dir``. The daemon binds to this URI and
        ``pub_connect`` connects to it, so both must be built the same way.
        """
        if self.opts.get("ipc_mode", "") == "tcp":
            return "tcp://127.0.0.1:{}".format(
                self.opts.get("tcp_master_publish_pull", 4514)
            )
        return "ipc://{}".format(
            os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
        )

    def _publish_daemon(self, log_queue=None):
        """
        Bind to the interface specified in the configuration file

        Runs the publisher loop: every load received on the local PULL socket
        is packed with ``pack_publish`` and sent out on the PUB socket. The
        loop runs until a KeyboardInterrupt is caught, after which the sockets
        and the context are closed.

        :param log_queue: Optional multiprocessing logging queue; when given it
                          is handed to ``salt.log.setup`` before the loop starts
        """
        salt.utils.process.appendproctitle(self.__class__.__name__)

        if self.opts["pub_server_niceness"] and not salt.utils.platform.is_windows():
            log.info(
                "setting Publish daemon niceness to %i",
                self.opts["pub_server_niceness"],
            )
            os.nice(self.opts["pub_server_niceness"])

        if log_queue:
            salt.log.setup.set_multiprocessing_logging_queue(log_queue)
            salt.log.setup.setup_multiprocessing_logging(log_queue)

        # Set up the context
        context = zmq.Context(1)
        # Prepare minion publish socket
        pub_sock = context.socket(zmq.PUB)
        _set_tcp_keepalive(pub_sock, self.opts)
        pub_hwm = self.opts.get("pub_hwm", 1000)
        # zmq < 3.0 only has a single HWM setting
        try:
            pub_sock.setsockopt(zmq.HWM, pub_hwm)
        # in zmq >= 3.0, there are separate send and receive HWM settings
        except AttributeError:
            # Set the High Water Marks. For more information on HWM, see:
            # http://api.zeromq.org/4-1:zmq-setsockopt
            pub_sock.setsockopt(zmq.SNDHWM, pub_hwm)
            pub_sock.setsockopt(zmq.RCVHWM, pub_hwm)
        if self.opts["ipv6"] is True and hasattr(zmq, "IPV4ONLY"):
            # IPv6 sockets work for both IPv6 and IPv4 addresses
            pub_sock.setsockopt(zmq.IPV4ONLY, 0)
        pub_sock.setsockopt(zmq.BACKLOG, self.opts.get("zmq_backlog", 1000))
        pub_sock.setsockopt(zmq.LINGER, -1)
        pub_uri = "tcp://{interface}:{publish_port}".format(**self.opts)
        # Prepare minion pull socket
        pull_sock = context.socket(zmq.PULL)
        pull_sock.setsockopt(zmq.LINGER, -1)

        pull_uri = self._get_pull_uri()
        salt.utils.zeromq.check_ipc_path_max_len(pull_uri)

        # Start the minion command publisher
        log.info("Starting the Salt Publisher on %s", pub_uri)
        pub_sock.bind(pub_uri)

        # Securely create socket
        log.info("Starting the Salt Puller on %s", pull_uri)
        with salt.utils.files.set_umask(0o177):
            pull_sock.bind(pull_uri)

        try:
            while True:
                # Catch and handle EINTR from when this process is sent
                # SIGUSR1 gracefully so we don't choke and die horribly
                try:
                    log.debug("Publish daemon getting data from puller %s", pull_uri)
                    package = pull_sock.recv()
                    package = salt.payload.loads(package)
                    package = self.pack_publish(package)
                    log.debug("Publish daemon received payload. size=%d", len(package))

                    unpacked_package = salt.payload.unpackage(package)
                    unpacked_package = salt.transport.frame.decode_embedded_strs(
                        unpacked_package
                    )
                    payload = unpacked_package["payload"]
                    log.trace("Accepted unpacked package from puller")
                    if self.opts["zmq_filtering"]:
                        # if you have a specific topic list, use that
                        if "topic_lst" in unpacked_package:
                            for topic in unpacked_package["topic_lst"]:
                                log.trace(
                                    "Sending filtered data over publisher %s", pub_uri
                                )
                                # zmq filters are substring match, hash the topic
                                # to avoid collisions
                                htopic = salt.utils.stringutils.to_bytes(
                                    hashlib.sha1(
                                        salt.utils.stringutils.to_bytes(topic)
                                    ).hexdigest()
                                )
                                pub_sock.send(htopic, flags=zmq.SNDMORE)
                                pub_sock.send(payload)
                                log.trace("Filtered data has been sent")

                            # Syndic broadcast
                            if self.opts.get("order_masters"):
                                log.trace("Sending filtered data to syndic")
                                pub_sock.send(b"syndic", flags=zmq.SNDMORE)
                                pub_sock.send(payload)
                                log.trace("Filtered data has been sent to syndic")
                        # otherwise its a broadcast
                        else:
                            # TODO: constants file for "broadcast"
                            log.trace(
                                "Sending broadcasted data over publisher %s", pub_uri
                            )
                            pub_sock.send(b"broadcast", flags=zmq.SNDMORE)
                            pub_sock.send(payload)
                            log.trace("Broadcasted data has been sent")
                    else:
                        log.trace(
                            "Sending ZMQ-unfiltered data over publisher %s", pub_uri
                        )
                        pub_sock.send(payload)
                        log.trace("Unfiltered data has been sent")
                except zmq.ZMQError as exc:
                    if exc.errno == errno.EINTR:
                        continue
                    raise

        except KeyboardInterrupt:
            log.trace("Publish daemon caught Keyboard interupt, tearing down")
        # Cleanly close the sockets if we're shutting down.
        # NOTE: this is only reached after a KeyboardInterrupt; any other
        # exception propagates out of the loop without running this cleanup.
        if pub_sock.closed is False:
            pub_sock.close()
        if pull_sock.closed is False:
            pull_sock.close()
        if context.closed is False:
            context.term()

    def pre_fork(self, process_manager, kwargs=None):
        """
        Do anything necessary pre-fork. Since this is on the master side this will
        primarily be used to create IPC channels and create our daemon process to
        do the actual publishing

        :param func process_manager: A ProcessManager, from salt.utils.process.ProcessManager
        :param dict kwargs: Keyword arguments forwarded to ``add_process`` for
                            the publish daemon
        """
        process_manager.add_process(self._publish_daemon, kwargs=kwargs)

    @property
    def pub_sock(self):
        """
        This thread's zmq publisher socket. This socket is stored on the class
        so that multiple instantiations in the same thread will re-use a single
        zmq socket.

        Evaluates to None when this thread has no socket yet.
        """
        return getattr(self._sock_data, "sock", None)

    def pub_connect(self):
        """
        Create and connect this thread's zmq socket. If a publisher socket
        already exists "pub_close" is called before creating and connecting a
        new socket.

        :return: The newly connected PUSH socket
        """
        if self.pub_sock:
            self.pub_close()
        self._sock_data._ctx = zmq.Context()
        self._sock_data.sock = self._sock_data._ctx.socket(zmq.PUSH)
        self.pub_sock.setsockopt(zmq.LINGER, -1)
        pull_uri = self._get_pull_uri()
        log.debug("Connecting to pub server: %s", pull_uri)
        self.pub_sock.connect(pull_uri)
        return self._sock_data.sock

    def pub_close(self):
        """
        Disconnect an existing publisher socket and remove it from the local
        thread's cache.
        """
        if hasattr(self._sock_data, "sock"):
            self._sock_data.sock.close()
            delattr(self._sock_data, "sock")
            if hasattr(self._sock_data, "_ctx"):
                self._sock_data._ctx.destroy()
                # Do not keep a reference to the destroyed context around
                delattr(self._sock_data, "_ctx")

    def pack_publish(self, load):
        """
        Encrypt (and optionally sign) "load" and wrap it in the package format
        consumed by the publish daemon loop.

        The passed-in "load" is modified: a "serial" key is added to it. When
        the target type is "list", or when ``zmq_filtering`` is enabled for a
        "pcre"/"glob"/"list" target, the package also carries a "topic_lst"
        naming the targeted minions.

        :param dict load: The load to publish; must contain "tgt_type" and,
                          for targeted publishes, "tgt"
        :return: The serialized package
        """
        payload = {"enc": "aes"}
        load["serial"] = salt.master.SMaster.get_serial()
        crypticle = salt.crypt.Crypticle(
            self.opts, salt.master.SMaster.secrets["aes"]["secret"].value
        )
        payload["load"] = crypticle.dumps(load)
        if self.opts["sign_pub_messages"]:
            master_pem_path = os.path.join(self.opts["pki_dir"], "master.pem")
            log.debug("Signing data packet")
            payload["sig"] = salt.crypt.sign_message(master_pem_path, payload["load"])
        int_payload = {"payload": salt.payload.dumps(payload)}

        # add some targeting stuff for lists only (for now)
        if load["tgt_type"] == "list":
            int_payload["topic_lst"] = load["tgt"]

        # If zmq_filtering is enabled, target matching has to happen master side
        match_targets = ["pcre", "glob", "list"]
        if self.opts["zmq_filtering"] and load["tgt_type"] in match_targets:
            # Fetch a list of minions that match
            _res = self.ckminions.check_minions(load["tgt"], tgt_type=load["tgt_type"])
            match_ids = _res["minions"]

            log.debug("Publish Side Match: %s", match_ids)
            # Send list of minions through so zmq can target them
            int_payload["topic_lst"] = match_ids
        payload = salt.payload.dumps(int_payload)
        log.debug(
            "Sending payload to publish daemon. jid=%s size=%d",
            load.get("jid", None),
            len(payload),
        )
        return payload

    def publish(self, load):
        """
        Publish "load" to minions. This sends the load to the publisher daemon
        process, which does the actual sending to minions.

        :param dict load: A load to be sent across the wire to minions
        """
        if not self.pub_sock:
            self.pub_connect()
        self.pub_sock.send(salt.payload.dumps(load))
        log.debug("Sent payload to publish daemon.")
1108
1109
class AsyncReqMessageClientPool(salt.transport.MessageClientPool):
    """
    Pool of AsyncReqMessageClient objects, used to avoid blocking while
    waiting to write data to a socket. Each send is handed to the client with
    the shortest send queue.
    """

    def __init__(self, opts, args=None, kwargs=None):
        """
        :param dict opts: The salt opts dictionary
        :param args: Positional arguments passed on to the parent pool
        :param kwargs: Keyword arguments passed on to the parent pool
        """
        self._closing = False
        super().__init__(AsyncReqMessageClient, opts, args=args, kwargs=kwargs)

    def close(self):
        """
        Close every client in the pool and empty it. Repeated calls are no-ops.
        """
        if not self._closing:
            self._closing = True
            for client in self.message_clients:
                client.close()
            self.message_clients = []

    def send(self, *args, **kwargs):
        """
        Forward the call to the client that has the fewest queued messages and
        return whatever that client's ``send`` returns.
        """
        by_backlog = sorted(
            self.message_clients, key=lambda client: len(client.send_queue)
        )
        least_busy = by_backlog[0]
        return least_busy.send(*args, **kwargs)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
1137
1138
1139# TODO: unit tests!
class AsyncReqMessageClient:
    """
    This class wraps the underlying zeromq REQ socket and gives a future-based
    interface to sending and receiving messages. This works around the primary
    limitation of serialized send/recv on the underlying socket by queueing the
    message sends in this class. In the future if we decide to attempt to multiplex
    we can manage a pool of REQ/REP sockets-- but for now we'll just do them in serial
    """

    def __init__(self, opts, addr, linger=0, io_loop=None):
        """
        Create an asynchronous message client

        :param dict opts: The salt opts dictionary
        :param str addr: The ZeroMQ address (e.g. "tcp://host:port") that the
                         REQ socket connects to
        :param int linger: The linger period set on the ZMQ socket. ZeroMQ
                           defines ZMQ_LINGER in milliseconds. See
                           http://api.zeromq.org/2-1:zmq-setsockopt [ZMQ_LINGER]
        :param IOLoop io_loop: A Tornado IOLoop event scheduler [tornado.ioloop.IOLoop]
        """
        self.opts = opts
        self.addr = addr
        self.linger = linger
        if io_loop is None:
            self.io_loop = salt.ext.tornado.ioloop.IOLoop.current()
        else:
            self.io_loop = io_loop

        self.context = zmq.Context()

        # wire up sockets
        self._init_socket()

        # serialized messages waiting to be sent, in order
        self.send_queue = []
        # mapping of message -> future
        self.send_future_map = {}

        self.send_timeout_map = {}  # message -> timeout
        self._closing = False

    # TODO: timeout all in-flight sessions, or error
    def close(self):
        """
        Close the stream/socket and terminate the zmq context. Repeated calls
        are no-ops.
        """
        # "_closing" is the last attribute set in __init__. If it is missing we
        # were called from __del__ on an object that never finished
        # initialising, or after the interpreter nuked most attributes, so
        # there is nothing safe to clean up.
        if getattr(self, "_closing", True):
            return

        self._closing = True
        if hasattr(self, "stream") and self.stream is not None:
            if ZMQ_VERSION_INFO < (14, 3, 0):
                # stream.close() doesn't work properly on pyzmq < 14.3.0
                if self.stream.socket:
                    self.stream.socket.close()
                self.stream.io_loop.remove_handler(self.stream.socket)
                # set this to None, more hacks for messed up pyzmq
                self.stream.socket = None
                self.socket.close()
            else:
                self.stream.close()
                self.socket = None
            self.stream = None
        if self.context.closed is False:
            self.context.term()

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701

    def _init_socket(self):
        """
        (Re)create the REQ socket and its ZMQStream, and connect to ``addr``.

        Any existing stream and socket are closed first, which makes this
        usable for resetting the socket after a failed request.
        """
        if hasattr(self, "stream"):
            self.stream.close()  # pylint: disable=E0203
            self.socket.close()  # pylint: disable=E0203
            del self.stream
            del self.socket

        self.socket = self.context.socket(zmq.REQ)

        # socket options
        if hasattr(zmq, "RECONNECT_IVL_MAX"):
            self.socket.setsockopt(zmq.RECONNECT_IVL_MAX, 5000)

        _set_tcp_keepalive(self.socket, self.opts)
        if self.addr.startswith("tcp://["):
            # Hint PF type if bracket enclosed IPv6 address
            if hasattr(zmq, "IPV6"):
                self.socket.setsockopt(zmq.IPV6, 1)
            elif hasattr(zmq, "IPV4ONLY"):
                self.socket.setsockopt(zmq.IPV4ONLY, 0)
        self.socket.linger = self.linger
        log.debug("Trying to connect to: %s", self.addr)
        self.socket.connect(self.addr)
        self.stream = zmq.eventloop.zmqstream.ZMQStream(
            self.socket, io_loop=self.io_loop
        )

    @salt.ext.tornado.gen.coroutine
    def _internal_send_recv(self):
        """
        Drain the send queue: send the message at the head of the queue, wait
        for its future to complete, then move on to the next one.
        """
        while self.send_queue:
            message = self.send_queue[0]
            future = self.send_future_map.get(message, None)
            if future is None:
                # Timedout
                del self.send_queue[0]
                continue

            # send
            def mark_future(msg):
                # Complete the future with the decoded first frame of the reply
                if not future.done():
                    data = salt.payload.loads(msg[0])
                    future.set_result(data)

            self.stream.on_recv(mark_future)
            self.stream.send(message)

            try:
                yield future
            except Exception as err:  # pylint: disable=broad-except
                log.debug("Re-init ZMQ socket: %s", err)
                self._init_socket()  # re-init the zmq socket (no other way in zmq)
                del self.send_queue[0]
                continue
            del self.send_queue[0]
            self.send_future_map.pop(message, None)
            self.remove_message_timeout(message)

    def remove_message_timeout(self, message):
        """
        Cancel and forget the pending timeout for "message", if there is one.
        """
        timeout = self.send_timeout_map.pop(message, None)
        if timeout is not None:
            # Hasn't been already timedout
            self.io_loop.remove_timeout(timeout)

    def timeout_message(self, message):
        """
        Handle a message timeout by removing it from the sending queue
        and informing the caller

        While the future still has attempts left, the message is passed to
        ``send`` again with the same future. Otherwise a SaltReqTimeoutError
        is set on the future.
        """
        future = self.send_future_map.pop(message, None)
        # In a race condition the message might have been sent by the time
        # we're timing it out. Make sure the future is not None
        if future is not None:
            del self.send_timeout_map[message]
            if future.attempts < future.tries:
                future.attempts += 1
                log.debug(
                    "SaltReqTimeoutError, retrying. (%s/%s)",
                    future.attempts,
                    future.tries,
                )
                self.send(
                    message,
                    timeout=future.timeout,
                    tries=future.tries,
                    future=future,
                )

            else:
                future.set_exception(SaltReqTimeoutError("Message timed out"))

    def send(
        self, message, timeout=None, tries=3, future=None, callback=None, raw=False
    ):
        """
        Return a future which will be completed when the message has a response

        :param message: The message to send. It is serialized here unless a
                        "future" is passed in, in which case it is expected
                        to be serialized already
        :param timeout: Seconds to wait before ``timeout_message`` is called
                        for this message; None disables the timeout. Forced
                        to 1 when the "detect_mode" opt is True
        :param int tries: Number of attempts stored on a newly created future
        :param future: An existing future to reuse (used for retries)
        :param callback: Optional callable scheduled on the IOLoop with the
                         response once the future is done
        :param raw: Not used by this method
        """
        if future is None:
            future = salt.ext.tornado.concurrent.Future()
            future.tries = tries
            future.attempts = 0
            future.timeout = timeout
            # if a future wasn't passed in, we need to serialize the message
            message = salt.payload.dumps(message)
        if callback is not None:

            def handle_future(future):
                response = future.result()
                self.io_loop.add_callback(callback, response)

            future.add_done_callback(handle_future)
        # Add this future to the mapping
        self.send_future_map[message] = future

        if self.opts.get("detect_mode") is True:
            timeout = 1

        if timeout is not None:
            send_timeout = self.io_loop.call_later(
                timeout, self.timeout_message, message
            )
            self.send_timeout_map[message] = send_timeout

        # An empty queue means no sender coroutine is draining it; start one
        if not self.send_queue:
            self.io_loop.spawn_callback(self._internal_send_recv)

        self.send_queue.append(message)

        return future
1344
1345
class ZeroMQSocketMonitor:
    """
    Watch a ZeroMQ socket through its monitor socket and log every event
    received from it.
    """

    # Lazily built mapping of zmq EVENT_* value -> constant name, shared by
    # all instances
    __EVENT_MAP = None

    def __init__(self, socket):
        """
        Create ZMQ monitor sockets

        More information:
            http://api.zeromq.org/4-0:zmq-socket-monitor

        :param socket: The zmq socket to monitor
        """
        self._socket = socket
        self._monitor_socket = socket.get_monitor_socket()
        self._monitor_stream = None

    def start_io_loop(self, io_loop):
        """
        Receive monitor events asynchronously on the given IOLoop.
        """
        log.trace("Event monitor start!")
        stream = zmq.eventloop.zmqstream.ZMQStream(
            self._monitor_socket, io_loop=io_loop
        )
        self._monitor_stream = stream
        stream.on_recv(self.monitor_callback)

    def start_poll(self):
        """
        Receive monitor events in a blocking loop until the monitor socket is
        gone or polling reports nothing.
        """
        log.trace("Event monitor start!")
        try:
            while True:
                if self._monitor_socket is None or not self._monitor_socket.poll():
                    break
                frames = self._monitor_socket.recv_multipart()
                self.monitor_callback(frames)
        except (AttributeError, zmq.error.ContextTerminated):
            # We cannot log here because we'll get an interrupted system call in trying
            # to flush the logging buffer as we terminate
            pass

    @property
    def event_map(self):
        """
        Mapping of zmq event value to the name of its ``EVENT_*`` constant.
        Built on first access and cached on the class.
        """
        if ZeroMQSocketMonitor.__EVENT_MAP is None:
            ZeroMQSocketMonitor.__EVENT_MAP = {
                getattr(zmq, name): name
                for name in dir(zmq)
                if name.startswith("EVENT_")
            }
        return ZeroMQSocketMonitor.__EVENT_MAP

    def monitor_callback(self, msg):
        """
        Parse a monitor message, log it with a readable event description and
        stop monitoring when the event is ``EVENT_MONITOR_STOPPED``.
        """
        evt = zmq.utils.monitor.parse_monitor_message(msg)
        event_id = evt["event"]
        evt["description"] = self.event_map[event_id]
        log.debug("ZeroMQ event: %s", evt)
        if event_id == zmq.EVENT_MONITOR_STOPPED:
            self.stop()

    def stop(self):
        """
        Disable monitoring on the watched socket and drop all references held
        by this object. Does nothing when already stopped.
        """
        watched = self._socket
        if watched is None:
            return
        watched.disable_monitor()
        self._socket = None
        self._monitor_socket = None
        stream = self._monitor_stream
        if stream is not None:
            stream.close()
            self._monitor_stream = None
        log.trace("Event monitor done!")
1406