1""" 2Functions for daemonizing and otherwise modifying running processes 3""" 4import contextlib 5import copy 6import errno 7import functools 8import io 9import json 10import logging 11import multiprocessing 12import multiprocessing.util 13import os 14import queue 15import signal 16import socket 17import subprocess 18import sys 19import threading 20import time 21import types 22 23import salt.defaults.exitcodes 24import salt.log.setup 25import salt.utils.files 26import salt.utils.path 27import salt.utils.platform 28import salt.utils.versions 29from salt.ext.tornado import gen 30 31log = logging.getLogger(__name__) 32 33HAS_PSUTIL = False 34try: 35 import psutil 36 37 HAS_PSUTIL = True 38except ImportError: 39 pass 40 41try: 42 import setproctitle 43 44 HAS_SETPROCTITLE = True 45except ImportError: 46 HAS_SETPROCTITLE = False 47 48 49def appendproctitle(name): 50 """ 51 Append "name" to the current process title 52 """ 53 if HAS_SETPROCTITLE: 54 setproctitle.setproctitle(setproctitle.getproctitle() + " " + name) 55 56 57def daemonize(redirect_out=True): 58 """ 59 Daemonize a process 60 """ 61 # Avoid circular import 62 import salt.utils.crypt 63 64 try: 65 pid = os.fork() 66 if pid > 0: 67 # exit first parent 68 salt.utils.crypt.reinit_crypto() 69 os._exit(salt.defaults.exitcodes.EX_OK) 70 except OSError as exc: 71 log.error("fork #1 failed: %s (%s)", exc.errno, exc) 72 sys.exit(salt.defaults.exitcodes.EX_GENERIC) 73 74 # decouple from parent environment 75 os.chdir("/") 76 # noinspection PyArgumentList 77 os.setsid() 78 os.umask(0o022) # pylint: disable=blacklisted-function 79 80 # do second fork 81 try: 82 pid = os.fork() 83 if pid > 0: 84 salt.utils.crypt.reinit_crypto() 85 sys.exit(salt.defaults.exitcodes.EX_OK) 86 except OSError as exc: 87 log.error("fork #2 failed: %s (%s)", exc.errno, exc) 88 sys.exit(salt.defaults.exitcodes.EX_GENERIC) 89 90 salt.utils.crypt.reinit_crypto() 91 92 # A normal daemonization redirects the process output to /dev/null. 93 # Unfortunately when a python multiprocess is called the output is 94 # not cleanly redirected and the parent process dies when the 95 # multiprocessing process attempts to access stdout or err. 96 if redirect_out: 97 with salt.utils.files.fopen("/dev/null", "r+") as dev_null: 98 # Redirect python stdin/out/err 99 # and the os stdin/out/err which can be different 100 dup2(dev_null, sys.stdin) 101 dup2(dev_null, sys.stdout) 102 dup2(dev_null, sys.stderr) 103 dup2(dev_null, 0) 104 dup2(dev_null, 1) 105 dup2(dev_null, 2) 106 107 108def dup2(file1, file2): 109 """ 110 Duplicate file descriptor fd to fd2, closing the latter first if necessary. 111 This method is similar to os.dup2 but ignores streams that do not have a 112 supported fileno method. 113 """ 114 if isinstance(file1, int): 115 fno1 = file1 116 else: 117 try: 118 fno1 = file1.fileno() 119 except io.UnsupportedOperation: 120 log.warning("Unsupported operation on file: %r", file1) 121 return 122 if isinstance(file2, int): 123 fno2 = file2 124 else: 125 try: 126 fno2 = file2.fileno() 127 except io.UnsupportedOperation: 128 log.warning("Unsupported operation on file: %r", file2) 129 return 130 os.dup2(fno1, fno2) 131 132 133def daemonize_if(opts): 134 """ 135 Daemonize a module function process if multiprocessing is True and the 136 process is not being called by salt-call 137 """ 138 if "salt-call" in sys.argv[0]: 139 return 140 if not opts.get("multiprocessing", True): 141 return 142 if sys.platform.startswith("win"): 143 return 144 daemonize(False) 145 146 147def systemd_notify_call(action): 148 process = subprocess.Popen( 149 ["systemd-notify", action], stdout=subprocess.PIPE, stderr=subprocess.PIPE 150 ) 151 process.communicate() 152 status = process.poll() 153 return status == 0 154 155 156def notify_systemd(): 157 """ 158 Notify systemd that this process has started 159 """ 160 try: 161 import systemd.daemon # pylint: disable=no-name-in-module 162 except ImportError: 163 if salt.utils.path.which("systemd-notify") and systemd_notify_call("--booted"): 164 # Notify systemd synchronously 165 notify_socket = os.getenv("NOTIFY_SOCKET") 166 if notify_socket: 167 # Handle abstract namespace socket 168 if notify_socket.startswith("@"): 169 notify_socket = "\0{}".format(notify_socket[1:]) 170 try: 171 sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) 172 sock.connect(notify_socket) 173 sock.sendall(b"READY=1") 174 sock.close() 175 except OSError: 176 return systemd_notify_call("--ready") 177 return True 178 return False 179 180 if systemd.daemon.booted(): 181 try: 182 return systemd.daemon.notify("READY=1") 183 except SystemError: 184 # Daemon was not started by systemd 185 pass 186 187 188def get_process_info(pid=None): 189 """ 190 Gets basic info about a process. 191 pid: None, or int: None will get the current process pid 192 Return: None or Dict 193 """ 194 if pid is None: 195 pid = os.getpid() 196 elif not psutil.pid_exists(pid): 197 return 198 199 raw_process_info = psutil.Process(pid) 200 201 # pid_exists can have false positives 202 # for example Windows reserves PID 5 in a hack way 203 # another reasons is the the process requires kernel permissions 204 try: 205 raw_process_info.status() 206 except psutil.NoSuchProcess: 207 return None 208 209 return { 210 "pid": raw_process_info.pid, 211 "name": raw_process_info.name(), 212 "start_time": raw_process_info.create_time(), 213 } 214 215 216def claim_mantle_of_responsibility(file_name): 217 """ 218 Checks that no other live processes has this responsibility. 219 If claiming the mantle of responsibility was successful True will be returned. 220 file_name: str 221 Return: bool 222 """ 223 224 # all OSs supported by salt has psutil 225 if not HAS_PSUTIL: 226 log.critical( 227 "Assuming no other Process has this responsibility! pidfile: %s", file_name 228 ) 229 return True 230 231 # add file directory if missing 232 file_directory_name = os.path.dirname(file_name) 233 if not os.path.isdir(file_directory_name) and file_directory_name: 234 os.makedirs(file_directory_name) 235 236 # get process info from file 237 file_process_info = None 238 try: 239 with salt.utils.files.fopen(file_name, "r") as file: 240 file_process_info = json.load(file) 241 except json.decoder.JSONDecodeError: 242 log.error("pidfile: %s is corrupted", file_name) 243 except FileNotFoundError: 244 log.info("pidfile: %s not found", file_name) 245 246 this_process_info = get_process_info() 247 248 # check if this process all ready has the responsibility 249 if file_process_info == this_process_info: 250 return True 251 252 if not isinstance(file_process_info, dict) or not isinstance( 253 file_process_info.get("pid"), int 254 ): 255 file_process_info = None 256 257 # check if process is still alive 258 if isinstance(file_process_info, dict) and file_process_info == get_process_info( 259 file_process_info.get("pid") 260 ): 261 return False 262 263 # process can take the mantle of responsibility 264 with salt.utils.files.fopen(file_name, "w") as file: 265 json.dump(this_process_info, file) 266 return True 267 268 269def check_mantle_of_responsibility(file_name): 270 """ 271 Sees who has the mantle of responsibility 272 file_name: str 273 Return: None or int 274 """ 275 276 # all OSs supported by salt has psutil 277 if not HAS_PSUTIL: 278 log.critical( 279 "Assuming no other Process has this responsibility! pidfile: %s", file_name 280 ) 281 return 282 283 # get process info from file 284 try: 285 with salt.utils.files.fopen(file_name, "r") as file: 286 file_process_info = json.load(file) 287 except json.decoder.JSONDecodeError: 288 log.error("pidfile: %s is corrupted", file_name) 289 return 290 except FileNotFoundError: 291 log.info("pidfile: %s not found", file_name) 292 return 293 294 if not isinstance(file_process_info, dict) or not isinstance( 295 file_process_info.get("pid"), int 296 ): 297 return 298 299 if file_process_info == get_process_info(file_process_info["pid"]): 300 return file_process_info["pid"] 301 302 303def set_pidfile(pidfile, user): 304 """ 305 Save the pidfile 306 """ 307 pdir = os.path.dirname(pidfile) 308 if not os.path.isdir(pdir) and pdir: 309 os.makedirs(pdir) 310 try: 311 with salt.utils.files.fopen(pidfile, "w+") as ofile: 312 ofile.write(str(os.getpid())) 313 except OSError: 314 pass 315 316 log.debug("Created pidfile: %s", pidfile) 317 if salt.utils.platform.is_windows(): 318 return True 319 320 import pwd # after confirming not running Windows 321 322 # import grp 323 try: 324 pwnam = pwd.getpwnam(user) 325 uid = pwnam[2] 326 gid = pwnam[3] 327 # groups = [g.gr_gid for g in grp.getgrall() if user in g.gr_mem] 328 except (KeyError, IndexError): 329 sys.stderr.write( 330 "Failed to set the pid to user: {}. The user is not available.\n".format( 331 user 332 ) 333 ) 334 sys.exit(salt.defaults.exitcodes.EX_NOUSER) 335 336 if os.getuid() == uid: 337 # The current user already owns the pidfile. Return! 338 return 339 340 try: 341 os.chown(pidfile, uid, gid) 342 except OSError as err: 343 msg = "Failed to set the ownership of PID file {} to user {}.".format( 344 pidfile, user 345 ) 346 log.debug("%s Traceback follows:", msg, exc_info=True) 347 sys.stderr.write("{}\n".format(msg)) 348 sys.exit(err.errno) 349 log.debug("Chowned pidfile: %s to user: %s", pidfile, user) 350 351 352def check_pidfile(pidfile): 353 """ 354 Determine if a pidfile has been written out 355 """ 356 return os.path.isfile(pidfile) 357 358 359def get_pidfile(pidfile): 360 """ 361 Return the pid from a pidfile as an integer 362 """ 363 try: 364 with salt.utils.files.fopen(pidfile) as pdf: 365 pid = pdf.read().strip() 366 return int(pid) 367 except (OSError, TypeError, ValueError): 368 return -1 369 370 371def clean_proc(proc, wait_for_kill=10): 372 """ 373 Generic method for cleaning up multiprocessing procs 374 """ 375 # NoneType and other fun stuff need not apply 376 if not proc: 377 return 378 try: 379 waited = 0 380 while proc.is_alive(): 381 proc.terminate() 382 waited += 1 383 time.sleep(0.1) 384 if proc.is_alive() and (waited >= wait_for_kill): 385 log.error("Process did not die with terminate(): %s", proc.pid) 386 os.kill(proc.pid, signal.SIGKILL) 387 except (AssertionError, AttributeError): 388 # Catch AssertionError when the proc is evaluated inside the child 389 # Catch AttributeError when the process dies between proc.is_alive() 390 # and proc.terminate() and turns into a NoneType 391 pass 392 393 394def os_is_running(pid): 395 """ 396 Use OS facilities to determine if a process is running 397 """ 398 if isinstance(pid, str): 399 pid = int(pid) 400 if HAS_PSUTIL: 401 return psutil.pid_exists(pid) 402 else: 403 try: 404 os.kill(pid, 0) # SIG 0 is the "are you alive?" signal 405 return True 406 except OSError: 407 return False 408 409 410class ThreadPool: 411 """ 412 This is a very VERY basic threadpool implementation 413 This was made instead of using multiprocessing ThreadPool because 414 we want to set max queue size and we want to daemonize threads (neither 415 is exposed in the stdlib version). 416 417 Since there isn't much use for this class as of right now this implementation 418 Only supports daemonized threads and will *not* return results 419 420 TODO: if this is found to be more generally useful it would be nice to pull 421 in the majority of code from upstream or from http://bit.ly/1wTeJtM 422 """ 423 424 def __init__(self, num_threads=None, queue_size=0): 425 # if no count passed, default to number of CPUs 426 if num_threads is None: 427 num_threads = multiprocessing.cpu_count() 428 self.num_threads = num_threads 429 430 # create a task queue of queue_size 431 self._job_queue = queue.Queue(queue_size) 432 433 self._workers = [] 434 435 # create worker threads 436 for _ in range(num_threads): 437 thread = threading.Thread(target=self._thread_target) 438 thread.daemon = True 439 thread.start() 440 self._workers.append(thread) 441 442 # intentionally not called "apply_async" since we aren't keeping track of 443 # the return at all, if we want to make this API compatible with multiprocessing 444 # threadpool we can in the future, and we won't have to worry about name collision 445 def fire_async(self, func, args=None, kwargs=None): 446 if args is None: 447 args = [] 448 if kwargs is None: 449 kwargs = {} 450 try: 451 self._job_queue.put_nowait((func, args, kwargs)) 452 return True 453 except queue.Full: 454 return False 455 456 def _thread_target(self): 457 while True: 458 # 1s timeout so that if the parent dies this thread will die within 1s 459 try: 460 try: 461 func, args, kwargs = self._job_queue.get(timeout=1) 462 self._job_queue.task_done() # Mark the task as done once we get it 463 except queue.Empty: 464 continue 465 except AttributeError: 466 # During shutdown, `queue` may not have an `Empty` atttribute. Thusly, 467 # we have to catch a possible exception from our exception handler in 468 # order to avoid an unclean shutdown. Le sigh. 469 continue 470 try: 471 log.debug( 472 "ThreadPool executing func: %s with args=%s kwargs=%s", 473 func, 474 args, 475 kwargs, 476 ) 477 func(*args, **kwargs) 478 except Exception as err: # pylint: disable=broad-except 479 log.debug(err, exc_info=True) 480 481 482class ProcessManager: 483 """ 484 A class which will manage processes that should be running 485 """ 486 487 def __init__(self, name=None, wait_for_kill=1): 488 # pid -> {tgt: foo, Process: object, args: args, kwargs: kwargs} 489 self._process_map = {} 490 491 self.name = name 492 if self.name is None: 493 self.name = self.__class__.__name__ 494 495 self.wait_for_kill = wait_for_kill 496 497 # store some pointers for the SIGTERM handler 498 self._pid = os.getpid() 499 self._sigterm_handler = signal.getsignal(signal.SIGTERM) 500 self._restart_processes = True 501 502 def add_process(self, tgt, args=None, kwargs=None, name=None): 503 """ 504 Create a processes and args + kwargs 505 This will deterimine if it is a Process class, otherwise it assumes 506 it is a function 507 """ 508 if args is None: 509 args = [] 510 511 if kwargs is None: 512 kwargs = {} 513 514 if salt.utils.platform.is_windows(): 515 # Need to ensure that 'log_queue' and 'log_queue_level' is 516 # correctly transferred to processes that inherit from 517 # 'Process'. 518 if type(Process) is type(tgt) and (issubclass(tgt, Process)): 519 need_log_queue = True 520 else: 521 need_log_queue = False 522 523 if need_log_queue: 524 if "log_queue" not in kwargs: 525 if hasattr(self, "log_queue"): 526 kwargs["log_queue"] = self.log_queue 527 else: 528 kwargs[ 529 "log_queue" 530 ] = salt.log.setup.get_multiprocessing_logging_queue() 531 if "log_queue_level" not in kwargs: 532 if hasattr(self, "log_queue_level"): 533 kwargs["log_queue_level"] = self.log_queue_level 534 else: 535 kwargs[ 536 "log_queue_level" 537 ] = salt.log.setup.get_multiprocessing_logging_level() 538 539 # create a nicer name for the debug log 540 if name is None: 541 if isinstance(tgt, types.FunctionType): 542 name = "{}.{}".format( 543 tgt.__module__, 544 tgt.__name__, 545 ) 546 else: 547 name = "{}{}.{}".format( 548 tgt.__module__, 549 ".{}".format(tgt.__class__) 550 if str(tgt.__class__) != "<type 'type'>" 551 else "", 552 tgt.__name__, 553 ) 554 555 if type(multiprocessing.Process) is type(tgt) and issubclass( 556 tgt, multiprocessing.Process 557 ): 558 process = tgt(*args, **kwargs) 559 else: 560 process = multiprocessing.Process( 561 target=tgt, args=args, kwargs=kwargs, name=name 562 ) 563 564 if isinstance(process, SignalHandlingProcess): 565 with default_signals(signal.SIGINT, signal.SIGTERM): 566 process.start() 567 else: 568 process.start() 569 log.debug("Started '%s' with pid %s", name, process.pid) 570 self._process_map[process.pid] = { 571 "tgt": tgt, 572 "args": args, 573 "kwargs": kwargs, 574 "Process": process, 575 } 576 return process 577 578 def restart_process(self, pid): 579 """ 580 Create new process (assuming this one is dead), then remove the old one 581 """ 582 if self._restart_processes is False: 583 return 584 exit = self._process_map[pid]["Process"].exitcode 585 if exit > 0: 586 log.info( 587 "Process %s (%s) died with exit status %s, restarting...", 588 self._process_map[pid]["tgt"], 589 pid, 590 self._process_map[pid]["Process"].exitcode, 591 ) 592 else: 593 log.debug( 594 "Process %s (%s) died with exit status %s, restarting...", 595 self._process_map[pid]["tgt"], 596 pid, 597 self._process_map[pid]["Process"].exitcode, 598 ) 599 # don't block, the process is already dead 600 self._process_map[pid]["Process"].join(1) 601 602 self.add_process( 603 self._process_map[pid]["tgt"], 604 self._process_map[pid]["args"], 605 self._process_map[pid]["kwargs"], 606 ) 607 608 del self._process_map[pid] 609 610 def stop_restarting(self): 611 self._restart_processes = False 612 613 def send_signal_to_processes(self, signal_): 614 if salt.utils.platform.is_windows() and signal_ in ( 615 signal.SIGTERM, 616 signal.SIGINT, 617 ): 618 # On Windows, the subprocesses automatically have their signal 619 # handlers invoked. If you send one of these signals while the 620 # signal handler is running, it will kill the process where it 621 # is currently running and the signal handler will not finish. 622 # This will also break the process tree: children of killed 623 # children will become parentless and not findable when trying 624 # to kill the process tree (they don't inherit their parent's 625 # parent). Hence the 'MWorker' processes would be left over if 626 # the 'ReqServer' process is killed this way since 'taskkill' 627 # with the tree option will not be able to find them. 628 return 629 630 for pid in self._process_map.copy().keys(): 631 try: 632 os.kill(pid, signal_) 633 except OSError as exc: 634 if exc.errno not in (errno.ESRCH, errno.EACCES): 635 # If it's not a "No such process" error, raise it 636 raise 637 # Otherwise, it's a dead process, remove it from the process map 638 del self._process_map[pid] 639 640 @gen.coroutine 641 def run(self, asynchronous=False): 642 """ 643 Load and start all available api modules 644 """ 645 log.debug("Process Manager starting!") 646 appendproctitle(self.name) 647 648 # make sure to kill the subprocesses if the parent is killed 649 if signal.getsignal(signal.SIGTERM) is signal.SIG_DFL: 650 # There are no SIGTERM handlers installed, install ours 651 signal.signal(signal.SIGTERM, self._handle_signals) 652 if signal.getsignal(signal.SIGINT) is signal.SIG_DFL: 653 # There are no SIGINT handlers installed, install ours 654 signal.signal(signal.SIGINT, self._handle_signals) 655 656 while True: 657 log.trace("Process manager iteration") 658 try: 659 # in case someone died while we were waiting... 660 self.check_children() 661 # The event-based subprocesses management code was removed from here 662 # because os.wait() conflicts with the subprocesses management logic 663 # implemented in `multiprocessing` package. See #35480 for details. 664 if asynchronous: 665 yield gen.sleep(10) 666 else: 667 time.sleep(10) 668 if not self._process_map: 669 break 670 # OSError is raised if a signal handler is called (SIGTERM) during os.wait 671 except OSError: 672 break 673 except OSError as exc: # pylint: disable=duplicate-except 674 # IOError with errno of EINTR (4) may be raised 675 # when using time.sleep() on Windows. 676 if exc.errno != errno.EINTR: 677 raise 678 break 679 680 def check_children(self): 681 """ 682 Check the children once 683 """ 684 if self._restart_processes is True: 685 for pid, mapping in self._process_map.copy().items(): 686 if not mapping["Process"].is_alive(): 687 log.trace("Process restart of %s", pid) 688 self.restart_process(pid) 689 690 def kill_children(self, *args, **kwargs): 691 """ 692 Kill all of the children 693 """ 694 if salt.utils.platform.is_windows(): 695 if multiprocessing.current_process().name != "MainProcess": 696 # Since the main process will kill subprocesses by tree, 697 # no need to do anything in the subprocesses. 698 # Sometimes, when both a subprocess and the main process 699 # call 'taskkill', it will leave a 'taskkill' zombie process. 700 # We want to avoid this. 701 return 702 with salt.utils.files.fopen(os.devnull, "wb") as devnull: 703 for pid, p_map in self._process_map.items(): 704 # On Windows, we need to explicitly terminate sub-processes 705 # because the processes don't have a sigterm handler. 706 subprocess.call( 707 ["taskkill", "/F", "/T", "/PID", str(pid)], 708 stdout=devnull, 709 stderr=devnull, 710 ) 711 p_map["Process"].terminate() 712 else: 713 for pid, p_map in self._process_map.copy().items(): 714 log.trace("Terminating pid %s: %s", pid, p_map["Process"]) 715 if args: 716 # escalate the signal to the process 717 try: 718 os.kill(pid, args[0]) 719 except OSError: 720 pass 721 try: 722 p_map["Process"].terminate() 723 except OSError as exc: 724 if exc.errno not in (errno.ESRCH, errno.EACCES): 725 raise 726 if not p_map["Process"].is_alive(): 727 try: 728 del self._process_map[pid] 729 except KeyError: 730 # Race condition 731 pass 732 733 end_time = time.time() + self.wait_for_kill # when to die 734 735 log.trace("Waiting to kill process manager children") 736 while self._process_map and time.time() < end_time: 737 for pid, p_map in self._process_map.copy().items(): 738 log.trace("Joining pid %s: %s", pid, p_map["Process"]) 739 p_map["Process"].join(0) 740 741 if not p_map["Process"].is_alive(): 742 # The process is no longer alive, remove it from the process map dictionary 743 try: 744 del self._process_map[pid] 745 except KeyError: 746 # This is a race condition if a signal was passed to all children 747 pass 748 749 # if any managed processes still remain to be handled, let's kill them 750 kill_iterations = 2 751 while kill_iterations >= 0: 752 kill_iterations -= 1 753 for pid, p_map in self._process_map.copy().items(): 754 if not p_map["Process"].is_alive(): 755 # The process is no longer alive, remove it from the process map dictionary 756 try: 757 del self._process_map[pid] 758 except KeyError: 759 # This is a race condition if a signal was passed to all children 760 pass 761 continue 762 log.trace("Killing pid %s: %s", pid, p_map["Process"]) 763 try: 764 os.kill(pid, signal.SIGKILL) 765 except OSError as exc: 766 log.exception(exc) 767 # in case the process has since decided to die, os.kill returns OSError 768 if not p_map["Process"].is_alive(): 769 # The process is no longer alive, remove it from the process map dictionary 770 try: 771 del self._process_map[pid] 772 except KeyError: 773 # This is a race condition if a signal was passed to all children 774 pass 775 776 if self._process_map: 777 # Some processes disrespected the KILL signal!!!! 778 available_retries = kwargs.get("retry", 3) 779 if available_retries >= 0: 780 log.info( 781 "Some processes failed to respect the KILL signal: %s", 782 "; ".join( 783 "Process: {} (Pid: {})".format(v["Process"], k) 784 for ( # pylint: disable=str-format-in-logging 785 k, 786 v, 787 ) in self._process_map.items() 788 ), 789 ) 790 log.info("kill_children retries left: %s", available_retries) 791 kwargs["retry"] = available_retries - 1 792 return self.kill_children(*args, **kwargs) 793 else: 794 log.warning( 795 "Failed to kill the following processes: %s", 796 "; ".join( 797 "Process: {} (Pid: {})".format(v["Process"], k) 798 for ( # pylint: disable=str-format-in-logging 799 k, 800 v, 801 ) in self._process_map.items() 802 ), 803 ) 804 log.warning( 805 "Salt will either fail to terminate now or leave some " 806 "zombie processes behind" 807 ) 808 809 def terminate(self): 810 """ 811 Properly terminate this process manager instance 812 """ 813 self.stop_restarting() 814 self.send_signal_to_processes(signal.SIGTERM) 815 self.kill_children() 816 817 def _handle_signals(self, *args, **kwargs): 818 # first lets reset signal handlers to default one to prevent running this twice 819 signal.signal(signal.SIGTERM, signal.SIG_IGN) 820 signal.signal(signal.SIGINT, signal.SIG_IGN) 821 822 self.stop_restarting() 823 self.send_signal_to_processes(signal.SIGTERM) 824 825 # check that this is the correct process, children inherit this 826 # handler, if we are in a child lets just run the original handler 827 if os.getpid() != self._pid: 828 if callable(self._sigterm_handler): 829 return self._sigterm_handler(*args) 830 elif self._sigterm_handler is not None: 831 return signal.default_int_handler(signal.SIGTERM)(*args) 832 else: 833 return 834 835 # Terminate child processes 836 self.kill_children(*args, **kwargs) 837 838 839class Process(multiprocessing.Process): 840 """ 841 Salt relies on this custom implementation of :py:class:`~multiprocessing.Process` to 842 simplify/automate some common procedures, for example, logging in the new process is 843 configured for "free" for every new process. 844 This is most important in platforms which default to ``spawn` instead of ``fork`` for 845 new processes. 846 847 This is achieved by some dunder methods in the class: 848 849 * ``__new__``: 850 851 This method ensures that any arguments and/or keyword arguments that are passed to 852 ``__init__`` are captured. 853 854 By having this information captured, we can define ``__setstate__`` and ``__getstate__`` 855 to automatically take care of reconstructing the object state on spawned processes. 856 857 * ``__getstate__``: 858 859 This method should return a dictionary which will be used as the ``state`` argument to 860 :py:method:`salt.utils.process.Process.__setstate__`. 861 Usually, when subclassing, this method does not need to be implemented, however, 862 if implemented, `super()` **must** be called. 863 864 * ``__setstate__``: 865 866 This method reconstructs the object on the spawned process. 867 The ``state`` argument is constructed by the 868 :py:method:`salt.utils.process.Process.__getstate__` method. 869 Usually, when subclassing, this method does not need to be implemented, however, 870 if implemented, `super()` **must** be called. 871 872 873 An example of where ``__setstate__`` and ``__getstate__`` needed to be subclassed can be 874 seen in :py:class:`salt.master.MWorker`. 875 876 The gist of it is something like, if there are internal attributes which need to maintain 877 their state on spawned processes, then, subclasses must implement ``__getstate__`` and 878 ``__setstate__`` to ensure that. 879 880 881 For example: 882 883 884 .. code-block:: python 885 886 import salt.utils.process 887 888 class MyCustomProcess(salt.utils.process.Process): 889 890 def __init__(self, opts, **kwargs): 891 super().__init__(**kwargs) 892 self.opts = opts 893 894 # This attribute, counter, should only start at 0 on the initial(parent) process. 895 # Any child processes, need to carry the current value of the counter(instead of 896 # starting at zero). 897 self.counter = 0 898 899 def __getstate__(self): 900 state = super().__getstate__() 901 state.update( 902 { 903 "counter": self.counter, 904 } 905 ) 906 return state 907 908 def __setstate__(self, state): 909 super().__setstate__(state) 910 self.counter = state["counter"] 911 """ 912 913 def __new__(cls, *args, **kwargs): 914 """ 915 This method ensures that any arguments and/or keyword arguments that are passed to 916 ``__init__`` are captured. 917 918 By having this information captured, we can define ``__setstate__`` and ``__getstate__`` 919 to automatically take care of object pickling which is required for platforms that 920 spawn processes instead of forking them. 921 """ 922 # We implement __new__ because we want to capture the passed in *args and **kwargs 923 # in order to remove the need for each class to implement __getstate__ and __setstate__ 924 # which is required on spawning platforms 925 instance = super().__new__(cls) 926 instance._after_fork_methods = [] 927 instance._finalize_methods = [] 928 929 if salt.utils.platform.spawning_platform(): 930 # On spawning platforms, subclasses should call super if they define 931 # __setstate__ and/or __getstate__ 932 instance._args_for_getstate = copy.copy(args) 933 instance._kwargs_for_getstate = copy.copy(kwargs) 934 return instance 935 936 def __init__(self, *args, **kwargs): 937 log_queue = kwargs.pop("log_queue", None) 938 log_queue_level = kwargs.pop("log_queue_level", None) 939 super().__init__(*args, **kwargs) 940 self.log_queue = log_queue 941 if self.log_queue is None: 942 self.log_queue = salt.log.setup.get_multiprocessing_logging_queue() 943 944 self.log_queue_level = log_queue_level 945 if self.log_queue_level is None: 946 self.log_queue_level = salt.log.setup.get_multiprocessing_logging_level() 947 948 # Because we need to enforce our after fork and finalize routines, 949 # we must wrap this class run method to allow for these extra steps 950 # to be executed pre and post calling the actual run method, 951 # having subclasses call super would just not work. 952 # 953 # We use setattr here to fool pylint not to complain that we're 954 # overriding run from the subclass here 955 setattr(self, "run", self.__decorate_run(self.run)) 956 957 # __setstate__ and __getstate__ are only used on spawning platforms. 958 def __setstate__(self, state): 959 """ 960 This method reconstructs the object on the spawned process. 961 The ``state`` argument is constructed by :py:method:`salt.utils.process.Process.__getstate__`. 962 963 Usually, when subclassing, this method does not need to be implemented, however, 964 if implemented, `super()` **must** be called. 965 """ 966 args = state["args"] 967 kwargs = state["kwargs"] 968 # This will invoke __init__ of the most derived class. 969 self.__init__(*args, **kwargs) 970 for (function, args, kwargs) in state["after_fork_methods"]: 971 self.register_after_fork_method(function, *args, **kwargs) 972 for (function, args, kwargs) in state["finalize_methods"]: 973 self.register_finalize_method(function, *args, **kwargs) 974 975 def __getstate__(self): 976 """ 977 This method should return a dictionary which will be used as the ``state`` argument to 978 :py:method:`salt.utils.process.Process.__setstate__`. 979 Usually, when subclassing, this method does not need to be implemented, however, 980 if implemented, `super()` **must** be called. 981 """ 982 args = self._args_for_getstate 983 kwargs = self._kwargs_for_getstate 984 if "log_queue" not in kwargs: 985 kwargs["log_queue"] = self.log_queue 986 if "log_queue_level" not in kwargs: 987 kwargs["log_queue_level"] = self.log_queue_level 988 return { 989 "args": args, 990 "kwargs": kwargs, 991 "after_fork_methods": self._after_fork_methods, 992 "finalize_methods": self._finalize_methods, 993 } 994 995 def __decorate_run(self, run_func): 996 @functools.wraps(run_func) 997 def wrapped_run_func(): 998 # Static after fork method, always needs to happen first 999 try: 1000 salt.log.setup.set_multiprocessing_logging_queue(self.log_queue) 1001 except Exception: # pylint: disable=broad-except 1002 log.exception( 1003 "Failed to run salt.log.setup.set_multiprocessing_logging_queue() on %s", 1004 self, 1005 ) 1006 try: 1007 salt.log.setup.set_multiprocessing_logging_level(self.log_queue_level) 1008 except Exception: # pylint: disable=broad-except 1009 log.exception( 1010 "Failed to run salt.log.setup.set_multiprocessing_logging_level() on %s", 1011 self, 1012 ) 1013 try: 1014 salt.log.setup.setup_multiprocessing_logging(self.log_queue) 1015 except Exception: # pylint: disable=broad-except 1016 log.exception( 1017 "Failed to run salt.log.setup.setup_multiprocessing_logging() on %s", 1018 self, 1019 ) 1020 1021 for method, args, kwargs in self._after_fork_methods: 1022 try: 1023 method(*args, **kwargs) 1024 except Exception: # pylint: disable=broad-except 1025 log.exception( 1026 "Failed to run after fork callback on %s; method=%r; args=%r; and kwargs=%r", 1027 self, 1028 method, 1029 args, 1030 kwargs, 1031 ) 1032 continue 1033 try: 1034 return run_func() 1035 except SystemExit: # pylint: disable=try-except-raise 1036 # These are handled by multiprocessing.Process._bootstrap() 1037 raise 1038 except Exception: # pylint: disable=broad-except 1039 log.error( 1040 "An un-handled exception from the multiprocessing process " 1041 "'%s' was caught:\n", 1042 self.name, 1043 exc_info=True, 1044 ) 1045 # Re-raise the exception. multiprocessing.Process will write it to 1046 # sys.stderr and set the proper exitcode and we have already logged 1047 # it above. 1048 raise 1049 finally: 1050 try: 1051 for method, args, kwargs in self._finalize_methods: 1052 try: 1053 method(*args, **kwargs) 1054 except Exception: # pylint: disable=broad-except 1055 log.exception( 1056 "Failed to run finalize callback on %s; method=%r; args=%r; and kwargs=%r", 1057 self, 1058 method, 1059 args, 1060 kwargs, 1061 ) 1062 continue 1063 finally: 1064 # Static finalize method, should always run last 1065 try: 1066 salt.log.setup.shutdown_multiprocessing_logging() 1067 except Exception: # pylint: disable=broad-except 1068 log.exception( 1069 "Failed to run salt.log.setup.shutdown_multiprocessing_logging() on %s", 1070 self, 1071 ) 1072 1073 return wrapped_run_func 1074 1075 def register_after_fork_method(self, function, *args, **kwargs): 1076 """ 1077 Register a function to run after the process has forked 1078 """ 1079 after_fork_method_tuple = (function, args, kwargs) 1080 if after_fork_method_tuple not in self._after_fork_methods: 1081 self._after_fork_methods.append(after_fork_method_tuple) 1082 1083 def register_finalize_method(self, function, *args, **kwargs): 1084 """ 1085 Register a function to run as process terminates 1086 """ 1087 finalize_method_tuple = (function, args, kwargs) 1088 if finalize_method_tuple not in self._finalize_methods: 1089 self._finalize_methods.append(finalize_method_tuple) 1090 1091 1092class MultiprocessingProcess(Process): 1093 """ 1094 This class exists for backwards compatibility and to properly deprecate it. 1095 """ 1096 1097 def __init__(self, *args, **kwargs): 1098 salt.utils.versions.warn_until_date( 1099 "20220101", 1100 "Please stop using '{name}.MultiprocessingProcess' and instead use " 1101 "'{name}.Process'. '{name}.MultiprocessingProcess' will go away " 1102 "after {{date}}.".format(name=__name__), 1103 stacklevel=3, 1104 ) 1105 super().__init__(*args, **kwargs) 1106 1107 1108class SignalHandlingProcess(Process): 1109 def __init__(self, *args, **kwargs): 1110 super().__init__(*args, **kwargs) 1111 self._signal_handled = multiprocessing.Event() 1112 self.register_after_fork_method(SignalHandlingProcess._setup_signals, self) 1113 1114 def signal_handled(self): 1115 return self._signal_handled.is_set() 1116 1117 def _setup_signals(self): 1118 signal.signal(signal.SIGINT, self._handle_signals) 1119 signal.signal(signal.SIGTERM, self._handle_signals) 1120 1121 def _handle_signals(self, signum, sigframe): 1122 self._signal_handled.set() 1123 signal.signal(signal.SIGTERM, signal.SIG_IGN) 1124 signal.signal(signal.SIGINT, signal.SIG_IGN) 1125 msg = "{} received a ".format(self.__class__.__name__) 1126 if signum == signal.SIGINT: 1127 msg += "SIGINT" 1128 elif signum == signal.SIGTERM: 1129 msg += "SIGTERM" 1130 msg += ". Exiting" 1131 log.debug(msg) 1132 if HAS_PSUTIL: 1133 try: 1134 process = psutil.Process(os.getpid()) 1135 if hasattr(process, "children"): 1136 for child in process.children(recursive=True): 1137 try: 1138 if child.is_running(): 1139 child.terminate() 1140 except psutil.NoSuchProcess: 1141 log.warning( 1142 "Unable to kill child of process %d, it does " 1143 "not exist. My pid is %d", 1144 self.pid, 1145 os.getpid(), 1146 ) 1147 except psutil.NoSuchProcess: 1148 log.warning( 1149 "Unable to kill children of process %d, it does not exist." 1150 "My pid is %d", 1151 self.pid, 1152 os.getpid(), 1153 ) 1154 sys.exit(salt.defaults.exitcodes.EX_OK) 1155 1156 def start(self): 1157 with default_signals(signal.SIGINT, signal.SIGTERM): 1158 super().start() 1159 1160 1161class SignalHandlingMultiprocessingProcess(SignalHandlingProcess): 1162 """ 1163 This class exists for backwards compatibility and to properly deprecate it. 1164 """ 1165 1166 def __init__(self, *args, **kwargs): 1167 salt.utils.versions.warn_until_date( 1168 "20220101", 1169 "Please stop using '{name}.SignalHandlingMultiprocessingProcess' and" 1170 " instead use '{name}.SignalHandlingProcess'." 1171 " '{name}.SignalHandlingMultiprocessingProcess' will go away after" 1172 " {{date}}.".format(name=__name__), 1173 stacklevel=3, 1174 ) 1175 super().__init__(*args, **kwargs) 1176 1177 1178@contextlib.contextmanager 1179def default_signals(*signals): 1180 old_signals = {} 1181 for signum in signals: 1182 try: 1183 saved_signal = signal.getsignal(signum) 1184 signal.signal(signum, signal.SIG_DFL) 1185 except ValueError as exc: 1186 # This happens when a netapi module attempts to run a function 1187 # using wheel_async, because the process trying to register signals 1188 # will not be the main PID. 1189 log.trace("Failed to register signal for signum %d: %s", signum, exc) 1190 else: 1191 old_signals[signum] = saved_signal 1192 1193 try: 1194 # Do whatever is needed with the reset signals 1195 yield 1196 finally: 1197 # Restore signals 1198 for signum in old_signals: 1199 signal.signal(signum, old_signals[signum]) 1200 1201 del old_signals 1202 1203 1204class SubprocessList: 1205 def __init__(self, processes=None, lock=None): 1206 if processes is None: 1207 self.processes = [] 1208 else: 1209 self.processes = processes 1210 if lock is None: 1211 self.lock = multiprocessing.Lock() 1212 else: 1213 self.lock = lock 1214 self.count = 0 1215 1216 def add(self, proc): 1217 with self.lock: 1218 self.processes.append(proc) 1219 log.debug("Subprocess %s added", proc.name) 1220 self.count += 1 1221 1222 def cleanup(self): 1223 with self.lock: 1224 for proc in self.processes: 1225 if proc.is_alive(): 1226 continue 1227 proc.join() 1228 self.processes.remove(proc) 1229 self.count -= 1 1230 log.debug("Subprocess %s cleaned up", proc.name) 1231