"""
TCP transport classes

Wire protocol: "len(payload) msgpack({'head': SOMEHEADER, 'body': SOMEBODY})"

"""
import errno
import logging
import os
import queue
import socket
import threading
import time
import traceback
import urllib.parse
import uuid

import salt.crypt
import salt.exceptions
import salt.ext.tornado
import salt.ext.tornado.concurrent
import salt.ext.tornado.gen
import salt.ext.tornado.ioloop
import salt.ext.tornado.iostream
import salt.ext.tornado.netutil
import salt.ext.tornado.tcpclient
import salt.ext.tornado.tcpserver
import salt.payload
import salt.transport.client
import salt.transport.frame
import salt.transport.ipc
import salt.transport.mixins.auth
import salt.transport.server
import salt.utils.asynchronous
import salt.utils.event
import salt.utils.files
import salt.utils.msgpack
import salt.utils.platform
import salt.utils.process
import salt.utils.verify
import salt.utils.versions
from salt.exceptions import SaltClientError, SaltReqTimeoutError
from salt.transport import iter_transport_opts

try:
    from M2Crypto import RSA

    HAS_M2 = True
except ImportError:
    HAS_M2 = False
    try:
        from Cryptodome.Cipher import PKCS1_OAEP
    except ImportError:
        from Crypto.Cipher import PKCS1_OAEP  # nosec

# On Windows the request server accepts connections in a dedicated
# LoadBalancerServer process and hands them to workers through a queue.
if salt.utils.platform.is_windows():
    USE_LOAD_BALANCER = True
else:
    USE_LOAD_BALANCER = False

if USE_LOAD_BALANCER:
    import threading
    import multiprocessing
    import salt.ext.tornado.util
    from salt.utils.process import SignalHandlingProcess

log = logging.getLogger(__name__)


def _set_tcp_keepalive(sock, opts):
    """
    Ensure that TCP keepalives are set for the socket.

    When ``opts["tcp_keepalive"]`` is truthy, SO_KEEPALIVE is enabled. Where the
    platform exposes them, ``tcp_keepalive_idle``, ``tcp_keepalive_cnt`` and
    ``tcp_keepalive_intvl`` are also applied; values <= 0 (the default) leave
    the OS default. Where ``socket.SIO_KEEPALIVE_VALS`` exists (Windows), the
    idle/interval pair is applied through ``sock.ioctl`` instead. When
    ``tcp_keepalive`` is falsy, SO_KEEPALIVE is explicitly disabled. Nothing is
    done on platforms that lack ``socket.SO_KEEPALIVE``.

    :param sock: the socket to configure
    :param dict opts: Salt options
    """
    if hasattr(socket, "SO_KEEPALIVE"):
        if opts.get("tcp_keepalive", False):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            if hasattr(socket, "SOL_TCP"):
                if hasattr(socket, "TCP_KEEPIDLE"):
                    tcp_keepalive_idle = opts.get("tcp_keepalive_idle", -1)
                    if tcp_keepalive_idle > 0:
                        sock.setsockopt(
                            socket.SOL_TCP, socket.TCP_KEEPIDLE, int(tcp_keepalive_idle)
                        )
                if hasattr(socket, "TCP_KEEPCNT"):
                    tcp_keepalive_cnt = opts.get("tcp_keepalive_cnt", -1)
                    if tcp_keepalive_cnt > 0:
                        sock.setsockopt(
                            socket.SOL_TCP, socket.TCP_KEEPCNT, int(tcp_keepalive_cnt)
                        )
                if hasattr(socket, "TCP_KEEPINTVL"):
                    tcp_keepalive_intvl = opts.get("tcp_keepalive_intvl", -1)
                    if tcp_keepalive_intvl > 0:
                        sock.setsockopt(
                            socket.SOL_TCP,
                            socket.TCP_KEEPINTVL,
                            int(tcp_keepalive_intvl),
                        )
            if hasattr(socket, "SIO_KEEPALIVE_VALS"):
                # Windows doesn't support TCP_KEEPIDLE, TCP_KEEPCNT, nor
                # TCP_KEEPINTVL. Instead, it has its own proprietary
                # SIO_KEEPALIVE_VALS.
                tcp_keepalive_idle = opts.get("tcp_keepalive_idle", -1)
                tcp_keepalive_intvl = opts.get("tcp_keepalive_intvl", -1)
                # Windows doesn't support changing something equivalent to
                # TCP_KEEPCNT.
                if tcp_keepalive_idle > 0 or tcp_keepalive_intvl > 0:
                    # Windows defaults may be found by using the link below.
                    # Search for 'KeepAliveTime' and 'KeepAliveInterval'.
                    # https://technet.microsoft.com/en-us/library/bb726981.aspx#EDAA
                    # If one value is set and the other isn't, we still need
                    # to send both values to SIO_KEEPALIVE_VALS and they both
                    # need to be valid. So in that case, use the Windows
                    # default.
                    if tcp_keepalive_idle <= 0:
                        tcp_keepalive_idle = 7200
                    if tcp_keepalive_intvl <= 0:
                        tcp_keepalive_intvl = 1
                    # The values expected are in milliseconds, so multiply by
                    # 1000.
                    sock.ioctl(
                        socket.SIO_KEEPALIVE_VALS,
                        (
                            1,
                            int(tcp_keepalive_idle * 1000),
                            int(tcp_keepalive_intvl * 1000),
                        ),
                    )
        else:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 0)


if USE_LOAD_BALANCER:

    class LoadBalancerServer(SignalHandlingProcess):
        """
        Raw TCP server which runs in its own process and will listen
        for incoming connections. Each incoming connection will be
        sent via multiprocessing queue to the workers.
        Since the queue is shared amongst workers, only one worker will
        handle a given connection.
        """

        # TODO: opts!
        # Based on default used in salt.ext.tornado.netutil.bind_sockets()
        backlog = 128

        def __init__(self, opts, socket_queue, **kwargs):
            """
            :param dict opts: Salt options (``interface``, ``ret_port`` and
                the ``tcp_keepalive*`` keys are read)
            :param socket_queue: queue that receives ``(connection, address)``
                tuples for the workers
            """
            super().__init__(**kwargs)
            self.opts = opts
            self.socket_queue = socket_queue
            self._socket = None

        def close(self):
            """
            Shut down and close the listening socket, if one was created.
            """
            if self._socket is not None:
                self._socket.shutdown(socket.SHUT_RDWR)
                self._socket.close()
                self._socket = None

        # pylint: disable=W1701
        def __del__(self):
            self.close()

        # pylint: enable=W1701

        def run(self):
            """
            Start the load balancer

            Binds a blocking listening socket on ``interface:ret_port`` and
            forever forwards each accepted connection onto ``socket_queue``.
            """
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            _set_tcp_keepalive(self._socket, self.opts)
            self._socket.setblocking(1)
            self._socket.bind((self.opts["interface"], int(self.opts["ret_port"])))
            self._socket.listen(self.backlog)

            while True:
                try:
                    # Wait for a connection to occur since the socket is
                    # blocking.
                    connection, address = self._socket.accept()
                    # Wait for a free slot to be available to put
                    # the connection into.
                    # Sockets are picklable on Windows in Python 3.
                    self.socket_queue.put((connection, address), True, None)
                except OSError as e:
                    # ECONNABORTED indicates that there was a connection
                    # but it was closed while still in the accept queue.
                    # (observed on FreeBSD).
                    if (
                        salt.ext.tornado.util.errno_from_exception(e)
                        == errno.ECONNABORTED
                    ):
                        continue
                    raise


# TODO: move serial down into message library
class AsyncTCPReqChannel(salt.transport.client.ReqChannel):
    """
    Encapsulate sending routines to tcp.

    Requests go to ``opts["master_uri"]`` through a
    :class:`SaltMessageClientPool`. Unless ``crypt`` is ``"clear"``, loads are
    encrypted with the session key managed by :class:`salt.crypt.AsyncAuth`.

    Note: this class returns a singleton
    (NOTE(review): no singleton logic is visible in this class itself;
    presumably it is provided by the channel factory — confirm.)
    """

    async_methods = [
        "crypted_transfer_decode_dictentry",
        "_crypted_transfer",
        "_uncrypted_transfer",
        "send",
    ]
    close_methods = [
        "close",
    ]

    def __init__(self, opts, **kwargs):
        """
        :param dict opts: Salt options; copied so ``master_uri`` can be overridden
        :keyword str master_uri: overrides ``opts["master_uri"]``
        :keyword str crypt: ``"aes"`` (default) or ``"clear"``
        :keyword io_loop: IOLoop to use; defaults to ``IOLoop.current()``
        :keyword resolver: optional resolver handed to the message clients
        """
        self.opts = dict(opts)
        if "master_uri" in kwargs:
            self.opts["master_uri"] = kwargs["master_uri"]

        # crypt defaults to 'aes'
        self.crypt = kwargs.get("crypt", "aes")

        self.io_loop = kwargs.get("io_loop") or salt.ext.tornado.ioloop.IOLoop.current()

        if self.crypt != "clear":
            self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop)

        resolver = kwargs.get("resolver")

        # Only the "host:port" network location of master_uri is used.
        parse = urllib.parse.urlparse(self.opts["master_uri"])
        master_host, master_port = parse.netloc.rsplit(":", 1)
        self.master_addr = (master_host, int(master_port))
        self._closing = False
        self.message_client = SaltMessageClientPool(
            self.opts,
            args=(
                self.opts,
                master_host,
                int(master_port),
            ),
            kwargs={
                "io_loop": self.io_loop,
                "resolver": resolver,
                "source_ip": self.opts.get("source_ip"),
                "source_port": self.opts.get("source_ret_port"),
            },
        )

    def close(self):
        """
        Close the underlying message clients. Safe to call more than once.
        """
        if self._closing:
            return
        log.debug("Closing %s instance", self.__class__.__name__)
        self._closing = True
        self.message_client.close()

    # pylint: disable=W1701
    def __del__(self):
        try:
            self.close()
        except OSError as exc:
            if exc.errno != errno.EBADF:
                # If its not a bad file descriptor error, raise
                raise

    # pylint: enable=W1701

    def _package_load(self, load):
        """
        Wrap ``load`` in the request envelope (``enc``, ``load``, ``version``).
        """
        return {
            "enc": self.crypt,
            "load": load,
            "version": 2,
        }

    @salt.ext.tornado.gen.coroutine
    def crypted_transfer_decode_dictentry(
        self, load, dictkey=None, tries=3, timeout=60
    ):
        """
        Send an encrypted request and decrypt the ``dictkey`` entry of the reply.

        The reply's ``key`` is decrypted with the minion's private key (RSA
        OAEP). That key then decrypts ``ret[dictkey]``. The resulting signed
        message is verified against ``<pki_dir>/minion_master.pub``, and the
        embedded key and nonce must match what was used and sent.

        :returns: ``data["pillar"]`` from the verified payload
        :raises salt.crypt.AuthenticationError: on signature, key or nonce
            mismatch
        """
        nonce = uuid.uuid4().hex
        load["nonce"] = nonce
        if not self.auth.authenticated:
            yield self.auth.authenticate()
        ret = yield self.message_client.send(
            self._package_load(self.auth.crypticle.dumps(load)),
            timeout=timeout,
            tries=tries,
        )
        key = self.auth.get_keys()
        if HAS_M2:
            aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
        else:
            cipher = PKCS1_OAEP.new(key)
            aes = cipher.decrypt(ret["key"])

        # Decrypt using the public key.
        pcrypt = salt.crypt.Crypticle(self.opts, aes)
        signed_msg = pcrypt.loads(ret[dictkey])

        # Validate the master's signature.
        master_pubkey_path = os.path.join(self.opts["pki_dir"], "minion_master.pub")
        if not salt.crypt.verify_signature(
            master_pubkey_path, signed_msg["data"], signed_msg["sig"]
        ):
            raise salt.crypt.AuthenticationError(
                "Pillar payload signature failed to validate."
            )

        # Make sure the signed key matches the key we used to decrypt the data.
        data = salt.payload.loads(signed_msg["data"])
        if data["key"] != ret["key"]:
            raise salt.crypt.AuthenticationError("Key verification failed.")

        # Validate the nonce.
        if data["nonce"] != nonce:
            raise salt.crypt.AuthenticationError("Pillar nonce verification failed.")
        raise salt.ext.tornado.gen.Return(data["pillar"])

    @salt.ext.tornado.gen.coroutine
    def _crypted_transfer(self, load, tries=3, timeout=60):
        """
        In case of authentication errors, try to renegotiate authentication
        and retry the method.
        Indeed, we can fail too early in case of a master restart during a
        minion state execution call
        """
        nonce = uuid.uuid4().hex
        if load and isinstance(load, dict):
            load["nonce"] = nonce

        @salt.ext.tornado.gen.coroutine
        def _do_transfer():
            data = yield self.message_client.send(
                self._package_load(self.auth.crypticle.dumps(load)),
                timeout=timeout,
                tries=tries,
            )
            # we may not have always data
            # as for example for saltcall ret submission, this is a blind
            # communication, we do not subscribe to return events, we just
            # upload the results to the master
            if data:
                data = self.auth.crypticle.loads(data, nonce=nonce)
            data = salt.transport.frame.decode_embedded_strs(data)
            raise salt.ext.tornado.gen.Return(data)

        if not self.auth.authenticated:
            yield self.auth.authenticate()
        try:
            ret = yield _do_transfer()
            raise salt.ext.tornado.gen.Return(ret)
        except salt.crypt.AuthenticationError:
            yield self.auth.authenticate()
            ret = yield _do_transfer()
            raise salt.ext.tornado.gen.Return(ret)

    @salt.ext.tornado.gen.coroutine
    def _uncrypted_transfer(self, load, tries=3, timeout=60):
        """
        Send ``load`` unencrypted and return the master's reply.
        """
        ret = yield self.message_client.send(
            self._package_load(load),
            timeout=timeout,
            tries=tries,
        )

        raise salt.ext.tornado.gen.Return(ret)

    @salt.ext.tornado.gen.coroutine
    def send(self, load, tries=3, timeout=60, raw=False):
        """
        Send a request, return a future which will complete when we send the message

        :raises SaltClientError: if the stream to the master is closed
        """
        try:
            if self.crypt == "clear":
                ret = yield self._uncrypted_transfer(load, tries=tries, timeout=timeout)
            else:
                ret = yield self._crypted_transfer(load, tries=tries, timeout=timeout)
        except salt.ext.tornado.iostream.StreamClosedError as exc:
            # Convert to 'SaltClientError' so that clients can handle this
            # exception more appropriately.
            raise SaltClientError("Connection to master lost") from exc
        raise salt.ext.tornado.gen.Return(ret)


class AsyncTCPPubChannel(
    salt.transport.mixins.auth.AESPubClientMixin, salt.transport.client.AsyncPubChannel
):
    """
    Minion-side subscription to the master's TCP publisher.
    """

    async_methods = [
        "send_id",
        "connect_callback",
        "connect",
    ]
    close_methods = [
        "close",
    ]

    def __init__(self, opts, **kwargs):
        """
        :param dict opts: Salt options
        :keyword str crypt: defaults to ``"aes"``
        :keyword io_loop: IOLoop to use; defaults to ``IOLoop.current()``
        """
        self.opts = opts
        self.crypt = kwargs.get("crypt", "aes")
        self.io_loop = kwargs.get("io_loop") or salt.ext.tornado.ioloop.IOLoop.current()
        self.connected = False
        self._closing = False
        self._reconnected = False
        self.message_client = None
        self.event = salt.utils.event.get_event("minion", opts=self.opts, listen=False)

    def close(self):
        """
        Close the message client and destroy the event bus handle. Idempotent.
        """
        if self._closing:
            return
        self._closing = True
        if self.message_client is not None:
            self.message_client.close()
            self.message_client = None
        if self.event is not None:
            self.event.destroy()
            self.event = None

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701

    def _package_load(self, load):
        """
        Wrap ``load`` in the request envelope (``enc``, ``load``, ``version``).
        """
        return {
            "enc": self.crypt,
            "load": load,
            "version": 2,
        }

    @salt.ext.tornado.gen.coroutine
    def send_id(self, tok, force_auth):
        """
        Send the minion id to the master so that the master may better
        track the connection state of the minion.
        In case of authentication errors, try to renegotiate authentication
        and retry the method.

        When ``force_auth`` is set (or not yet authenticated), authentication
        is retried up to ``tcp_authentication_retries`` times (forever if
        negative).
        """
        load = {"id": self.opts["id"], "tok": tok}

        @salt.ext.tornado.gen.coroutine
        def _do_transfer():
            msg = self._package_load(self.auth.crypticle.dumps(load))
            package = salt.transport.frame.frame_msg(msg, header=None)
            yield self.message_client.write_to_stream(package)
            raise salt.ext.tornado.gen.Return(True)

        if force_auth or not self.auth.authenticated:
            count = 0
            while (
                count <= self.opts["tcp_authentication_retries"]
                or self.opts["tcp_authentication_retries"] < 0
            ):
                try:
                    yield self.auth.authenticate()
                    break
                except SaltClientError as exc:
                    log.debug(exc)
                    count += 1
        try:
            ret = yield _do_transfer()
            raise salt.ext.tornado.gen.Return(ret)
        except salt.crypt.AuthenticationError:
            yield self.auth.authenticate()
            ret = yield _do_transfer()
            raise salt.ext.tornado.gen.Return(ret)

    @salt.ext.tornado.gen.coroutine
    def connect_callback(self, result):
        """
        Called by the message client once a connection is established.

        Sends the minion id and fires ``__master_connected``. On reconnects,
        it also fires a minion/syndic start event to the master through a
        short-lived :class:`AsyncTCPReqChannel`.
        """
        if self._closing:
            return
        # Force re-auth on reconnect since the master
        # may have been restarted
        yield self.send_id(self.tok, self._reconnected)
        self.connected = True
        self.event.fire_event({"master": self.opts["master"]}, "__master_connected")
        if self._reconnected:
            # On reconnects, fire a master event to notify that the minion is
            # available.
            if self.opts.get("__role") == "syndic":
                data = "Syndic {} started at {}".format(self.opts["id"], time.asctime())
                tag = salt.utils.event.tagify([self.opts["id"], "start"], "syndic")
            else:
                data = "Minion {} started at {}".format(self.opts["id"], time.asctime())
                tag = salt.utils.event.tagify([self.opts["id"], "start"], "minion")
            load = {
                "id": self.opts["id"],
                "cmd": "_minion_event",
                "pretag": None,
                "tok": self.tok,
                "data": data,
                "tag": tag,
            }
            req_channel = salt.utils.asynchronous.SyncWrapper(
                AsyncTCPReqChannel,
                (self.opts,),
                loop_kwarg="io_loop",
            )
            try:
                req_channel.send(load, timeout=60)
            except salt.exceptions.SaltReqTimeoutError:
                log.info(
                    "fire_master failed: master could not be contacted. Request timed"
                    " out."
                )
            except Exception:  # pylint: disable=broad-except
                log.info("fire_master failed: %s", traceback.format_exc())
            finally:
                # SyncWrapper will call either close() or destroy(), whichever is available
                del req_channel
        else:
            self._reconnected = True

    def disconnect_callback(self):
        """
        Mark the channel disconnected and fire ``__master_disconnected``.
        """
        if self._closing:
            return
        self.connected = False
        self.event.fire_event({"master": self.opts["master"]}, "__master_disconnected")

    @salt.ext.tornado.gen.coroutine
    def connect(self):
        """
        Authenticate with the master and connect to its publish port.

        :raises SaltClientError: on any failure whose message does not contain
            the ``-|RETRY|-`` marker (those are silently dropped)
        """
        try:
            self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop)
            self.tok = self.auth.gen_token(b"salt")
            if not self.auth.authenticated:
                yield self.auth.authenticate()
            if self.auth.authenticated:
                # if this is changed from the default, we assume it was intentional
                if int(self.opts.get("publish_port", 4505)) != 4505:
                    self.publish_port = self.opts.get("publish_port")
                # else take the relayed publish_port master reports
                else:
                    self.publish_port = self.auth.creds["publish_port"]

                self.message_client = SaltMessageClientPool(
                    self.opts,
                    args=(self.opts, self.opts["master_ip"], int(self.publish_port)),
                    kwargs={
                        "io_loop": self.io_loop,
                        "connect_callback": self.connect_callback,
                        "disconnect_callback": self.disconnect_callback,
                        "source_ip": self.opts.get("source_ip"),
                        "source_port": self.opts.get("source_publish_port"),
                    },
                )
                yield self.message_client.connect()  # wait for the client to be connected
                self.connected = True
        # TODO: better exception handling...
        except KeyboardInterrupt:  # pylint: disable=try-except-raise
            raise
        except Exception as exc:  # pylint: disable=broad-except
            if "-|RETRY|-" not in str(exc):
                raise SaltClientError(
                    "Unable to sign_in to master: {}".format(exc)
                ) from exc  # TODO: better error message

    def on_recv(self, callback):
        """
        Register an on_recv callback

        Non-dict bodies are msgpack-decoded, then passed through
        ``_decode_payload`` before ``callback`` is invoked. ``None`` is
        forwarded unchanged to the message clients.
        """
        if callback is None:
            return self.message_client.on_recv(callback)

        @salt.ext.tornado.gen.coroutine
        def wrap_callback(body):
            if not isinstance(body, dict):
                # TODO: For some reason we need to decode here for things
                #       to work. Fix this.
                body = salt.utils.msgpack.loads(body)
                body = salt.transport.frame.decode_embedded_strs(body)
            ret = yield self._decode_payload(body)
            callback(ret)

        return self.message_client.on_recv(wrap_callback)


class TCPReqServerChannel(
    salt.transport.mixins.auth.AESReqServerMixin, salt.transport.server.ReqServerChannel
):
    """
    Master-side request server over raw TCP.
    """

    # TODO: opts!
    backlog = 5

    def __init__(self, opts):
        salt.transport.server.ReqServerChannel.__init__(self, opts)
        self._socket = None
        self.req_server = None

    @property
    def socket(self):
        """The listening socket (``None`` until created)."""
        return self._socket

    def close(self):
        """
        Shut down the listening socket and close the request server.
        """
        if self._socket is not None:
            try:
                self._socket.shutdown(socket.SHUT_RDWR)
            except OSError as exc:
                if exc.errno == errno.ENOTCONN:
                    # We may try to shutdown a socket which is already disconnected.
                    # Ignore this condition and continue.
                    pass
                else:
                    raise
            if self.req_server is None:
                # We only close the socket if we don't have a req_server instance.
                # If we did, because the req_server is also handling this socket, when we call
                # req_server.stop(), tornado will give us an AssertionError because it's trying to
                # match the socket.fileno() (after close it's -1) to the fd it holds on it's _sockets cache
                # so it can remove the socket from the IOLoop handlers
                self._socket.close()
            self._socket = None
        if self.req_server is not None:
            try:
                self.req_server.close()
            except OSError as exc:
                if exc.errno != errno.EBADF:
                    raise
                log.exception(
                    "TCPReqServerChannel close generated an exception: %s", str(exc)
                )
            self.req_server = None

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def pre_fork(self, process_manager):
        """
        Pre-fork we need to set up the TCP listening side.

        With the load balancer (Windows), a :class:`LoadBalancerServer`
        process is registered that feeds accepted connections into a shared
        queue. Otherwise a non-blocking socket is created and bound to
        ``interface:ret_port`` here, so that forked workers share it.
        """
        salt.transport.mixins.auth.AESReqServerMixin.pre_fork(self, process_manager)
        if USE_LOAD_BALANCER:
            self.socket_queue = multiprocessing.Queue()
            process_manager.add_process(
                LoadBalancerServer, args=(self.opts, self.socket_queue)
            )
        elif not salt.utils.platform.is_windows():
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            _set_tcp_keepalive(self._socket, self.opts)
            self._socket.setblocking(0)
            self._socket.bind((self.opts["interface"], int(self.opts["ret_port"])))

    def post_fork(self, payload_handler, io_loop):
        """
        After forking we need to create all of the local sockets to listen to the
        router

        payload_handler: function to call with your payloads
        """
        # NOTE(review): the request server reads "pub_server_niceness" here;
        # confirm that a request-specific niceness option was not intended.
        if self.opts["pub_server_niceness"] and not salt.utils.platform.is_windows():
            log.info(
                "setting Publish daemon niceness to %i",
                self.opts["pub_server_niceness"],
            )
            os.nice(self.opts["pub_server_niceness"])

        self.payload_handler = payload_handler
        self.io_loop = io_loop
        with salt.utils.asynchronous.current_ioloop(self.io_loop):
            if USE_LOAD_BALANCER:
                self.req_server = LoadBalancerWorker(
                    self.socket_queue,
                    self.handle_message,
                    ssl_options=self.opts.get("ssl"),
                )
            else:
                if salt.utils.platform.is_windows():
                    self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    _set_tcp_keepalive(self._socket, self.opts)
                    self._socket.setblocking(0)
                    self._socket.bind(
                        (self.opts["interface"], int(self.opts["ret_port"]))
                    )
                self.req_server = SaltMessageServer(
                    self.handle_message,
                    ssl_options=self.opts.get("ssl"),
                    io_loop=self.io_loop,
                )
                self.req_server.add_socket(self._socket)
                self._socket.listen(self.backlog)
        salt.transport.mixins.auth.AESReqServerMixin.post_fork(
            self, payload_handler, io_loop
        )

    @salt.ext.tornado.gen.coroutine
    def handle_message(self, stream, header, payload):
        """
        Handle incoming messages from underlying tcp streams

        Every reply, including error replies, is framed with the request's
        ``header`` so that the client can match it to its pending request.
        """
        try:
            try:
                payload = self._decode_payload(payload)
            except Exception:  # pylint: disable=broad-except
                stream.write(salt.transport.frame.frame_msg("bad load", header=header))
                raise salt.ext.tornado.gen.Return()

            # TODO helper functions to normalize payload?
            if not isinstance(payload, dict) or not isinstance(
                payload.get("load"), dict
            ):
                yield stream.write(
                    salt.transport.frame.frame_msg(
                        "payload and load must be a dict", header=header
                    )
                )
                raise salt.ext.tornado.gen.Return()

            id_ = payload["load"].get("id", "")
            try:
                # A non-string id (e.g. an int) makes the membership test
                # raise TypeError.
                id_has_null_byte = "\0" in id_
            except TypeError:
                log.error("Payload contains non-string id: %s", payload)
                yield stream.write(
                    salt.transport.frame.frame_msg(
                        "bad load: id {} is not a string".format(id_), header=header
                    )
                )
                raise salt.ext.tornado.gen.Return()
            if id_has_null_byte:
                log.error("Payload contains an id with a null byte: %s", payload)
                yield stream.write(
                    salt.transport.frame.frame_msg(
                        "bad load: id contains a null byte", header=header
                    )
                )
                raise salt.ext.tornado.gen.Return()

            version = 0
            if "version" in payload:
                version = payload["version"]

            sign_messages = False
            if version > 1:
                sign_messages = True

            # intercept the "_auth" commands, since the main daemon shouldn't know
            # anything about our key auth
            if (
                payload["enc"] == "clear"
                and payload.get("load", {}).get("cmd") == "_auth"
            ):
                yield stream.write(
                    salt.transport.frame.frame_msg(
                        self._auth(payload["load"], sign_messages), header=header
                    )
                )
                raise salt.ext.tornado.gen.Return()

            nonce = None
            if version > 1:
                nonce = payload["load"].pop("nonce", None)

            # TODO: test
            try:
                ret, req_opts = yield self.payload_handler(payload)
            except Exception:  # pylint: disable=broad-except
                # Log first: exc_info must be captured before yielding.
                log.error(
                    "Some exception handling a payload from minion", exc_info=True
                )
                # always attempt to return an error to the minion; wait for
                # the write so close() does not discard it
                yield stream.write(
                    salt.transport.frame.frame_msg(
                        "Some exception handling minion payload", header=header
                    )
                )
                stream.close()
                raise salt.ext.tornado.gen.Return()

            req_fun = req_opts.get("fun", "send")
            if req_fun == "send_clear":
                stream.write(salt.transport.frame.frame_msg(ret, header=header))
            elif req_fun == "send":
                stream.write(
                    salt.transport.frame.frame_msg(
                        self.crypticle.dumps(ret, nonce), header=header
                    )
                )
            elif req_fun == "send_private":
                stream.write(
                    salt.transport.frame.frame_msg(
                        self._encrypt_private(
                            ret,
                            req_opts["key"],
                            req_opts["tgt"],
                            nonce,
                            sign_messages,
                        ),
                        header=header,
                    )
                )
            else:
                log.error("Unknown req_fun %s", req_fun)
                # always attempt to return an error to the minion
                yield stream.write(
                    salt.transport.frame.frame_msg(
                        "Server-side exception handling payload", header=header
                    )
                )
                stream.close()
        except salt.ext.tornado.gen.Return:
            raise
        except salt.ext.tornado.iostream.StreamClosedError:
            # Stream was closed. This could happen if the remote side
            # closed the connection on its end (eg in a timeout or shutdown
            # situation).
            log.error("Connection was unexpectedly closed", exc_info=True)
        except Exception as exc:  # pylint: disable=broad-except
            # Absorb any other exceptions
            log.error("Unexpected exception occurred: %s", exc, exc_info=True)

        raise salt.ext.tornado.gen.Return()


class SaltMessageServer(salt.ext.tornado.tcpserver.TCPServer):
    """
    Raw TCP server which will receive all of the TCP streams and re-assemble
    messages that are sent through to us
    """

    def __init__(self, message_handler, *args, **kwargs):
        """
        :param message_handler: coroutine called as
            ``message_handler(stream, header, body)`` for each decoded message
        :keyword io_loop: IOLoop to use; defaults to ``IOLoop.current()``
        """
        io_loop = (
            kwargs.pop("io_loop", None) or salt.ext.tornado.ioloop.IOLoop.current()
        )
        self._closing = False
        super().__init__(*args, **kwargs)
        self.io_loop = io_loop
        self.clients = []
        self.message_handler = message_handler

    @salt.ext.tornado.gen.coroutine
    def handle_stream(self, stream, address):
        """
        Handle incoming streams and add messages to the incoming queue

        Bytes are fed into a streaming msgpack unpacker. Each complete
        ``{"head": ..., "body": ...}`` message is dispatched to
        ``message_handler`` as a separate IOLoop callback.
        """
        log.trace("Req client %s connected", address)
        self.clients.append((stream, address))
        unpacker = salt.utils.msgpack.Unpacker()
        try:
            while True:
                wire_bytes = yield stream.read_bytes(4096, partial=True)
                unpacker.feed(wire_bytes)
                for framed_msg in unpacker:
                    framed_msg = salt.transport.frame.decode_embedded_strs(framed_msg)
                    header = framed_msg["head"]
                    self.io_loop.spawn_callback(
                        self.message_handler, stream, header, framed_msg["body"]
                    )
        except salt.ext.tornado.iostream.StreamClosedError:
            log.trace("req client disconnected %s", address)
            self.remove_client((stream, address))
        except Exception as e:  # pylint: disable=broad-except
            log.trace("other master-side exception: %s", e)
            self.remove_client((stream, address))
            stream.close()

    def remove_client(self, client):
        """
        Forget ``client`` (a ``(stream, address)`` tuple) if still tracked.
        """
        try:
            self.clients.remove(client)
        except ValueError:
            log.trace("Message server client was not in list to remove")

    def shutdown(self):
        """
        Shutdown the whole server
        """
        salt.utils.versions.warn_until(
            "Phosphorus",
            "Please stop calling {0}.{1}.shutdown() and instead call {0}.{1}.close()".format(
                __name__, self.__class__.__name__
            ),
        )
        self.close()

    def close(self):
        """
        Close the server

        Closes every tracked client stream, then stops the underlying
        TCPServer. Idempotent.
        """
        if self._closing:
            return
        self._closing = True
        # Iterate over a snapshot: remove_client() mutates self.clients and
        # iterating the live list would skip every other client.
        for item in list(self.clients):
            client, _ = item
            client.close()
            self.remove_client(item)
        try:
            # Call TCPServer.stop() directly: subclasses (LoadBalancerWorker)
            # override stop() as a deprecated alias that calls close().
            super().stop()
        except OSError as exc:
            if exc.errno != errno.EBADF:
                raise


if USE_LOAD_BALANCER:

    class LoadBalancerWorker(SaltMessageServer):
        """
        This will receive TCP connections from 'LoadBalancerServer' via
        a multiprocessing queue.
        Since the queue is shared amongst workers, only one worker will handle
        a given connection.
        """

        def __init__(self, socket_queue, message_handler, *args, **kwargs):
            super().__init__(message_handler, *args, **kwargs)
            self.socket_queue = socket_queue
            self._stop = threading.Event()
            self.thread = threading.Thread(target=self.socket_queue_thread)
            self.thread.start()

        def stop(self):
            """
            Deprecated alias of :meth:`close`.
            """
            salt.utils.versions.warn_until(
                "Phosphorus",
                "Please stop calling {0}.{1}.stop() and instead call {0}.{1}.close()".format(
                    __name__, self.__class__.__name__
                ),
            )
            self.close()

        def close(self):
            """
            Stop the queue-reader thread (waits for it), then close the server.
            """
            self._stop.set()
            self.thread.join()
            super().close()

        def socket_queue_thread(self):
            """
            Thread body: pull ``(socket, address)`` pairs off the queue and
            schedule them on the IOLoop until ``_stop`` is set.
            """
            try:
                while True:
                    try:
                        client_socket, address = self.socket_queue.get(True, 1)
                    except queue.Empty:
                        if self._stop.is_set():
                            break
                        continue
                    # 'self.io_loop' initialized in super class
                    # 'salt.ext.tornado.tcpserver.TCPServer'.
                    # 'self._handle_connection' defined in same super class.
                    self.io_loop.spawn_callback(
                        self._handle_connection, client_socket, address
                    )
            except (KeyboardInterrupt, SystemExit):
                pass


class TCPClientKeepAlive(salt.ext.tornado.tcpclient.TCPClient):
    """
    Override _create_stream() in TCPClient to enable keep alive support.
    """

    def __init__(self, opts, resolver=None):
        self.opts = opts
        super().__init__(resolver=resolver)

    def _create_stream(
        self, max_buffer_size, af, addr, **kwargs
    ):  # pylint: disable=unused-argument,arguments-differ
        """
        Override _create_stream() in TCPClient.

        Tornado 4.5 added the kwargs 'source_ip' and 'source_port'.
        Due to this, use **kwargs to swallow these and any future
        kwargs to maintain compatibility.

        The socket is created in the address family ``af`` chosen by the
        resolver for ``addr``, so IPv6 addresses can be connected to.
        """
        # Always connect in plaintext; we'll convert to ssl if necessary
        # after one connection has completed.
        sock = socket.socket(af, socket.SOCK_STREAM)
        _set_tcp_keepalive(sock, self.opts)
        stream = salt.ext.tornado.iostream.IOStream(
            sock, max_buffer_size=max_buffer_size
        )
        if salt.ext.tornado.version_info < (5,):
            return stream.connect(addr)
        return stream, stream.connect(addr)


class SaltMessageClientPool(salt.transport.MessageClientPool):
    """
    Wrapper class of SaltMessageClient to avoid blocking waiting while writing data to socket.
    """

    def __init__(self, opts, args=None, kwargs=None):
        super().__init__(SaltMessageClient, opts, args=args, kwargs=kwargs)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701

    def close(self):
        """
        Close every pooled client and empty the pool.
        """
        for message_client in self.message_clients:
            message_client.close()
        self.message_clients = []

    @salt.ext.tornado.gen.coroutine
    def connect(self):
        """
        Connect all pooled clients, completing when every one has connected.
        """
        futures = []
        for message_client in self.message_clients:
            futures.append(message_client.connect())
        yield futures
        raise salt.ext.tornado.gen.Return(None)

    def on_recv(self, *args, **kwargs):
        """
        Register the same on_recv callback on every pooled client.
        """
        for message_client in self.message_clients:
            message_client.on_recv(*args, **kwargs)

    def send(self, *args, **kwargs):
        """
        Send through the client with the shortest ``send_queue``.
        """
        message_clients = sorted(self.message_clients, key=lambda x: len(x.send_queue))
        return message_clients[0].send(*args, **kwargs)

    def write_to_stream(self, *args, **kwargs):
        """
        Write raw bytes to the stream of the least-loaded client.
        """
        message_clients = sorted(self.message_clients, key=lambda x: len(x.send_queue))
        return message_clients[0]._stream.write(*args, **kwargs)


# TODO consolidate with IPCClient
# TODO: limit in-flight messages.
# TODO: singleton?
# Something to not re-create the tcp connection so much
class SaltMessageClient:
    """
    Low-level message sending client

    Frames outgoing messages with a ``mid`` (message id) header, writes them to
    a single TCP stream and resolves the future returned by :meth:`send` when a
    reply carrying the same ``mid`` arrives. Messages whose ``mid`` is not being
    tracked go to the callback registered with :meth:`on_recv`. The connection
    is re-established after it drops or after a reply cannot be parsed.
    """

    def __init__(
        self,
        opts,
        host,
        port,
        io_loop=None,
        resolver=None,
        connect_callback=None,
        disconnect_callback=None,
        source_ip=None,
        source_port=None,
    ):
        self.opts = opts
        self.host = host
        self.port = port
        self.source_ip = source_ip
        self.source_port = source_port
        self.connect_callback = connect_callback
        self.disconnect_callback = disconnect_callback

        self.io_loop = io_loop or salt.ext.tornado.ioloop.IOLoop.current()

        with salt.utils.asynchronous.current_ioloop(self.io_loop):
            self._tcp_client = TCPClientKeepAlive(opts, resolver=resolver)

        self._mid = 1
        self._max_messages = int((1 << 31) - 2)  # number of IDs before we wrap

        # TODO: max queue size
        self.send_queue = []  # queue of messages to be sent
        self.send_future_map = {}  # mapping of request_id -> Future
        self.send_timeout_map = {}  # request_id -> timeout_callback

        self._read_until_future = None
        self._on_recv = None
        self._closing = False
        # Seconds to sleep between connection attempts. Assigned before the
        # first connect() so _connect() can always read it.
        self.backoff = opts.get("tcp_reconnect_backoff", 1)
        self._connecting_future = self.connect()
        self._stream_return_future = salt.ext.tornado.concurrent.Future()
        self.io_loop.spawn_callback(self._stream_return)

    def _stop_io_loop(self):
        """
        Stop ``self.io_loop`` if it is still set.
        """
        if self.io_loop is not None:
            self.io_loop.stop()

    # TODO: timeout inflight sessions
    def close(self):
        """
        Close the stream and the TCP client and drop callback references.

        Only the first call does any work; later calls return immediately.
        """
        if self._closing:
            return
        self._closing = True
        if hasattr(self, "_stream") and not self._stream.closed():
            # If _stream_return() hasn't completed, it means the IO
            # Loop is stopped (such as when using
            # 'salt.utils.asynchronous.SyncWrapper'). Ensure that
            # _stream_return() completes by restarting the IO Loop.
            # This will prevent potential errors on shutdown.
            # Captured outside the try so the finally clause can always
            # restore it.
            orig_loop = salt.ext.tornado.ioloop.IOLoop.current()
            try:
                self.io_loop.make_current()
                self._stream.close()
                if self._read_until_future is not None:
                    # This will prevent this message from showing up:
                    # '[ERROR   ] Future exception was never retrieved:
                    # StreamClosedError'
                    # This happens because the logic is always waiting to read
                    # the next message and the associated read future is marked
                    # 'StreamClosedError' when the stream is closed.
                    if self._read_until_future.done():
                        self._read_until_future.exception()
                    if (
                        self.io_loop
                        != salt.ext.tornado.ioloop.IOLoop.current(instance=False)
                        or not self._stream_return_future.done()
                    ):
                        self.io_loop.add_future(
                            self._stream_return_future,
                            lambda future: self._stop_io_loop(),
                        )
                        self.io_loop.start()
            except Exception as e:  # pylint: disable=broad-except
                log.info("Exception caught in SaltMessageClient.close: %s", str(e))
            finally:
                orig_loop.make_current()
        self._tcp_client.close()
        self.io_loop = None
        self._read_until_future = None
        # Clear callback references to allow the object that they belong to
        # to be deleted.
        self.connect_callback = None
        self.disconnect_callback = None

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701

    def connect(self):
        """
        Ask for this client to reconnect to the origin

        Returns the pending connection future if one is still in progress;
        otherwise creates a new future and schedules :meth:`_connect`.
        """
        if hasattr(self, "_connecting_future") and not self._connecting_future.done():
            future = self._connecting_future
        else:
            future = salt.ext.tornado.concurrent.Future()
            self._connecting_future = future
            self.io_loop.add_callback(self._connect)

            # Add the callback only when a new future is created
            if self.connect_callback is not None:

                def handle_future(future):
                    response = future.result()
                    self.io_loop.add_callback(self.connect_callback, response)

                future.add_done_callback(handle_future)

        return future

    @salt.ext.tornado.gen.coroutine
    def _connect(self):
        """
        Try to connect for the rest of time!

        Loops until a connection succeeds (resolving ``_connecting_future``
        with True) or the client is closed, sleeping ``self.backoff`` seconds
        after each failed attempt.
        """
        while True:
            if self._closing:
                break
            try:
                kwargs = {}
                if self.source_ip or self.source_port:
                    if salt.ext.tornado.version_info >= (4, 5):
                        ### source_ip and source_port are supported only in Tornado >= 4.5
                        # See http://www.tornadoweb.org/en/stable/releases/v4.5.0.html
                        # Otherwise will just ignore these args
                        kwargs = {
                            "source_ip": self.source_ip,
                            "source_port": self.source_port,
                        }
                    else:
                        log.warning(
                            "If you need a certain source IP/port, consider upgrading"
                            " Tornado >= 4.5"
                        )
                with salt.utils.asynchronous.current_ioloop(self.io_loop):
                    self._stream = yield self._tcp_client.connect(
                        self.host, self.port, ssl_options=self.opts.get("ssl"), **kwargs
                    )
                self._connecting_future.set_result(True)
                break
            except Exception as exc:  # pylint: disable=broad-except
                log.warning(
                    "TCP Message Client encountered an exception while connecting to"
                    " %s:%s: %r, will reconnect in %d seconds",
                    self.host,
                    self.port,
                    exc,
                    self.backoff,
                )
                yield salt.ext.tornado.gen.sleep(self.backoff)

    def _fail_pending_messages(self, exc):
        """
        Fail every in-flight request future with ``exc``.

        The map is swapped out before any exception is set, so done-callbacks
        that call :meth:`send` cannot mutate it mid-iteration. Each request's
        timeout is cancelled too. Otherwise the orphaned timer could later fire
        for a reused message id and hijack an unrelated request.
        """
        pending, self.send_future_map = self.send_future_map, {}
        for message_id, future in pending.items():
            timeout = self.send_timeout_map.pop(message_id, None)
            if timeout is not None and self.io_loop is not None:
                self.io_loop.remove_timeout(timeout)
            future.set_exception(exc)

    @salt.ext.tornado.gen.coroutine
    def _stream_return(self):
        """
        Read framed replies from the stream until the client is closed.

        Replies whose ``mid`` is tracked resolve the matching send future;
        others go to the ``on_recv`` callback or are logged. On a closed stream
        or a parse error, pending requests are failed and a reconnect is
        awaited. ``_stream_return_future`` is resolved on exit.
        """
        try:
            while not self._closing and (
                not self._connecting_future.done()
                or self._connecting_future.result() is not True
            ):
                yield self._connecting_future
            unpacker = salt.utils.msgpack.Unpacker()
            while not self._closing:
                try:
                    self._read_until_future = self._stream.read_bytes(
                        4096, partial=True
                    )
                    wire_bytes = yield self._read_until_future
                    unpacker.feed(wire_bytes)
                    for framed_msg in unpacker:
                        framed_msg = salt.transport.frame.decode_embedded_strs(
                            framed_msg
                        )
                        header = framed_msg["head"]
                        body = framed_msg["body"]
                        message_id = header.get("mid")

                        if message_id in self.send_future_map:
                            self.send_future_map.pop(message_id).set_result(body)
                            self.remove_message_timeout(message_id)
                        else:
                            if self._on_recv is not None:
                                self.io_loop.spawn_callback(self._on_recv, header, body)
                            else:
                                log.error(
                                    "Got response for message_id %s that we are not"
                                    " tracking",
                                    message_id,
                                )
                except salt.ext.tornado.iostream.StreamClosedError as e:
                    log.debug(
                        "tcp stream to %s:%s closed, unable to recv",
                        self.host,
                        self.port,
                    )
                    self._fail_pending_messages(e)
                    if self._closing:
                        return
                    if self.disconnect_callback:
                        self.disconnect_callback()
                    # if the last connect finished, then we need to make a new one
                    if self._connecting_future.done():
                        self._connecting_future = self.connect()
                    yield self._connecting_future
                    # Bytes buffered from the old connection (e.g. a partially
                    # received frame) must not be prepended to the new stream.
                    unpacker = salt.utils.msgpack.Unpacker()
                except TypeError:
                    # This is an invalid transport
                    if "detect_mode" in self.opts:
                        log.info(
                            "There was an error trying to use TCP transport; "
                            "attempting to fallback to another transport"
                        )
                    else:
                        raise SaltClientError
                except Exception as e:  # pylint: disable=broad-except
                    log.error("Exception parsing response", exc_info=True)
                    self._fail_pending_messages(e)
                    if self._closing:
                        return
                    # Unlike the StreamClosedError case the stream is still
                    # open here; close it so reconnecting does not leak it.
                    if not self._stream.closed():
                        self._stream.close()
                    if self.disconnect_callback:
                        self.disconnect_callback()
                    # if the last connect finished, then we need to make a new one
                    if self._connecting_future.done():
                        self._connecting_future = self.connect()
                    yield self._connecting_future
                    # Discard whatever was left of the unparseable data.
                    unpacker = salt.utils.msgpack.Unpacker()
        finally:
            self._stream_return_future.set_result(True)

    @salt.ext.tornado.gen.coroutine
    def _stream_send(self):
        """
        Write queued frames to the stream in order once connected.

        If a write fails with StreamClosedError, the frame's future is failed.
        Its timeout is cancelled, the frame is dropped from the queue and a
        reconnect is awaited before the remaining frames are written.
        """
        while (
            not self._connecting_future.done()
            or self._connecting_future.result() is not True
        ):
            yield self._connecting_future
        while len(self.send_queue) > 0:
            message_id, item = self.send_queue[0]
            try:
                yield self._stream.write(item)
                del self.send_queue[0]
            # if the connection is dead, lets fail this send, and make sure we
            # attempt to reconnect
            except salt.ext.tornado.iostream.StreamClosedError as e:
                if message_id in self.send_future_map:
                    self.send_future_map.pop(message_id).set_exception(e)
                self.remove_message_timeout(message_id)
                del self.send_queue[0]
                if self._closing:
                    return
                if self.disconnect_callback:
                    self.disconnect_callback()
                # if the last connect finished, then we need to make a new one
                if self._connecting_future.done():
                    self._connecting_future = self.connect()
                yield self._connecting_future

    def _message_id(self):
        """
        Return a message id that is not currently awaiting a reply.

        Scans upward from the last id handed out, wrapping back to 1 once
        ``self._max_messages`` is reached. Raises once it has wrapped and
        still finds every id in use.
        """
        # NOTE(review): the counter is not advanced after an id is returned,
        # so an id is reused as soon as its request completes or times out.
        # A late reply to a timed-out request could then be matched to a
        # newer request with the same id -- confirm before changing.
        wrap = False
        while self._mid in self.send_future_map:
            if self._mid >= self._max_messages:
                if wrap:
                    # this shouldn't ever happen, but just in case
                    raise Exception("Unable to find available messageid")
                self._mid = 1
                wrap = True
            else:
                self._mid += 1

        return self._mid

    # TODO: return a message object which takes care of multiplexing?
    def on_recv(self, callback):
        """
        Register a callback for received messages (that we didn't initiate)

        The callback receives only the message body. Passing None unregisters.
        """
        if callback is None:
            self._on_recv = callback
        else:

            def wrap_recv(header, body):
                callback(body)

            self._on_recv = wrap_recv

    def remove_message_timeout(self, message_id):
        """
        Cancel and forget the timeout registered for ``message_id``, if any.
        """
        if message_id not in self.send_timeout_map:
            return
        timeout = self.send_timeout_map.pop(message_id)
        self.io_loop.remove_timeout(timeout)

    def timeout_message(self, message_id, msg):
        """
        Handle a request that received no reply within its timeout.

        Resends ``msg`` on the same future while ``future.attempts`` is below
        ``future.tries``. Otherwise fails the future with SaltReqTimeoutError.
        """
        if message_id in self.send_timeout_map:
            del self.send_timeout_map[message_id]
        if message_id in self.send_future_map:
            future = self.send_future_map.pop(message_id)
            # In a race condition the message might have been sent by the time
            # we're timing it out. Make sure the future is not None
            if future is not None:
                if future.attempts < future.tries:
                    future.attempts += 1

                    log.debug(
                        "SaltReqTimeoutError, retrying. (%s/%s)",
                        future.attempts,
                        future.tries,
                    )
                    self.send(
                        msg,
                        timeout=future.timeout,
                        tries=future.tries,
                        future=future,
                    )

                else:
                    future.set_exception(SaltReqTimeoutError("Message timed out"))

    def send(self, msg, timeout=None, callback=None, raw=False, future=None, tries=3):
        """
        Send given message, and return a future

        :param msg: payload, framed with a ``{"mid": <id>}`` header.
        :param timeout: seconds before :meth:`timeout_message` handles the
            request (forced to 1 when ``opts["detect_mode"]`` is True); None
            disables the timeout.
        :param callback: if given, called on the IO loop with the reply body.
        :param raw: accepted for interface compatibility; not used here.
        :param future: existing future to reuse (used by retries).
        :param tries: number of resends allowed after timeouts.
        :return: future resolved with the reply body.
        """
        message_id = self._message_id()
        header = {"mid": message_id}

        if future is None:
            future = salt.ext.tornado.concurrent.Future()
            future.tries = tries
            future.attempts = 0
            future.timeout = timeout

        if callback is not None:

            def handle_future(future):
                response = future.result()
                self.io_loop.add_callback(callback, response)

            future.add_done_callback(handle_future)
        # Add this future to the mapping
        self.send_future_map[message_id] = future

        if self.opts.get("detect_mode") is True:
            timeout = 1

        if timeout is not None:
            send_timeout = self.io_loop.call_later(
                timeout, self.timeout_message, message_id, msg
            )
            self.send_timeout_map[message_id] = send_timeout

        # if we don't have a send queue, we need to spawn the callback to do the sending
        if len(self.send_queue) == 0:
            self.io_loop.spawn_callback(self._stream_send)
        self.send_queue.append(
            (message_id, salt.transport.frame.frame_msg(msg, header=header))
        )
        return future


class Subscriber:
    """
    Client object for use with the TCP publisher server

    Holds a subscriber's stream and address, plus the minion id once it has
    been verified.
    """

    def __init__(self, stream, address):
        self.stream = stream
        self.address = address
        self._closing = False
        self._read_until_future = None
        self.id_ = None

    def close(self):
        """
        Close the stream; only the first call does any work.
        """
        if self._closing:
            return
        self._closing = True
        if not self.stream.closed():
            self.stream.close()
            if self._read_until_future is not None and self._read_until_future.done():
                # This will prevent this message from showing up:
                # '[ERROR   ] Future exception was never retrieved:
                # StreamClosedError'
                # This happens because the logic is always waiting to read
                # the next message and the associated read future is marked
                # 'StreamClosedError' when the stream is closed.
                self._read_until_future.exception()

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701


class PubServer(salt.ext.tornado.tcpserver.TCPServer):
    """
    TCP publisher

    Accepts subscriber connections, records which minion id each one belongs
    to, and writes published payloads to all or to targeted subscribers.
    """

    def __init__(self, opts, io_loop=None, pack_publish=lambda _: _):
        super().__init__(ssl_options=opts.get("ssl"))
        self.io_loop = io_loop
        self.opts = opts
        self._closing = False
        self.clients = set()
        self.aes_funcs = salt.master.AESFuncs(self.opts)
        self.present = {}
        self.event = None
        self.presence_events = False
        if self.opts.get("presence_events", False):
            # Only when the transport is TCP only, the presence events will
            # be handled here. Otherwise, it will be handled in the
            # 'Maintenance' process.
            self.presence_events = all(
                transport == "tcp" for transport, _ in iter_transport_opts(self.opts)
            )

        if self.presence_events:
            self.event = salt.utils.event.get_event(
                "master", opts=self.opts, listen=False
            )
        self._pack_publish = pack_publish

    def pack_publish(self, load):
        """
        Pack ``load`` with the callable given to the constructor.
        """
        return self._pack_publish(load)

    def close(self):
        """
        Release the event bus and AESFuncs; only the first call does any work.
        """
        if self._closing:
            return
        self._closing = True
        if self.event is not None:
            self.event.destroy()
            self.event = None
        if self.aes_funcs is not None:
            self.aes_funcs.destroy()
            self.aes_funcs = None

    # pylint: disable=W1701
    def __del__(self):
        self.close()

    # pylint: enable=W1701

    def _add_client_present(self, client):
        """
        Record ``client`` under its minion id, firing presence events when the
        id was not present before and presence events are enabled.
        """
        id_ = client.id_
        if id_ in self.present:
            clients = self.present[id_]
            clients.add(client)
        else:
            self.present[id_] = {client}
            if self.presence_events:
                data = {"new": [id_], "lost": []}
                self.event.fire_event(
                    data, salt.utils.event.tagify("change", "presence")
                )
                data = {"present": list(self.present.keys())}
                self.event.fire_event(
                    data, salt.utils.event.tagify("present", "presence")
                )

    def _remove_client_present(self, client):
        """
        Forget ``client``; when its minion id has no clients left, drop the id
        and fire presence events if enabled.
        """
        id_ = client.id_
        if id_ is None or id_ not in self.present:
            # This is possible if _remove_client_present() is invoked
            # before the minion's id is validated.
            return

        clients = self.present[id_]
        if client not in clients:
            # Since _remove_client_present() is potentially called from
            # _stream_read() and/or publish_payload(), it is possible for
            # it to be called twice, in which case we will get here.
            # This is not an abnormal case, so no logging is required.
            return

        clients.remove(client)
        if len(clients) == 0:
            del self.present[id_]
            if self.presence_events:
                data = {"new": [], "lost": [id_]}
                self.event.fire_event(
                    data, salt.utils.event.tagify("change", "presence")
                )
                data = {"present": list(self.present.keys())}
                self.event.fire_event(
                    data, salt.utils.event.tagify("present", "presence")
                )

    @salt.ext.tornado.gen.coroutine
    def _stream_read(self, client):
        """
        Read messages from a subscriber to learn and verify its minion id.

        Only ``aes``-encoded bodies whose ``id``/``tok`` pass
        ``verify_minion`` mark the client present. Exits when the stream
        closes or the server is closing.
        """
        unpacker = salt.utils.msgpack.Unpacker()
        while not self._closing:
            try:
                client._read_until_future = client.stream.read_bytes(4096, partial=True)
                wire_bytes = yield client._read_until_future
                unpacker.feed(wire_bytes)
                for framed_msg in unpacker:
                    framed_msg = salt.transport.frame.decode_embedded_strs(framed_msg)
                    body = framed_msg["body"]
                    if body["enc"] != "aes":
                        # We only accept 'aes' encoded messages for 'id'
                        continue
                    crypticle = salt.crypt.Crypticle(
                        self.opts, salt.master.SMaster.secrets["aes"]["secret"].value
                    )
                    load = crypticle.loads(body["load"])
                    load = salt.transport.frame.decode_embedded_strs(load)
                    if not self.aes_funcs.verify_minion(load["id"], load["tok"]):
                        continue
                    client.id_ = load["id"]
                    self._add_client_present(client)
            except salt.ext.tornado.iostream.StreamClosedError:
                log.debug("tcp stream to %s closed, unable to recv", client.address)
                client.close()
                self._remove_client_present(client)
                self.clients.discard(client)
                break
            except Exception:  # pylint: disable=broad-except
                log.error(
                    "Exception parsing response from %s", client.address, exc_info=True
                )
                continue

    def handle_stream(self, stream, address):
        """
        Track a newly connected subscriber and start reading from it.
        """
        log.trace("Subscriber at %s connected", address)
        client = Subscriber(stream, address)
        self.clients.add(client)
        self.io_loop.spawn_callback(self._stream_read, client)

    def _write_to_client(self, client, payload, to_remove):
        """
        Queue ``payload`` on ``client``'s stream; append the client to
        ``to_remove`` if its stream is already closed.
        """
        try:
            # Write the packed str
            f = client.stream.write(payload)
            self.io_loop.add_future(f, lambda f: True)
        except salt.ext.tornado.iostream.StreamClosedError:
            to_remove.append(client)

    # TODO: ACK the publish through IPC
    @salt.ext.tornado.gen.coroutine
    def publish_payload(self, package, _):
        """
        Write a packed publish to its targets.

        With ``topic_lst`` only the clients registered under those minion ids
        receive it; otherwise every connected client does. Clients whose
        stream turns out to be closed are dropped afterwards.
        """
        log.debug("TCP PubServer sending payload: %s", package)
        package = self.pack_publish(package)
        payload = salt.transport.frame.frame_msg(package["payload"])

        to_remove = []
        if "topic_lst" in package:
            topic_lst = package["topic_lst"]
            for topic in topic_lst:
                if topic in self.present:
                    # This will rarely be a list of more than 1 item. It will
                    # be more than 1 item if the minion disconnects from the
                    # master in an unclean manner (eg cable yank), then
                    # restarts and the master is yet to detect the disconnect
                    # via TCP keep-alive.
                    for client in self.present[topic]:
                        self._write_to_client(client, payload, to_remove)
                else:
                    log.debug("Publish target %s not connected", topic)
        else:
            for client in self.clients:
                self._write_to_client(client, payload, to_remove)
        for client in to_remove:
            log.debug(
                "Subscriber at %s has disconnected from publisher", client.address
            )
            client.close()
            self._remove_client_present(client)
            self.clients.discard(client)
        log.trace("TCP PubServer finished publishing payload")


class TCPPubServerChannel(salt.transport.server.PubServerChannel):
    # TODO: opts!
    # Based on default used in salt.ext.tornado.netutil.bind_sockets()
    backlog = 128

    def __init__(self, opts):
        self.opts = opts
        self.ckminions = salt.utils.minions.CkMinions(opts)
        self.io_loop = None

    def __setstate__(self, state):
        salt.master.SMaster.secrets = state["secrets"]
        self.__init__(state["opts"])

    def __getstate__(self):
        return {"opts": self.opts, "secrets": salt.master.SMaster.secrets}

    def _pull_uri(self):
        """
        Return the publish-pull IPC address.

        This is a TCP port when ``ipc_mode`` is ``tcp``, otherwise the
        ``publish_pull.ipc`` path under ``sock_dir``.
        """
        if self.opts.get("ipc_mode", "") == "tcp":
            return int(self.opts.get("tcp_master_publish_pull", 4514))
        return os.path.join(self.opts["sock_dir"], "publish_pull.ipc")

    def _publish_daemon(self, **kwargs):
        """
        Bind to the interface specified in the configuration file

        Starts the TCP publisher and the IPC puller that feeds it, then runs
        the IO loop until interrupted.
        """
        salt.utils.process.appendproctitle(self.__class__.__name__)

        log_queue = kwargs.get("log_queue")
        if log_queue is not None:
            salt.log.setup.set_multiprocessing_logging_queue(log_queue)
        log_queue_level = kwargs.get("log_queue_level")
        if log_queue_level is not None:
            salt.log.setup.set_multiprocessing_logging_level(log_queue_level)
        salt.log.setup.setup_multiprocessing_logging(log_queue)

        # Check if io_loop was set outside
        if self.io_loop is None:
            self.io_loop = salt.ext.tornado.ioloop.IOLoop.current()

        # Spin up the publisher
        pub_server = PubServer(
            self.opts, io_loop=self.io_loop, pack_publish=self.pack_publish
        )
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _set_tcp_keepalive(sock, self.opts)
        sock.setblocking(0)
        sock.bind((self.opts["interface"], int(self.opts["publish_port"])))
        sock.listen(self.backlog)
        # pub_server will take ownership of the socket
        pub_server.add_socket(sock)

        # Set up Salt IPC server
        pull_uri = self._pull_uri()

        pull_sock = salt.transport.ipc.IPCMessageServer(
            pull_uri,
            io_loop=self.io_loop,
            payload_handler=pub_server.publish_payload,
        )

        # Securely create socket
        log.info("Starting the Salt Puller on %s", pull_uri)
        with salt.utils.files.set_umask(0o177):
            pull_sock.start()

        # run forever
        try:
            self.io_loop.start()
        except (KeyboardInterrupt, SystemExit):
            salt.log.setup.shutdown_multiprocessing_logging()
        finally:
            pull_sock.close()
            # Release the publisher's event bus and AESFuncs as well.
            pub_server.close()

    def pre_fork(self, process_manager, kwargs=None):
        """
        Do anything necessary pre-fork. Since this is on the master side this will
        primarily be used to create IPC channels and create our daemon process to
        do the actual publishing
        """
        process_manager.add_process(self._publish_daemon, kwargs=kwargs)

    def pack_publish(self, load):
        """
        Encrypt (and optionally sign) ``load`` for publishing.

        Returns a dict with the serialized ``payload``. For ``list`` targets
        (when not ``order_masters``) it also holds ``topic_lst``, the minion
        ids to deliver to.
        """
        payload = {"enc": "aes"}
        load["serial"] = salt.master.SMaster.get_serial()
        crypticle = salt.crypt.Crypticle(
            self.opts, salt.master.SMaster.secrets["aes"]["secret"].value
        )
        payload["load"] = crypticle.dumps(load)
        if self.opts["sign_pub_messages"]:
            master_pem_path = os.path.join(self.opts["pki_dir"], "master.pem")
            log.debug("Signing data packet")
            payload["sig"] = salt.crypt.sign_message(master_pem_path, payload["load"])
        int_payload = {"payload": salt.payload.dumps(payload)}

        # add some targeting stuff for lists only (for now)
        if load["tgt_type"] == "list" and not self.opts.get("order_masters", False):
            if isinstance(load["tgt"], str):
                # Fetch a list of minions that match
                _res = self.ckminions.check_minions(
                    load["tgt"], tgt_type=load["tgt_type"]
                )
                match_ids = _res["minions"]

                log.debug("Publish Side Match: %s", match_ids)
                # Send the list of minions through so the publisher can target them
                int_payload["topic_lst"] = match_ids
            else:
                int_payload["topic_lst"] = load["tgt"]
        return int_payload

    def publish(self, load):
        """
        Publish "load" to minions

        Sends ``load`` over IPC to the puller started by
        :meth:`_publish_daemon`.
        """
        # Send it over IPC!
        pull_uri = self._pull_uri()
        pub_sock = salt.utils.asynchronous.SyncWrapper(
            salt.transport.ipc.IPCMessageClient,
            (pull_uri,),
            loop_kwarg="io_loop",
        )
        try:
            pub_sock.connect()
            pub_sock.send(load)
        finally:
            # A new wrapper is created on every call; release it rather than
            # leaving its connection open until garbage collection.
            pub_sock.close()