1"""
2TCP transport classes
3
Wire protocol: a stream of msgpack({'head': SOMEHEADER, 'body': SOMEBODY}) frames, decoded on receipt with a streaming msgpack Unpacker (no separate length prefix is read)
5
6"""
7import errno
8import logging
9import os
10import queue
11import socket
12import threading
13import time
14import traceback
15import urllib.parse
16import uuid
17
18import salt.crypt
19import salt.exceptions
20import salt.ext.tornado
21import salt.ext.tornado.concurrent
22import salt.ext.tornado.gen
23import salt.ext.tornado.iostream
24import salt.ext.tornado.netutil
25import salt.ext.tornado.tcpclient
26import salt.ext.tornado.tcpserver
27import salt.payload
28import salt.transport.client
29import salt.transport.frame
30import salt.transport.ipc
31import salt.transport.mixins.auth
32import salt.transport.server
33import salt.utils.asynchronous
34import salt.utils.event
35import salt.utils.files
36import salt.utils.msgpack
37import salt.utils.platform
38import salt.utils.process
39import salt.utils.verify
40import salt.utils.versions
41from salt.exceptions import SaltClientError, SaltReqTimeoutError
42from salt.transport import iter_transport_opts
43
44try:
45    from M2Crypto import RSA
46
47    HAS_M2 = True
48except ImportError:
49    HAS_M2 = False
50    try:
51        from Cryptodome.Cipher import PKCS1_OAEP
52    except ImportError:
53        from Crypto.Cipher import PKCS1_OAEP  # nosec
54
# On Windows, accepted request connections are handed to worker processes
# through LoadBalancerServer and a multiprocessing queue (see
# TCPReqServerChannel.pre_fork/post_fork) instead of a shared listening socket.
USE_LOAD_BALANCER = bool(salt.utils.platform.is_windows())

if USE_LOAD_BALANCER:
    # Names used only by the load-balancer code path.
    import threading
    import multiprocessing
    import salt.ext.tornado.util
    from salt.utils.process import SignalHandlingProcess

log = logging.getLogger(__name__)
67
68
def _set_tcp_keepalive(sock, opts):
    """
    Configure TCP keepalive on ``sock`` according to ``opts``.

    Nothing happens on platforms whose ``socket`` module lacks SO_KEEPALIVE.
    Otherwise, when ``opts["tcp_keepalive"]`` is falsy SO_KEEPALIVE is turned
    off. When it is truthy SO_KEEPALIVE is turned on, and any positive
    ``tcp_keepalive_idle`` / ``tcp_keepalive_cnt`` / ``tcp_keepalive_intvl``
    values are applied through whatever mechanism the platform exposes:
    the per-option ``TCP_KEEP*`` constants and/or the Windows
    ``SIO_KEEPALIVE_VALS`` ioctl.
    """
    if not hasattr(socket, "SO_KEEPALIVE"):
        return

    if not opts.get("tcp_keepalive", False):
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 0)
        return

    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

    if hasattr(socket, "SOL_TCP"):
        # Each tunable is applied only when the platform defines its constant
        # and the configured value is positive.
        per_option_settings = (
            ("TCP_KEEPIDLE", "tcp_keepalive_idle"),
            ("TCP_KEEPCNT", "tcp_keepalive_cnt"),
            ("TCP_KEEPINTVL", "tcp_keepalive_intvl"),
        )
        for constant_name, opt_key in per_option_settings:
            if not hasattr(socket, constant_name):
                continue
            configured = opts.get(opt_key, -1)
            if configured > 0:
                sock.setsockopt(
                    socket.SOL_TCP, getattr(socket, constant_name), int(configured)
                )

    if hasattr(socket, "SIO_KEEPALIVE_VALS"):
        # Windows has no TCP_KEEPIDLE/TCP_KEEPCNT/TCP_KEEPINTVL here; it takes
        # idle time and probe interval together via SIO_KEEPALIVE_VALS and
        # offers nothing equivalent to TCP_KEEPCNT.
        idle = opts.get("tcp_keepalive_idle", -1)
        interval = opts.get("tcp_keepalive_intvl", -1)
        if idle > 0 or interval > 0:
            # Both values must be valid once either one is set, so the unset
            # one falls back to the Windows default ('KeepAliveTime' /
            # 'KeepAliveInterval'):
            # https://technet.microsoft.com/en-us/library/bb726981.aspx#EDAA
            if idle <= 0:
                idle = 7200
            if interval <= 0:
                interval = 1
            # The ioctl expects milliseconds.
            sock.ioctl(
                socket.SIO_KEEPALIVE_VALS,
                (1, int(idle * 1000), int(interval * 1000)),
            )
129
130
if USE_LOAD_BALANCER:

    class LoadBalancerServer(SignalHandlingProcess):
        """
        Raw TCP server which runs in its own process and will listen
        for incoming connections. Each incoming connection will be
        sent via multiprocessing queue to the workers.
        Since the queue is shared amongst workers, only one worker will
        handle a given connection.
        """

        # TODO: opts!
        # Based on default used in salt.ext.tornado.netutil.bind_sockets()
        backlog = 128

        def __init__(self, opts, socket_queue, **kwargs):
            """
            :param opts: Salt options; ``interface``, ``ret_port`` and the
                ``tcp_keepalive*`` keys are read when the server runs.
            :param socket_queue: queue that accepted ``(connection, address)``
                tuples are put on for the worker processes.
            :param kwargs: forwarded to ``SignalHandlingProcess``.
            """
            super().__init__(**kwargs)
            self.opts = opts
            self.socket_queue = socket_queue
            self._socket = None

        def close(self):
            """
            Shut down and close the listening socket, if one is open.

            Idempotent. The socket reference is cleared and the descriptor is
            always closed, even when ``shutdown()`` fails. ``ENOTCONN`` from
            ``shutdown()`` (no connected peer) is ignored. Any other
            ``OSError`` is re-raised after the descriptor has been closed.
            """
            # getattr: __del__ may run on an instance whose __init__ raised
            # before _socket was assigned.
            sock = getattr(self, "_socket", None)
            if sock is None:
                return
            self._socket = None
            try:
                sock.shutdown(socket.SHUT_RDWR)
            except OSError as exc:
                if exc.errno != errno.ENOTCONN:
                    raise
            finally:
                sock.close()

        # pylint: disable=W1701
        def __del__(self):
            self.close()

        # pylint: enable=W1701

        def run(self):
            """
            Start the load balancer

            Binds a blocking listening socket on
            ``(opts["interface"], opts["ret_port"])`` and loops forever,
            accepting connections and putting each ``(connection, address)``
            pair on ``socket_queue``. ``put`` blocks until the queue has room.
            """
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            _set_tcp_keepalive(self._socket, self.opts)
            self._socket.setblocking(1)
            self._socket.bind((self.opts["interface"], int(self.opts["ret_port"])))
            self._socket.listen(self.backlog)

            while True:
                try:
                    # Wait for a connection to occur since the socket is
                    # blocking.
                    connection, address = self._socket.accept()
                    # Wait for a free slot to be available to put
                    # the connection into.
                    # Sockets are picklable on Windows in Python 3.
                    self.socket_queue.put((connection, address), True, None)
                except OSError as e:
                    # ECONNABORTED indicates that there was a connection
                    # but it was closed while still in the accept queue.
                    # (observed on FreeBSD).
                    if (
                        salt.ext.tornado.util.errno_from_exception(e)
                        == errno.ECONNABORTED
                    ):
                        continue
                    raise
194
195
# TODO: move serial down into message library
class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
    """
    Encapsulate sending routines to tcp.

    Requests are wrapped by ``_package_load`` and sent to the master named by
    ``opts["master_uri"]`` through a ``SaltMessageClientPool``. With the
    default ``crypt="aes"`` the load is encrypted with the ``AsyncAuth``
    session crypticle. With ``crypt="clear"`` it is sent unencrypted.

    NOTE(review): an earlier docstring said "this class returns a singleton";
    no instance caching is visible in this class — confirm where that happens.
    """

    async_methods = [
        "crypted_transfer_decode_dictentry",
        "_crypted_transfer",
        "_uncrypted_transfer",
        "send",
    ]
    close_methods = [
        "close",
    ]

    def __init__(self, opts, **kwargs):
        """
        :param opts: Salt options. They are copied, so a ``master_uri``
            override does not mutate the caller's dict.
        :keyword master_uri: overrides ``opts["master_uri"]``. The network
            location must end in ``:port``.
        :keyword crypt: ``"aes"`` (default) or ``"clear"``.
        :keyword io_loop: IOLoop to use; defaults to the current IOLoop.
        :keyword resolver: passed through to the message clients.
        """
        # Assigned first so that close()/__del__ stay safe if anything below
        # raises (e.g. a missing master_uri or one without a ":port").
        self._closing = False
        self.message_client = None

        self.opts = dict(opts)
        if "master_uri" in kwargs:
            self.opts["master_uri"] = kwargs["master_uri"]

        # crypt defaults to 'aes'
        self.crypt = kwargs.get("crypt", "aes")

        self.io_loop = kwargs.get("io_loop") or salt.ext.tornado.ioloop.IOLoop.current()

        if self.crypt != "clear":
            self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop)

        resolver = kwargs.get("resolver")

        parse = urllib.parse.urlparse(self.opts["master_uri"])
        master_host, master_port = parse.netloc.rsplit(":", 1)
        self.master_addr = (master_host, int(master_port))
        self.message_client = SaltMessageClientPool(
            self.opts,
            args=(
                self.opts,
                master_host,
                int(master_port),
            ),
            kwargs={
                "io_loop": self.io_loop,
                "resolver": resolver,
                "source_ip": self.opts.get("source_ip"),
                "source_port": self.opts.get("source_ret_port"),
            },
        )

    def close(self):
        """
        Close the underlying message client pool. Idempotent.
        """
        if self._closing:
            return
        log.debug("Closing %s instance", self.__class__.__name__)
        self._closing = True
        # None when __init__ failed before the pool was created.
        if self.message_client is not None:
            self.message_client.close()

    # pylint: disable=W1701
    def __del__(self):
        try:
            self.close()
        except OSError as exc:
            if exc.errno != errno.EBADF:
                # If its not a bad file descriptor error, raise
                raise

    # pylint: enable=W1701

    def _package_load(self, load):
        """Wrap ``load`` in the versioned request envelope sent to the master."""
        return {
            "enc": self.crypt,
            "load": load,
            "version": 2,
        }

    @salt.ext.tornado.gen.coroutine
    def crypted_transfer_decode_dictentry(
        self, load, dictkey=None, tries=3, timeout=60
    ):
        """
        Send an encrypted request whose reply carries its own one-off key.

        A fresh nonce is added to ``load`` (the caller's dict is modified).
        The reply must contain ``"key"``, an RSA-encrypted AES key that is
        decrypted with this minion's private key, and ``dictkey``, a payload
        encrypted with that AES key. The decrypted payload must be signed by
        the key at ``<pki_dir>/minion_master.pub``, embed the same encrypted
        key, and echo the nonce.

        :returns: ``data["pillar"]`` from the verified payload.
        :raises salt.crypt.AuthenticationError: on signature, key or nonce
            mismatch.
        """
        nonce = uuid.uuid4().hex
        load["nonce"] = nonce
        if not self.auth.authenticated:
            yield self.auth.authenticate()
        ret = yield self.message_client.send(
            self._package_load(self.auth.crypticle.dumps(load)),
            timeout=timeout,
            tries=tries,
        )
        key = self.auth.get_keys()
        if HAS_M2:
            aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
        else:
            cipher = PKCS1_OAEP.new(key)
            aes = cipher.decrypt(ret["key"])

        # Decrypt the payload with the AES key recovered above.
        pcrypt = salt.crypt.Crypticle(self.opts, aes)
        signed_msg = pcrypt.loads(ret[dictkey])

        # Validate the master's signature.
        master_pubkey_path = os.path.join(self.opts["pki_dir"], "minion_master.pub")
        if not salt.crypt.verify_signature(
            master_pubkey_path, signed_msg["data"], signed_msg["sig"]
        ):
            raise salt.crypt.AuthenticationError(
                "Pillar payload signature failed to validate."
            )

        # Make sure the signed key matches the key we used to decrypt the data.
        data = salt.payload.loads(signed_msg["data"])
        if data["key"] != ret["key"]:
            raise salt.crypt.AuthenticationError("Key verification failed.")

        # Validate the nonce.
        if data["nonce"] != nonce:
            raise salt.crypt.AuthenticationError("Pillar nonce verification failed.")
        raise salt.ext.tornado.gen.Return(data["pillar"])

    @salt.ext.tornado.gen.coroutine
    def _crypted_transfer(self, load, tries=3, timeout=60):
        """
        Send ``load`` encrypted with the session crypticle and decrypt the reply.

        In case of authentication errors, try to renegotiate authentication
        and retry the method (once).
        Indeed, we can fail too early in case of a master restart during a
        minion state execution call.

        A dict ``load`` gets a fresh ``"nonce"`` key (in place). The nonce is
        passed to ``crypticle.loads`` when the reply is decrypted.
        """
        nonce = uuid.uuid4().hex
        if load and isinstance(load, dict):
            load["nonce"] = nonce

        @salt.ext.tornado.gen.coroutine
        def _do_transfer():
            data = yield self.message_client.send(
                self._package_load(self.auth.crypticle.dumps(load)),
                timeout=timeout,
                tries=tries,
            )
            # we may not have always data
            # as for example for saltcall ret submission, this is a blind
            # communication, we do not subscribe to return events, we just
            # upload the results to the master
            if data:
                data = self.auth.crypticle.loads(data, nonce=nonce)
                data = salt.transport.frame.decode_embedded_strs(data)
            raise salt.ext.tornado.gen.Return(data)

        if not self.auth.authenticated:
            yield self.auth.authenticate()
        try:
            ret = yield _do_transfer()
            raise salt.ext.tornado.gen.Return(ret)
        except salt.crypt.AuthenticationError:
            yield self.auth.authenticate()
            ret = yield _do_transfer()
            raise salt.ext.tornado.gen.Return(ret)

    @salt.ext.tornado.gen.coroutine
    def _uncrypted_transfer(self, load, tries=3, timeout=60):
        """Send ``load`` without encryption and return the raw reply."""
        ret = yield self.message_client.send(
            self._package_load(load),
            timeout=timeout,
            tries=tries,
        )

        raise salt.ext.tornado.gen.Return(ret)

    @salt.ext.tornado.gen.coroutine
    def send(self, load, tries=3, timeout=60, raw=False):
        """
        Send a request, return a future which will complete when we send the message

        ``raw`` is accepted for interface compatibility and is not used here.

        :raises SaltClientError: if the connection to the master is closed.
        """
        try:
            if self.crypt == "clear":
                ret = yield self._uncrypted_transfer(load, tries=tries, timeout=timeout)
            else:
                ret = yield self._crypted_transfer(load, tries=tries, timeout=timeout)
        except salt.ext.tornado.iostream.StreamClosedError:
            # Convert to 'SaltClientError' so that clients can handle this
            # exception more appropriately.
            raise SaltClientError("Connection to master lost")
        raise salt.ext.tornado.gen.Return(ret)
379
380
class AsyncTCPPubChannel(
    salt.transport.mixins.auth.AESPubClientMixin, salt.transport.client.AsyncPubChannel
):
    """
    Minion-side subscriber to the master's TCP publish port.

    ``connect()`` authenticates with the master, then opens a
    ``SaltMessageClientPool`` to ``opts["master_ip"]`` on the publish port.
    Publications are handed to the callback registered through ``on_recv()``
    after ``self._decode_payload`` has processed them. ``_decode_payload`` is
    not defined in this class; presumably it comes from AESPubClientMixin
    (confirm).
    """

    # NOTE(review): presumably consumed by a sync wrapper to decide which
    # methods are coroutines / close hooks — confirm against its caller.
    async_methods = [
        "send_id",
        "connect_callback",
        "connect",
    ]
    close_methods = [
        "close",
    ]

    def __init__(self, opts, **kwargs):
        """
        :param opts: Salt options (kept by reference, not copied).
        :keyword crypt: stored as ``self.crypt``; defaults to ``"aes"``.
        :keyword io_loop: IOLoop to use; defaults to the current IOLoop.

        The message client is created later by ``connect()``. ``self.event``
        is a non-listening minion event handle, used to fire
        ``__master_connected`` / ``__master_disconnected``.
        """
        self.opts = opts
        self.crypt = kwargs.get("crypt", "aes")
        self.io_loop = kwargs.get("io_loop") or salt.ext.tornado.ioloop.IOLoop.current()
        self.connected = False
        self._closing = False
        self._reconnected = False
        self.message_client = None
        self.event = salt.utils.event.get_event("minion", opts=self.opts, listen=False)

    def close(self):
        """
        Close the message client and destroy the event handle. Idempotent.
        """
        if self._closing:
            return
        self._closing = True
        if self.message_client is not None:
            self.message_client.close()
            self.message_client = None
        if self.event is not None:
            self.event.destroy()
            self.event = None

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701

    def _package_load(self, load):
        """Wrap ``load`` in the versioned envelope sent to the master."""
        return {
            "enc": self.crypt,
            "load": load,
            "version": 2,
        }

    @salt.ext.tornado.gen.coroutine
    def send_id(self, tok, force_auth):
        """
        Send the minion id to the master so that the master may better
        track the connection state of the minion.
        In case of authentication errors, try to renegotiate authentication
        and retry the method.

        :param tok: token included in the encrypted load.
        :param force_auth: re-authenticate even if already authenticated.
        :returns: ``True`` once the framed message has been written.
        """
        load = {"id": self.opts["id"], "tok": tok}

        @salt.ext.tornado.gen.coroutine
        def _do_transfer():
            # Encrypt with the session crypticle and write one framed message
            # straight onto the publish stream (no reply is awaited).
            msg = self._package_load(self.auth.crypticle.dumps(load))
            package = salt.transport.frame.frame_msg(msg, header=None)
            yield self.message_client.write_to_stream(package)
            raise salt.ext.tornado.gen.Return(True)

        if force_auth or not self.auth.authenticated:
            # Up to tcp_authentication_retries + 1 attempts; a negative value
            # retries forever. If every attempt fails the loop simply ends and
            # the transfer below is still attempted.
            count = 0
            while (
                count <= self.opts["tcp_authentication_retries"]
                or self.opts["tcp_authentication_retries"] < 0
            ):
                try:
                    yield self.auth.authenticate()
                    break
                except SaltClientError as exc:
                    log.debug(exc)
                    count += 1
        try:
            ret = yield _do_transfer()
            raise salt.ext.tornado.gen.Return(ret)
        except salt.crypt.AuthenticationError:
            yield self.auth.authenticate()
            ret = yield _do_transfer()
            raise salt.ext.tornado.gen.Return(ret)

    @salt.ext.tornado.gen.coroutine
    def connect_callback(self, result):
        """
        Handle a (re)connection of the publish stream.

        ``result`` is not used. The minion id is always sent (with forced
        re-auth on reconnects) and ``__master_connected`` is fired locally.
        On the first call this only records that later calls are reconnects.
        On reconnects it also sends a ``_minion_event`` start event to the
        master through a short-lived synchronous ``AsyncTCPReqChannel``. Any
        failure is logged at info level and not raised.
        """
        if self._closing:
            return
        # Force re-auth on reconnect since the master
        # may have been restarted
        yield self.send_id(self.tok, self._reconnected)
        self.connected = True
        self.event.fire_event({"master": self.opts["master"]}, "__master_connected")
        if self._reconnected:
            # On reconnects, fire a master event to notify that the minion is
            # available.
            if self.opts.get("__role") == "syndic":
                data = "Syndic {} started at {}".format(self.opts["id"], time.asctime())
                tag = salt.utils.event.tagify([self.opts["id"], "start"], "syndic")
            else:
                data = "Minion {} started at {}".format(self.opts["id"], time.asctime())
                tag = salt.utils.event.tagify([self.opts["id"], "start"], "minion")
            load = {
                "id": self.opts["id"],
                "cmd": "_minion_event",
                "pretag": None,
                "tok": self.tok,
                "data": data,
                "tag": tag,
            }
            req_channel = salt.utils.asynchronous.SyncWrapper(
                AsyncTCPReqChannel,
                (self.opts,),
                loop_kwarg="io_loop",
            )
            try:
                req_channel.send(load, timeout=60)
            except salt.exceptions.SaltReqTimeoutError:
                log.info(
                    "fire_master failed: master could not be contacted. Request timed"
                    " out."
                )
            except Exception:  # pylint: disable=broad-except
                log.info("fire_master failed: %s", traceback.format_exc())
            finally:
                # SyncWrapper will call either close() or destroy(), whichever is available
                del req_channel
        else:
            self._reconnected = True

    def disconnect_callback(self):
        """Mark the channel disconnected and fire ``__master_disconnected``."""
        if self._closing:
            return
        self.connected = False
        self.event.fire_event({"master": self.opts["master"]}, "__master_disconnected")

    @salt.ext.tornado.gen.coroutine
    def connect(self):
        """
        Authenticate with the master and connect to its publish port.

        A ``publish_port`` option other than 4505 is used as-is. Otherwise the
        port reported by the master in ``auth.creds`` is used. Exceptions whose
        text contains ``"-|RETRY|-"`` are swallowed (the coroutine returns
        without connecting). Any other ``Exception`` is re-raised as
        ``SaltClientError``. ``KeyboardInterrupt`` propagates unchanged.
        """
        try:
            self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop)
            self.tok = self.auth.gen_token(b"salt")
            if not self.auth.authenticated:
                yield self.auth.authenticate()
            if self.auth.authenticated:
                # if this is changed from the default, we assume it was intentional
                if int(self.opts.get("publish_port", 4505)) != 4505:
                    self.publish_port = self.opts.get("publish_port")
                # else take the relayed publish_port master reports
                else:
                    self.publish_port = self.auth.creds["publish_port"]

                self.message_client = SaltMessageClientPool(
                    self.opts,
                    args=(self.opts, self.opts["master_ip"], int(self.publish_port)),
                    kwargs={
                        "io_loop": self.io_loop,
                        "connect_callback": self.connect_callback,
                        "disconnect_callback": self.disconnect_callback,
                        "source_ip": self.opts.get("source_ip"),
                        "source_port": self.opts.get("source_publish_port"),
                    },
                )
                yield self.message_client.connect()  # wait for the client to be connected
                self.connected = True
        # TODO: better exception handling...
        except KeyboardInterrupt:  # pylint: disable=try-except-raise
            raise
        except Exception as exc:  # pylint: disable=broad-except
            if "-|RETRY|-" not in str(exc):
                raise SaltClientError(
                    "Unable to sign_in to master: {}".format(exc)
                )  # TODO: better error message

    def on_recv(self, callback):
        """
        Register an on_recv callback

        ``None`` is forwarded to the message client unwrapped (presumably to
        unregister — confirm). Any other callback is wrapped so that it
        receives the payload produced by ``self._decode_payload``.
        """
        if callback is None:
            return self.message_client.on_recv(callback)

        @salt.ext.tornado.gen.coroutine
        def wrap_callback(body):
            if not isinstance(body, dict):
                # TODO: For some reason we need to decode here for things
                #       to work. Fix this.
                body = salt.utils.msgpack.loads(body)
                body = salt.transport.frame.decode_embedded_strs(body)
            ret = yield self._decode_payload(body)
            callback(ret)

        return self.message_client.on_recv(wrap_callback)
571
572
class TCPReqServerChannel(
    salt.transport.mixins.auth.AESReqServerMixin, salt.transport.server.ReqServerChannel
):
    """
    Master-side request server for the TCP transport.

    ``pre_fork`` prepares the listening side: a ``LoadBalancerServer`` process
    when ``USE_LOAD_BALANCER`` is set, otherwise a bound non-blocking socket
    on non-Windows hosts. ``post_fork`` creates the per-worker server, which
    hands every framed request to ``handle_message``.
    """

    # TODO: opts!
    backlog = 5

    def __init__(self, opts):
        salt.transport.server.ReqServerChannel.__init__(self, opts)
        self._socket = None
        self.req_server = None

    @property
    def socket(self):
        """The listening socket, or ``None`` when none is held."""
        return self._socket

    def close(self):
        """
        Shut down the listening socket and close the request server.

        Safe to call repeatedly and on a partially initialised instance.
        ``ENOTCONN`` from ``shutdown()`` and ``EBADF`` from
        ``req_server.close()`` are tolerated. Other ``OSError``s propagate.
        """
        # getattr: __del__ may run after ReqServerChannel.__init__ raised.
        sock = getattr(self, "_socket", None)
        req_server = getattr(self, "req_server", None)
        if sock is not None:
            try:
                sock.shutdown(socket.SHUT_RDWR)
            except OSError as exc:
                # We may try to shutdown a socket which is already
                # disconnected. Ignore this condition and continue.
                if exc.errno != errno.ENOTCONN:
                    raise
            if req_server is None:
                # We only close the socket if we don't have a req_server instance.
                # If we did, because the req_server is also handling this socket, when we call
                # req_server.stop(), tornado will give us an AssertionError because it's trying to
                # match the socket.fileno() (after close it's -1) to the fd it holds on it's _sockets cache
                # so it can remove the socket from the IOLoop handlers
                sock.close()
            self._socket = None
        if req_server is not None:
            try:
                req_server.close()
            except OSError as exc:
                if exc.errno != errno.EBADF:
                    raise
                log.exception(
                    "TCPReqServerChannel close generated an exception: %s", str(exc)
                )
            self.req_server = None

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def pre_fork(self, process_manager):
        """
        Pre-fork setup of the listening side.

        With ``USE_LOAD_BALANCER`` a ``LoadBalancerServer`` process is
        registered with ``process_manager`` and fed through a
        ``multiprocessing.Queue``. Otherwise (non-Windows) a non-blocking
        socket is bound to ``(interface, ret_port)``; ``listen()`` is called
        later, in ``post_fork``.
        """
        salt.transport.mixins.auth.AESReqServerMixin.pre_fork(self, process_manager)
        if USE_LOAD_BALANCER:
            self.socket_queue = multiprocessing.Queue()
            process_manager.add_process(
                LoadBalancerServer, args=(self.opts, self.socket_queue)
            )
        elif not salt.utils.platform.is_windows():
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            _set_tcp_keepalive(self._socket, self.opts)
            self._socket.setblocking(0)
            self._socket.bind((self.opts["interface"], int(self.opts["ret_port"])))

    def post_fork(self, payload_handler, io_loop):
        """
        After forking we need to create all of the local sockets to listen to the
        router

        payload_handler: function to call with your payloads
        io_loop: IOLoop the request server runs on
        """
        # NOTE(review): this is the request server, yet it reads
        # pub_server_niceness and logs "Publish daemon". Confirm whether a
        # request-server specific option was intended.
        if self.opts["pub_server_niceness"] and not salt.utils.platform.is_windows():
            log.info(
                "setting Publish daemon niceness to %i",
                self.opts["pub_server_niceness"],
            )
            os.nice(self.opts["pub_server_niceness"])

        self.payload_handler = payload_handler
        self.io_loop = io_loop
        with salt.utils.asynchronous.current_ioloop(self.io_loop):
            if USE_LOAD_BALANCER:
                self.req_server = LoadBalancerWorker(
                    self.socket_queue,
                    self.handle_message,
                    ssl_options=self.opts.get("ssl"),
                )
            else:
                # Only reachable on Windows if USE_LOAD_BALANCER were False
                # there; the module currently sets it to True on Windows.
                if salt.utils.platform.is_windows():
                    self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    _set_tcp_keepalive(self._socket, self.opts)
                    self._socket.setblocking(0)
                    self._socket.bind(
                        (self.opts["interface"], int(self.opts["ret_port"]))
                    )
                self.req_server = SaltMessageServer(
                    self.handle_message,
                    ssl_options=self.opts.get("ssl"),
                    io_loop=self.io_loop,
                )
                self.req_server.add_socket(self._socket)
                self._socket.listen(self.backlog)
        salt.transport.mixins.auth.AESReqServerMixin.post_fork(
            self, payload_handler, io_loop
        )

    @salt.ext.tornado.gen.coroutine
    def handle_message(self, stream, header, payload):
        """
        Handle incoming messages from underlying tcp streams

        Every reply, error replies included, is sent as
        ``salt.transport.frame.frame_msg(..., header=header)`` via
        ``stream.write``. Tornado's IOStream has ``write()`` (which takes bytes)
        but no ``send()``, and the client expects a framed reply.
        ``StreamClosedError`` and any other unexpected exception are logged
        and absorbed; the coroutine always completes with ``None``.

        :param stream: the client's tornado IOStream.
        :param header: header of the request frame, echoed on the reply.
        :param payload: the still-encoded request body.
        """
        try:
            try:
                payload = self._decode_payload(payload)
            except Exception:  # pylint: disable=broad-except
                stream.write(salt.transport.frame.frame_msg("bad load", header=header))
                raise salt.ext.tornado.gen.Return()

            # TODO helper functions to normalize payload?
            if not isinstance(payload, dict) or not isinstance(
                payload.get("load"), dict
            ):
                yield stream.write(
                    salt.transport.frame.frame_msg(
                        "payload and load must be a dict", header=header
                    )
                )
                raise salt.ext.tornado.gen.Return()

            # Reject ids that are not strings or that contain a NUL byte.
            id_ = payload["load"].get("id", "")
            try:
                id_has_null = "\0" in id_
            except TypeError:
                log.error("Payload contains non-string id: %s", payload)
                yield stream.write(
                    salt.transport.frame.frame_msg(
                        "bad load: id {} is not a string".format(id_), header=header
                    )
                )
                raise salt.ext.tornado.gen.Return()
            if id_has_null:
                log.error("Payload contains an id with a null byte: %s", payload)
                yield stream.write(
                    salt.transport.frame.frame_msg(
                        "bad load: id contains a null byte", header=header
                    )
                )
                raise salt.ext.tornado.gen.Return()

            version = payload.get("version", 0)
            # Protocol version 2+ requests get signed replies and carry a nonce.
            sign_messages = version > 1

            # intercept the "_auth" commands, since the main daemon shouldn't know
            # anything about our key auth
            if (
                payload["enc"] == "clear"
                and payload.get("load", {}).get("cmd") == "_auth"
            ):
                yield stream.write(
                    salt.transport.frame.frame_msg(
                        self._auth(payload["load"], sign_messages), header=header
                    )
                )
                raise salt.ext.tornado.gen.Return()

            nonce = None
            if version > 1:
                nonce = payload["load"].pop("nonce", None)

            # TODO: test
            try:
                ret, req_opts = yield self.payload_handler(payload)
            except Exception:  # pylint: disable=broad-except
                # Log first so the handler's exception is recorded even if
                # the error reply below cannot be written.
                log.error(
                    "Some exception handling a payload from minion", exc_info=True
                )
                # always attempt to return an error to the minion
                stream.write(
                    salt.transport.frame.frame_msg(
                        "Some exception handling minion payload", header=header
                    )
                )
                stream.close()
                raise salt.ext.tornado.gen.Return()

            req_fun = req_opts.get("fun", "send")
            if req_fun == "send_clear":
                stream.write(salt.transport.frame.frame_msg(ret, header=header))
            elif req_fun == "send":
                stream.write(
                    salt.transport.frame.frame_msg(
                        self.crypticle.dumps(ret, nonce), header=header
                    )
                )
            elif req_fun == "send_private":
                stream.write(
                    salt.transport.frame.frame_msg(
                        self._encrypt_private(
                            ret,
                            req_opts["key"],
                            req_opts["tgt"],
                            nonce,
                            sign_messages,
                        ),
                        header=header,
                    )
                )
            else:
                log.error("Unknown req_fun %s", req_fun)
                # always attempt to return an error to the minion
                stream.write(
                    salt.transport.frame.frame_msg(
                        "Server-side exception handling payload", header=header
                    )
                )
                stream.close()
        except salt.ext.tornado.gen.Return:
            raise
        except salt.ext.tornado.iostream.StreamClosedError:
            # Stream was closed. This could happen if the remote side
            # closed the connection on its end (eg in a timeout or shutdown
            # situation).
            log.error("Connection was unexpectedly closed", exc_info=True)
        except Exception as exc:  # pylint: disable=broad-except
            # Absorb any other exceptions
            log.error("Unexpected exception occurred: %s", exc, exc_info=True)

        raise salt.ext.tornado.gen.Return()
802
803
class SaltMessageServer(salt.ext.tornado.tcpserver.TCPServer):
    """
    Raw TCP server which will receive all of the TCP streams and re-assemble
    messages that are sent through to us.

    Every complete ``{"head": ..., "body": ...}`` frame decoded from a client
    stream is dispatched as ``message_handler(stream, header, body)`` via
    ``io_loop.spawn_callback``.
    """

    def __init__(self, message_handler, *args, **kwargs):
        io_loop = (
            kwargs.pop("io_loop", None) or salt.ext.tornado.ioloop.IOLoop.current()
        )
        self._closing = False
        super().__init__(*args, **kwargs)
        self.io_loop = io_loop
        # list of (stream, address) tuples for currently connected clients
        self.clients = []
        self.message_handler = message_handler

    @salt.ext.tornado.gen.coroutine
    def handle_stream(self, stream, address):
        """
        Handle incoming streams and add messages to the incoming queue

        Reads the stream in chunks of up to 4096 bytes, feeds them to a msgpack
        unpacker and spawns ``message_handler`` for each complete frame. The
        client is untracked when the stream closes or any other error occurs.
        """
        log.trace("Req client %s connected", address)
        self.clients.append((stream, address))
        unpacker = salt.utils.msgpack.Unpacker()
        try:
            while True:
                wire_bytes = yield stream.read_bytes(4096, partial=True)
                unpacker.feed(wire_bytes)
                for framed_msg in unpacker:
                    framed_msg = salt.transport.frame.decode_embedded_strs(framed_msg)
                    header = framed_msg["head"]
                    self.io_loop.spawn_callback(
                        self.message_handler, stream, header, framed_msg["body"]
                    )
        except salt.ext.tornado.iostream.StreamClosedError:
            log.trace("req client disconnected %s", address)
            self.remove_client((stream, address))
        except Exception as e:  # pylint: disable=broad-except
            log.trace("other master-side exception: %s", e)
            self.remove_client((stream, address))
            stream.close()

    def remove_client(self, client):
        """
        Stop tracking ``client`` (a ``(stream, address)`` tuple).

        Missing entries are ignored (logged at trace level).
        """
        try:
            self.clients.remove(client)
        except ValueError:
            log.trace("Message server client was not in list to remove")

    def shutdown(self):
        """
        Shutdown the whole server

        Deprecated alias for :meth:`close`.
        """
        salt.utils.versions.warn_until(
            "Phosphorus",
            "Please stop calling {0}.{1}.shutdown() and instead call {0}.{1}.close()".format(
                __name__, self.__class__.__name__
            ),
        )
        self.close()

    def close(self):
        """
        Close the server

        Closes every tracked client stream and stops listening. Calls after
        the first one are no-ops.
        """
        if self._closing:
            return
        self._closing = True
        # Iterate over a snapshot: remove_client() mutates self.clients, and
        # removing from a list while iterating it skips every other entry,
        # which used to leave half of the client streams open.
        for item in list(self.clients):
            client, _ = item
            client.close()
            self.remove_client(item)
        try:
            # Call TCPServer.stop() explicitly instead of self.stop(): a
            # subclass may override stop() as a deprecated alias of close()
            # (LoadBalancerWorker does), and routing through it here emitted a
            # spurious deprecation warning and never reached TCPServer.stop().
            super().stop()
        except OSError as exc:
            # EBADF: the listening socket was already closed; nothing to do.
            if exc.errno != errno.EBADF:
                raise
880
881
if USE_LOAD_BALANCER:

    class LoadBalancerWorker(SaltMessageServer):
        """
        Worker side of the load-balanced request server.

        Rather than accepting connections itself, this server pulls
        ``(socket, address)`` pairs handed over by 'LoadBalancerServer' off a
        multiprocessing queue. The queue is shared by all workers, so any one
        connection is consumed by exactly one worker.
        """

        def __init__(self, socket_queue, message_handler, *args, **kwargs):
            super().__init__(message_handler, *args, **kwargs)
            self.socket_queue = socket_queue
            self._stop = threading.Event()
            self.thread = threading.Thread(target=self.socket_queue_thread)
            self.thread.start()

        def stop(self):
            """
            Deprecated alias for :meth:`close`.
            """
            deprecation_msg = (
                "Please stop calling {0}.{1}.stop() and instead call {0}.{1}.close()"
            ).format(__name__, self.__class__.__name__)
            salt.utils.versions.warn_until("Phosphorus", deprecation_msg)
            self.close()

        def close(self):
            """
            Signal the queue-polling thread to finish, wait for it, then
            close the underlying message server.
            """
            self._stop.set()
            self.thread.join()
            super().close()

        def socket_queue_thread(self):
            """
            Thread body: forward queued client sockets to the IO loop.

            The stop flag is only consulted when a 1-second ``get()`` on the
            queue times out, so shutdown may lag by roughly one second.
            """
            try:
                while True:
                    try:
                        queued = self.socket_queue.get(True, 1)
                    except queue.Empty:
                        if self._stop.is_set():
                            return
                        continue
                    client_socket, address = queued
                    # 'self.io_loop' and 'self._handle_connection' both come
                    # from the 'salt.ext.tornado.tcpserver.TCPServer' base.
                    self.io_loop.spawn_callback(
                        self._handle_connection, client_socket, address
                    )
            except (KeyboardInterrupt, SystemExit):
                pass
930
931
class TCPClientKeepAlive(salt.ext.tornado.tcpclient.TCPClient):
    """
    Override _create_stream() in TCPClient to enable keep alive support.
    """

    def __init__(self, opts, resolver=None):
        # opts is consulted by _set_tcp_keepalive() for every new socket
        self.opts = opts
        super().__init__(resolver=resolver)

    def _create_stream(
        self, max_buffer_size, af, addr, **kwargs
    ):  # pylint: disable=unused-argument,arguments-differ
        """
        Override _create_stream() in TCPClient.

        Create a plain TCP socket in address family ``af``, apply the
        configured TCP keepalive options to it, wrap it in an ``IOStream``
        and start connecting to ``addr``.

        Tornado 4.5 added the kwargs 'source_ip' and 'source_port'.
        Due to this, use **kwargs to swallow these and any future
        kwargs to maintain compatibility. NOTE: because they are swallowed,
        no source address is bound here.

        :param max_buffer_size: passed through to ``IOStream``
        :param af: address family chosen by the resolver for ``addr``
        :param addr: socket address to connect to
        :return: the connect future on Tornado < 5, otherwise a
            ``(stream, connect_future)`` tuple
        """
        # Always connect in plaintext; we'll convert to ssl if necessary
        # after one connection has completed.
        # Use the resolver-provided family: hard-coding AF_INET made
        # connections to IPv6 addresses fail.
        sock = socket.socket(af, socket.SOCK_STREAM)
        _set_tcp_keepalive(sock, self.opts)
        stream = salt.ext.tornado.iostream.IOStream(
            sock, max_buffer_size=max_buffer_size
        )
        if salt.ext.tornado.version_info < (5,):
            return stream.connect(addr)
        return stream, stream.connect(addr)
961
962
class SaltMessageClientPool(salt.transport.MessageClientPool):
    """
    Wrapper class of SaltMessageClient to avoid blocking waiting while writing data to socket.

    Outgoing data is routed to whichever pooled client currently has the
    shortest send queue.
    """

    def __init__(self, opts, args=None, kwargs=None):
        super().__init__(SaltMessageClient, opts, args=args, kwargs=kwargs)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701

    def close(self):
        """
        Close every pooled client and empty the pool.
        """
        # getattr(): __del__ can run on a partially-initialized instance if
        # the base-class __init__ raised before 'message_clients' was set.
        for message_client in getattr(self, "message_clients", []):
            message_client.close()
        self.message_clients = []

    @salt.ext.tornado.gen.coroutine
    def connect(self):
        """
        Start connecting every pooled client; resolves when all are done.
        """
        futures = [message_client.connect() for message_client in self.message_clients]
        yield futures
        raise salt.ext.tornado.gen.Return(None)

    def on_recv(self, *args, **kwargs):
        """
        Register the same receive callback on every pooled client.
        """
        for message_client in self.message_clients:
            message_client.on_recv(*args, **kwargs)

    def _least_busy_client(self):
        """
        Return the pooled client with the fewest queued outgoing messages.

        Ties go to the earliest client in the pool.

        :raises IndexError: if the pool is empty (e.g. after close()).
        """
        if not self.message_clients:
            # Preserve the IndexError the former sorted(...)[0] raised.
            raise IndexError("SaltMessageClientPool has no message clients")
        return min(self.message_clients, key=lambda client: len(client.send_queue))

    def send(self, *args, **kwargs):
        """
        Send via the least-busy client; returns that client's send future.
        """
        return self._least_busy_client().send(*args, **kwargs)

    def write_to_stream(self, *args, **kwargs):
        """
        Write raw data directly to the least-busy client's stream.
        """
        return self._least_busy_client()._stream.write(*args, **kwargs)
1007
1008
1009# TODO consolidate with IPCClient
1010# TODO: limit in-flight messages.
1011# TODO: singleton? Something to not re-create the tcp connection so much
class SaltMessageClient:
    """
    Low-level message sending client

    Keeps one TCP stream open to ``host:port`` (reconnecting on failure),
    tags each outgoing message with a ``"mid"`` header and resolves the
    future returned by :meth:`send` when a frame carrying that ``mid`` is
    read back. Frames whose ``mid`` is not being tracked go to the callback
    registered with :meth:`on_recv`, if any.
    """

    def __init__(
        self,
        opts,
        host,
        port,
        io_loop=None,
        resolver=None,
        connect_callback=None,
        disconnect_callback=None,
        source_ip=None,
        source_port=None,
    ):
        self.opts = opts
        self.host = host
        self.port = port
        self.source_ip = source_ip
        self.source_port = source_port
        self.connect_callback = connect_callback
        self.disconnect_callback = disconnect_callback

        self.io_loop = io_loop or salt.ext.tornado.ioloop.IOLoop.current()

        with salt.utils.asynchronous.current_ioloop(self.io_loop):
            self._tcp_client = TCPClientKeepAlive(opts, resolver=resolver)

        self._mid = 1
        self._max_messages = int((1 << 31) - 2)  # number of IDs before we wrap

        # TODO: max queue size
        self.send_queue = []  # queue of messages to be sent
        self.send_future_map = {}  # mapping of request_id -> Future
        self.send_timeout_map = {}  # request_id -> timeout_callback

        # Seconds to sleep between reconnect attempts in _connect(). Set
        # before connect() schedules _connect() so it always exists by the
        # time a connection attempt can fail.
        self.backoff = opts.get("tcp_reconnect_backoff", 1)

        self._read_until_future = None
        self._on_recv = None
        self._closing = False
        self._connecting_future = self.connect()
        self._stream_return_future = salt.ext.tornado.concurrent.Future()
        self.io_loop.spawn_callback(self._stream_return)

    def _stop_io_loop(self):
        if self.io_loop is not None:
            self.io_loop.stop()

    # TODO: timeout inflight sessions
    def close(self):
        """
        Close the stream and the TCP client and drop callback references.

        Only the first call does any work. If the stream is still open, the
        client's IO loop is made current while closing it and, once a read
        has been started, is run until ``_stream_return()`` has finished.
        """
        if self._closing:
            return
        self._closing = True
        if hasattr(self, "_stream") and not self._stream.closed():
            # If _stream_return() hasn't completed, it means the IO
            # Loop is stopped (such as when using
            # 'salt.utils.asynchronous.SyncWrapper'). Ensure that
            # _stream_return() completes by restarting the IO Loop.
            # This will prevent potential errors on shutdown.
            # Capture the current loop before the try block so the finally
            # clause can always restore it (previously an error here left
            # 'orig_loop' unbound and raised UnboundLocalError).
            orig_loop = salt.ext.tornado.ioloop.IOLoop.current()
            try:
                self.io_loop.make_current()
                self._stream.close()
                if self._read_until_future is not None:
                    # This will prevent this message from showing up:
                    # '[ERROR   ] Future exception was never retrieved:
                    # StreamClosedError'
                    # This happens because the logic is always waiting to read
                    # the next message and the associated read future is marked
                    # 'StreamClosedError' when the stream is closed.
                    if self._read_until_future.done():
                        self._read_until_future.exception()
                    if (
                        self.io_loop
                        != salt.ext.tornado.ioloop.IOLoop.current(instance=False)
                        or not self._stream_return_future.done()
                    ):
                        self.io_loop.add_future(
                            self._stream_return_future,
                            lambda future: self._stop_io_loop(),
                        )
                        self.io_loop.start()
            except Exception as e:  # pylint: disable=broad-except
                log.info("Exception caught in SaltMessageClient.close: %s", str(e))
            finally:
                orig_loop.make_current()
        self._tcp_client.close()
        self.io_loop = None
        self._read_until_future = None
        # Clear callback references to allow the object that they belong to
        # to be deleted.
        self.connect_callback = None
        self.disconnect_callback = None

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701

    def connect(self):
        """
        Ask for this client to reconnect to the origin

        Returns the pending connect future if one is in progress; otherwise
        creates a new future, schedules ``_connect()`` and, when a
        ``connect_callback`` is set, arranges for it to be called with the
        future's result.
        """
        if hasattr(self, "_connecting_future") and not self._connecting_future.done():
            future = self._connecting_future
        else:
            future = salt.ext.tornado.concurrent.Future()
            self._connecting_future = future
            self.io_loop.add_callback(self._connect)

            # Add the callback only when a new future is created
            if self.connect_callback is not None:

                def handle_future(future):
                    response = future.result()
                    self.io_loop.add_callback(self.connect_callback, response)

                future.add_done_callback(handle_future)

        return future

    @salt.ext.tornado.gen.coroutine
    def _connect(self):
        """
        Try to connect for the rest of time!

        Retries every ``self.backoff`` seconds until a connection succeeds
        (then resolves ``_connecting_future`` with ``True``) or the client
        starts closing. The future is never failed.
        """
        while True:
            if self._closing:
                break
            try:
                kwargs = {}
                if self.source_ip or self.source_port:
                    if salt.ext.tornado.version_info >= (4, 5):
                        ### source_ip and source_port are supported only in Tornado >= 4.5
                        # See http://www.tornadoweb.org/en/stable/releases/v4.5.0.html
                        # Otherwise will just ignore these args
                        kwargs = {
                            "source_ip": self.source_ip,
                            "source_port": self.source_port,
                        }
                    else:
                        log.warning(
                            "If you need a certain source IP/port, consider upgrading"
                            " Tornado >= 4.5"
                        )
                with salt.utils.asynchronous.current_ioloop(self.io_loop):
                    self._stream = yield self._tcp_client.connect(
                        self.host, self.port, ssl_options=self.opts.get("ssl"), **kwargs
                    )
                self._connecting_future.set_result(True)
                break
            except Exception as exc:  # pylint: disable=broad-except
                log.warning(
                    "TCP Message Client encountered an exception while connecting to"
                    " %s:%s: %r, will reconnect in %d seconds",
                    self.host,
                    self.port,
                    exc,
                    self.backoff,
                )
                yield salt.ext.tornado.gen.sleep(self.backoff)
                # self._connecting_future.set_exception(exc)

    @salt.ext.tornado.gen.coroutine
    def _stream_return(self):
        """
        Read and dispatch incoming frames for the lifetime of the client.

        Responses whose header ``mid`` matches a pending request resolve that
        request's future; anything else goes to the ``on_recv`` callback. On
        a closed stream or a read/parse error all pending futures are failed
        and a reconnect is started. ``_stream_return_future`` is resolved
        when this coroutine exits.
        """
        try:
            while not self._closing and (
                not self._connecting_future.done()
                or self._connecting_future.result() is not True
            ):
                yield self._connecting_future
            unpacker = salt.utils.msgpack.Unpacker()
            while not self._closing:
                read_stream = None
                try:
                    read_stream = self._stream
                    self._read_until_future = read_stream.read_bytes(
                        4096, partial=True
                    )
                    wire_bytes = yield self._read_until_future
                    unpacker.feed(wire_bytes)
                    for framed_msg in unpacker:
                        framed_msg = salt.transport.frame.decode_embedded_strs(
                            framed_msg
                        )
                        header = framed_msg["head"]
                        body = framed_msg["body"]
                        message_id = header.get("mid")

                        if message_id in self.send_future_map:
                            self.send_future_map.pop(message_id).set_result(body)
                            self.remove_message_timeout(message_id)
                        else:
                            if self._on_recv is not None:
                                self.io_loop.spawn_callback(self._on_recv, header, body)
                            else:
                                log.error(
                                    "Got response for message_id %s that we are not"
                                    " tracking",
                                    message_id,
                                )
                except salt.ext.tornado.iostream.StreamClosedError as e:
                    log.debug(
                        "tcp stream to %s:%s closed, unable to recv",
                        self.host,
                        self.port,
                    )
                    for future in self.send_future_map.values():
                        future.set_exception(e)
                    self.send_future_map = {}
                    if self._closing:
                        return
                    if self.disconnect_callback:
                        self.disconnect_callback()
                    # Bytes buffered from the dead stream may hold a partial
                    # frame; feeding the new stream into the same unpacker
                    # would corrupt the first messages after reconnecting.
                    unpacker = salt.utils.msgpack.Unpacker()
                    # if the last connect finished, then we need to make a new one
                    if self._connecting_future.done():
                        self._connecting_future = self.connect()
                    yield self._connecting_future
                except TypeError:
                    # This is an invalid transport
                    if "detect_mode" in self.opts:
                        log.info(
                            "There was an error trying to use TCP transport; "
                            "attempting to fallback to another transport"
                        )
                    else:
                        raise SaltClientError
                except Exception as e:  # pylint: disable=broad-except
                    log.error("Exception parsing response", exc_info=True)
                    for future in self.send_future_map.values():
                        future.set_exception(e)
                    self.send_future_map = {}
                    if self._closing:
                        return
                    if self.disconnect_callback:
                        self.disconnect_callback()
                    # The stream we were reading is now out of sync with the
                    # unpacker. Close it so its socket is not leaked when
                    # connect() replaces self._stream, and start the new
                    # stream with a clean unpacker.
                    if read_stream is not None and not read_stream.closed():
                        read_stream.close()
                    unpacker = salt.utils.msgpack.Unpacker()
                    # if the last connect finished, then we need to make a new one
                    if self._connecting_future.done():
                        self._connecting_future = self.connect()
                    yield self._connecting_future
        finally:
            self._stream_return_future.set_result(True)

    @salt.ext.tornado.gen.coroutine
    def _stream_send(self):
        """
        Write queued messages to the stream in FIFO order.

        A message whose write hits a closed stream has its future failed and
        is dropped from the queue, and a reconnect is started before the
        remaining messages are written.
        """
        while (
            not self._connecting_future.done()
            or self._connecting_future.result() is not True
        ):
            yield self._connecting_future
        while len(self.send_queue) > 0:
            message_id, item = self.send_queue[0]
            try:
                yield self._stream.write(item)
                del self.send_queue[0]
            # if the connection is dead, lets fail this send, and make sure we
            # attempt to reconnect
            except salt.ext.tornado.iostream.StreamClosedError as e:
                if message_id in self.send_future_map:
                    self.send_future_map.pop(message_id).set_exception(e)
                self.remove_message_timeout(message_id)
                del self.send_queue[0]
                if self._closing:
                    return
                if self.disconnect_callback:
                    self.disconnect_callback()
                # if the last connect finished, then we need to make a new one
                if self._connecting_future.done():
                    self._connecting_future = self.connect()
                yield self._connecting_future

    def _message_id(self):
        """
        Return a message id not currently awaiting a response.

        Ids run from 1 to ``_max_messages`` and wrap around once; if a full
        pass finds no free id an exception is raised.
        """
        wrap = False
        while self._mid in self.send_future_map:
            if self._mid >= self._max_messages:
                if wrap:
                    # this shouldn't ever happen, but just in case
                    raise Exception("Unable to find available messageid")
                self._mid = 1
                wrap = True
            else:
                self._mid += 1

        return self._mid

    # TODO: return a message object which takes care of multiplexing?
    def on_recv(self, callback):
        """
        Register a callback for received messages (that we didn't initiate)

        The callback receives only the message body. Passing ``None``
        unregisters it.
        """
        if callback is None:
            self._on_recv = callback
        else:

            def wrap_recv(header, body):
                callback(body)

            self._on_recv = wrap_recv

    def remove_message_timeout(self, message_id):
        """
        Cancel and forget the timeout registered for ``message_id``, if any.
        """
        if message_id not in self.send_timeout_map:
            return
        timeout = self.send_timeout_map.pop(message_id)
        self.io_loop.remove_timeout(timeout)

    def timeout_message(self, message_id, msg):
        """
        Timeout handler for ``message_id``.

        Re-sends ``msg`` with the same future while ``future.attempts`` is
        below ``future.tries``; otherwise fails the future with
        ``SaltReqTimeoutError``.
        """
        if message_id in self.send_timeout_map:
            del self.send_timeout_map[message_id]
        if message_id in self.send_future_map:
            future = self.send_future_map.pop(message_id)
            # In a race condition the message might have been sent by the time
            # we're timing it out. Make sure the future is not None
            if future is not None:
                if future.attempts < future.tries:
                    future.attempts += 1

                    log.debug(
                        "SaltReqTimeoutError, retrying. (%s/%s)",
                        future.attempts,
                        future.tries,
                    )
                    self.send(
                        msg,
                        timeout=future.timeout,
                        tries=future.tries,
                        future=future,
                    )

                else:
                    future.set_exception(SaltReqTimeoutError("Message timed out"))

    def send(self, msg, timeout=None, callback=None, raw=False, future=None, tries=3):
        """
        Send given message, and return a future

        :param msg: payload to frame and send
        :param timeout: seconds before the message times out (forced to 1
            when ``opts["detect_mode"]`` is True); ``None`` means no timeout
        :param callback: optional callable invoked with the response
        :param raw: unused here
        :param future: existing future to reuse (used for retries)
        :param tries: number of retries allowed for a new future
        """
        message_id = self._message_id()
        header = {"mid": message_id}

        if future is None:
            future = salt.ext.tornado.concurrent.Future()
            future.tries = tries
            future.attempts = 0
            future.timeout = timeout

        if callback is not None:

            def handle_future(future):
                response = future.result()
                self.io_loop.add_callback(callback, response)

            future.add_done_callback(handle_future)
        # Add this future to the mapping
        self.send_future_map[message_id] = future

        if self.opts.get("detect_mode") is True:
            timeout = 1

        if timeout is not None:
            send_timeout = self.io_loop.call_later(
                timeout, self.timeout_message, message_id, msg
            )
            self.send_timeout_map[message_id] = send_timeout

        # if we don't have a send queue, we need to spawn the callback to do the sending
        if len(self.send_queue) == 0:
            self.io_loop.spawn_callback(self._stream_send)
        self.send_queue.append(
            (message_id, salt.transport.frame.frame_msg(msg, header=header))
        )
        return future
1384
1385
class Subscriber:
    """
    One subscriber connection accepted by the TCP publisher server.

    Holds the subscriber's stream and address, the minion id it has
    authenticated as (``id_``, ``None`` until known) and the most recent
    read future issued on its stream.
    """

    def __init__(self, stream, address):
        self.stream = stream
        self.address = address
        self._closing = False
        self._read_until_future = None
        self.id_ = None

    def close(self):
        """
        Close the stream once; later calls do nothing.
        """
        if self._closing:
            return
        self._closing = True
        if self.stream.closed():
            return
        self.stream.close()
        pending_read = self._read_until_future
        if pending_read is None or not pending_read.done():
            return
        # Retrieve the read future's exception so that the
        # '[ERROR   ] Future exception was never retrieved: StreamClosedError'
        # message is not logged: the read loop always has a read outstanding,
        # and that future is marked 'StreamClosedError' when the stream is
        # closed.
        pending_read.exception()

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701
1418
1419
class PubServer(salt.ext.tornado.tcpserver.TCPServer):
    """
    TCP publisher

    Accepts subscriber connections, records which minion id each subscriber
    has authenticated as (``self.present``: id -> set of Subscriber), and
    writes published payloads to the targeted subscribers or to all of them.
    """

    def __init__(self, opts, io_loop=None, pack_publish=lambda _: _):
        super().__init__(ssl_options=opts.get("ssl"))
        self.io_loop = io_loop
        self.opts = opts
        self._closing = False
        self.clients = set()
        self.aes_funcs = salt.master.AESFuncs(self.opts)
        self.present = {}
        self.event = None
        self.presence_events = False
        if self.opts.get("presence_events", False):
            tcp_only = True
            for transport, _ in iter_transport_opts(self.opts):
                if transport != "tcp":
                    tcp_only = False
            if tcp_only:
                # Only when the transport is TCP only, the presence events will
                # be handled here. Otherwise, it will be handled in the
                # 'Maintenance' process.
                self.presence_events = True

        if self.presence_events:
            self.event = salt.utils.event.get_event(
                "master", opts=self.opts, listen=False
            )
        self._pack_publish = pack_publish

    def pack_publish(self, load):
        """
        Apply the ``pack_publish`` callable given at construction to ``load``.
        """
        return self._pack_publish(load)

    def close(self):
        """
        Release the event bus and AES helpers. Only the first call does work.
        """
        if self._closing:
            return
        self._closing = True
        if self.event is not None:
            self.event.destroy()
            self.event = None
        if self.aes_funcs is not None:
            self.aes_funcs.destroy()
            self.aes_funcs = None

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701

    def _add_client_present(self, client):
        """
        Record ``client`` under its minion id.

        When this is the first connection for that id and presence events
        are enabled, fire the 'change' and 'present' presence events.
        """
        id_ = client.id_
        if id_ in self.present:
            clients = self.present[id_]
            clients.add(client)
        else:
            self.present[id_] = {client}
            if self.presence_events:
                data = {"new": [id_], "lost": []}
                self.event.fire_event(
                    data, salt.utils.event.tagify("change", "presence")
                )
                data = {"present": list(self.present.keys())}
                self.event.fire_event(
                    data, salt.utils.event.tagify("present", "presence")
                )

    def _remove_client_present(self, client):
        """
        Forget ``client`` under its minion id.

        When it was the last connection for that id and presence events are
        enabled, fire the 'change' and 'present' presence events.
        """
        id_ = client.id_
        if id_ is None or id_ not in self.present:
            # This is possible if _remove_client_present() is invoked
            # before the minion's id is validated.
            return

        clients = self.present[id_]
        if client not in clients:
            # Since _remove_client_present() is potentially called from
            # _stream_read() and/or publish_payload(), it is possible for
            # it to be called twice, in which case we will get here.
            # This is not an abnormal case, so no logging is required.
            return

        clients.remove(client)
        if len(clients) == 0:
            del self.present[id_]
            if self.presence_events:
                data = {"new": [], "lost": [id_]}
                self.event.fire_event(
                    data, salt.utils.event.tagify("change", "presence")
                )
                data = {"present": list(self.present.keys())}
                self.event.fire_event(
                    data, salt.utils.event.tagify("present", "presence")
                )

    @salt.ext.tornado.gen.coroutine
    def _stream_read(self, client):
        """
        Read frames from a subscriber to learn (and verify) its minion id.

        Only 'aes'-encoded bodies are considered; once the embedded id/token
        pair verifies, the client is recorded under that id. The client is
        closed and forgotten when its stream closes.
        """
        unpacker = salt.utils.msgpack.Unpacker()
        while not self._closing:
            try:
                client._read_until_future = client.stream.read_bytes(4096, partial=True)
                wire_bytes = yield client._read_until_future
                unpacker.feed(wire_bytes)
                for framed_msg in unpacker:
                    framed_msg = salt.transport.frame.decode_embedded_strs(framed_msg)
                    body = framed_msg["body"]
                    if body["enc"] != "aes":
                        # We only accept 'aes' encoded messages for 'id'
                        continue
                    crypticle = salt.crypt.Crypticle(
                        self.opts, salt.master.SMaster.secrets["aes"]["secret"].value
                    )
                    load = crypticle.loads(body["load"])
                    load = salt.transport.frame.decode_embedded_strs(load)
                    if not self.aes_funcs.verify_minion(load["id"], load["tok"]):
                        continue
                    if client.id_ is not None and client.id_ != load["id"]:
                        # The subscriber re-identified as a different minion.
                        # Drop it from the old id's presence set first;
                        # otherwise it lingers there after disconnecting,
                        # because later removals only look at the new id.
                        self._remove_client_present(client)
                    client.id_ = load["id"]
                    self._add_client_present(client)
            except salt.ext.tornado.iostream.StreamClosedError:
                log.debug("tcp stream to %s closed, unable to recv", client.address)
                client.close()
                self._remove_client_present(client)
                self.clients.discard(client)
                break
            except Exception as e:  # pylint: disable=broad-except
                log.error(
                    "Exception parsing response from %s", client.address, exc_info=True
                )
                continue

    def handle_stream(self, stream, address):
        """
        Track a newly accepted subscriber and start reading from it.
        """
        log.trace("Subscriber at %s connected", address)
        client = Subscriber(stream, address)
        self.clients.add(client)
        self.io_loop.spawn_callback(self._stream_read, client)

    # TODO: ACK the publish through IPC
    @salt.ext.tornado.gen.coroutine
    def publish_payload(self, package, _):
        """
        Frame ``package["payload"]`` and write it to subscribers.

        If the packed package has a ``topic_lst``, only subscribers recorded
        under those minion ids receive it; otherwise every subscriber does.
        Subscribers whose stream is found closed are closed and forgotten.
        """
        log.debug("TCP PubServer sending payload: %s", package)
        package = self.pack_publish(package)
        payload = salt.transport.frame.frame_msg(package["payload"])

        to_remove = []
        if "topic_lst" in package:
            topic_lst = package["topic_lst"]
            for topic in topic_lst:
                if topic in self.present:
                    # This will rarely be a list of more than 1 item. It will
                    # be more than 1 item if the minion disconnects from the
                    # master in an unclean manner (eg cable yank), then
                    # restarts and the master is yet to detect the disconnect
                    # via TCP keep-alive.
                    for client in self.present[topic]:
                        try:
                            # Write the packed str
                            f = client.stream.write(payload)
                            self.io_loop.add_future(f, lambda f: True)
                        except salt.ext.tornado.iostream.StreamClosedError:
                            to_remove.append(client)
                else:
                    log.debug("Publish target %s not connected", topic)
        else:
            for client in self.clients:
                try:
                    # Write the packed str
                    f = client.stream.write(payload)
                    self.io_loop.add_future(f, lambda f: True)
                except salt.ext.tornado.iostream.StreamClosedError:
                    to_remove.append(client)
        for client in to_remove:
            log.debug(
                "Subscriber at %s has disconnected from publisher", client.address
            )
            client.close()
            self._remove_client_present(client)
            self.clients.discard(client)
        log.trace("TCP PubServer finished publishing payload")
1602
1603
class TCPPubServerChannel(salt.transport.server.PubServerChannel):
    """
    Master-side publish channel for the TCP transport.

    ``pre_fork`` registers ``_publish_daemon`` with the process manager. The
    daemon:

    - accepts subscriber connections on ``interface``/``publish_port``; and
    - serves a local IPC "pull" socket whose messages are handed to
      ``PubServer.publish_payload``.

    ``publish`` sends loads into that IPC socket.
    """

    # TODO: opts!
    # Based on default used in salt.ext.tornado.netutil.bind_sockets()
    backlog = 128

    def __init__(self, opts):
        """
        :param opts: the master configuration dictionary.
        """
        self.opts = opts
        self.ckminions = salt.utils.minions.CkMinions(opts)
        # Filled in by _publish_daemon unless a caller assigns one first.
        self.io_loop = None

    def __setstate__(self, state):
        # Restore the shared master secrets before re-initialising.
        # pack_publish reads SMaster.secrets, so the unpickled copy needs them.
        salt.master.SMaster.secrets = state["secrets"]
        self.__init__(state["opts"])

    def __getstate__(self):
        # Only opts and the master secrets are carried across pickling.
        return {"opts": self.opts, "secrets": salt.master.SMaster.secrets}

    def _get_pull_uri(self):
        """
        Return the address of the local IPC pull socket.

        This socket carries loads from ``publish`` to the publish daemon. Both
        sides call this helper so they always agree on the address.

        :returns: an ``int`` TCP port when ``ipc_mode`` is ``"tcp"``,
            otherwise the path of ``publish_pull.ipc`` inside ``sock_dir``.
        """
        if self.opts.get("ipc_mode", "") == "tcp":
            return int(self.opts.get("tcp_master_publish_pull", 4514))
        return os.path.join(self.opts["sock_dir"], "publish_pull.ipc")

    def _publish_daemon(self, **kwargs):
        """
        Bind to the interface specified in the configuration file and run the
        publisher until the IO loop stops.

        :param kwargs: may contain ``log_queue`` and ``log_queue_level``,
            which configure multiprocessing logging in this process.
        """
        salt.utils.process.appendproctitle(self.__class__.__name__)

        log_queue = kwargs.get("log_queue")
        if log_queue is not None:
            salt.log.setup.set_multiprocessing_logging_queue(log_queue)
        log_queue_level = kwargs.get("log_queue_level")
        if log_queue_level is not None:
            salt.log.setup.set_multiprocessing_logging_level(log_queue_level)
        salt.log.setup.setup_multiprocessing_logging(log_queue)

        # Check if io_loop was set outside
        if self.io_loop is None:
            self.io_loop = salt.ext.tornado.ioloop.IOLoop.current()

        # Spin up the publisher
        pub_server = PubServer(
            self.opts, io_loop=self.io_loop, pack_publish=self.pack_publish
        )
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            _set_tcp_keepalive(sock, self.opts)
            sock.setblocking(0)
            sock.bind((self.opts["interface"], int(self.opts["publish_port"])))
            sock.listen(self.backlog)
        except Exception:  # pylint: disable=broad-except
            # Nothing owns the socket yet; don't leak it (e.g. on EADDRINUSE).
            sock.close()
            raise
        # pub_server will take ownership of the socket
        pub_server.add_socket(sock)

        # Set up Salt IPC server
        pull_uri = self._get_pull_uri()
        pull_sock = salt.transport.ipc.IPCMessageServer(
            pull_uri,
            io_loop=self.io_loop,
            payload_handler=pub_server.publish_payload,
        )

        # Securely create socket: umask 0o177 leaves only owner read/write
        # on anything created while starting the IPC server.
        log.info("Starting the Salt Puller on %s", pull_uri)
        with salt.utils.files.set_umask(0o177):
            pull_sock.start()

        # run forever
        try:
            self.io_loop.start()
        except (KeyboardInterrupt, SystemExit):
            salt.log.setup.shutdown_multiprocessing_logging()
        finally:
            pull_sock.close()

    def pre_fork(self, process_manager, kwargs=None):
        """
        Do anything necessary pre-fork. Since this is on the master side this will
        primarily be used to create IPC channels and create our daemon process to
        do the actual publishing

        :param process_manager: manager that will run ``_publish_daemon``.
        :param kwargs: keyword arguments forwarded to ``_publish_daemon``.
        """
        process_manager.add_process(self._publish_daemon, kwargs=kwargs)

    def pack_publish(self, load):
        """
        Encrypt ``load`` for publication and attach targeting hints.

        Note: ``load`` is modified in place (a ``serial`` key is added).

        :param load: the publish load; must contain ``tgt`` and ``tgt_type``.
        :returns: a dict with these keys:

            - ``payload``: the serialized message, encrypted with the master
              AES secret and signed when ``sign_pub_messages`` is set.
            - ``topic_lst``: present only for ``list`` targets when
              ``order_masters`` is off; the minion ids to deliver to.
        """
        payload = {"enc": "aes"}
        load["serial"] = salt.master.SMaster.get_serial()
        crypticle = salt.crypt.Crypticle(
            self.opts, salt.master.SMaster.secrets["aes"]["secret"].value
        )
        payload["load"] = crypticle.dumps(load)
        if self.opts["sign_pub_messages"]:
            master_pem_path = os.path.join(self.opts["pki_dir"], "master.pem")
            log.debug("Signing data packet")
            payload["sig"] = salt.crypt.sign_message(master_pem_path, payload["load"])
        int_payload = {"payload": salt.payload.dumps(payload)}

        # add some targeting stuff for lists only (for now)
        if load["tgt_type"] == "list" and not self.opts.get("order_masters", False):
            if isinstance(load["tgt"], str):
                # Fetch a list of minions that match
                _res = self.ckminions.check_minions(
                    load["tgt"], tgt_type=load["tgt_type"]
                )
                match_ids = _res["minions"]

                log.debug("Publish Side Match: %s", match_ids)
                # Send the list of minions through so the publisher can target them
                int_payload["topic_lst"] = match_ids
            else:
                int_payload["topic_lst"] = load["tgt"]
        return int_payload

    def publish(self, load):
        """
        Publish "load" to minions

        The load is sent over the local IPC pull socket to the publish daemon.
        There, ``PubServer.publish_payload`` packs it and writes it to the
        connected minions.

        :param load: the publish load.
        """
        # Send it over IPC!
        pub_sock = salt.utils.asynchronous.SyncWrapper(
            salt.transport.ipc.IPCMessageClient,
            (self._get_pull_uri(),),
            loop_kwarg="io_loop",
        )
        try:
            pub_sock.connect()
            pub_sock.send(load)
        finally:
            # Release the IPC connection (and whatever loop resources the
            # wrapper created) instead of leaking them on every publish.
            pub_sock.close()
1730