1"""
2Functions for daemonizing and otherwise modifying running processes
3"""
4import contextlib
5import copy
6import errno
7import functools
8import io
9import json
10import logging
11import multiprocessing
12import multiprocessing.util
13import os
14import queue
15import signal
16import socket
17import subprocess
18import sys
19import threading
20import time
21import types
22
23import salt.defaults.exitcodes
24import salt.log.setup
25import salt.utils.files
26import salt.utils.path
27import salt.utils.platform
28import salt.utils.versions
29from salt.ext.tornado import gen
30
31log = logging.getLogger(__name__)
32
33HAS_PSUTIL = False
34try:
35    import psutil
36
37    HAS_PSUTIL = True
38except ImportError:
39    pass
40
41try:
42    import setproctitle
43
44    HAS_SETPROCTITLE = True
45except ImportError:
46    HAS_SETPROCTITLE = False
47
48
def appendproctitle(name):
    """
    Extend the title of the running process with ``name``.

    The existing title is kept and ``name`` is added after a single space.
    Nothing happens when the optional ``setproctitle`` package could not be
    imported.
    """
    if not HAS_SETPROCTITLE:
        return
    current_title = setproctitle.getproctitle()
    setproctitle.setproctitle(current_title + " " + name)
55
56
def daemonize(redirect_out=True):
    """
    Detach the current process from its parent by forking twice.

    redirect_out: bool: when True, stdin/stdout/stderr (the Python stream
        objects as well as file descriptors 0, 1 and 2) are pointed at
        ``/dev/null`` once the second fork is done.

    The two intermediate parents leave the function by exiting with EX_OK;
    a failing fork is logged and the process exits with EX_GENERIC.
    """
    # Imported here to avoid a circular import
    import salt.utils.crypt

    # First fork: the original parent leaves immediately through os._exit
    try:
        first_child = os.fork()
        if first_child > 0:
            salt.utils.crypt.reinit_crypto()
            os._exit(salt.defaults.exitcodes.EX_OK)
    except OSError as exc:
        log.error("fork #1 failed: %s (%s)", exc.errno, exc)
        sys.exit(salt.defaults.exitcodes.EX_GENERIC)

    # Cut the ties with the environment inherited from the parent
    os.chdir("/")
    # noinspection PyArgumentList
    os.setsid()
    os.umask(0o022)  # pylint: disable=blacklisted-function

    # Second fork: the intermediate process leaves through sys.exit
    try:
        second_child = os.fork()
        if second_child > 0:
            salt.utils.crypt.reinit_crypto()
            sys.exit(salt.defaults.exitcodes.EX_OK)
    except OSError as exc:
        log.error("fork #2 failed: %s (%s)", exc.errno, exc)
        sys.exit(salt.defaults.exitcodes.EX_GENERIC)

    salt.utils.crypt.reinit_crypto()

    # A normal daemonization sends the process output to /dev/null.
    # Callers that go on to use python multiprocessing skip this step,
    # because the output is not cleanly redirected there and the parent
    # dies when the child touches stdout or stderr.
    if not redirect_out:
        return

    with salt.utils.files.fopen("/dev/null", "r+") as dev_null:
        # The python level streams first, then the OS level descriptors,
        # since the two can differ
        for target in (sys.stdin, sys.stdout, sys.stderr, 0, 1, 2):
            dup2(dev_null, target)
106
107
def dup2(file1, file2):
    """
    Make the descriptor behind ``file2`` a duplicate of the one behind
    ``file1``.

    Each argument is either an integer file descriptor or an object that
    exposes ``fileno()``. This behaves like ``os.dup2`` except that when
    ``fileno()`` raises ``io.UnsupportedOperation`` a warning is logged and
    nothing is duplicated.
    """
    descriptors = []
    for candidate in (file1, file2):
        if isinstance(candidate, int):
            descriptors.append(candidate)
            continue
        try:
            descriptors.append(candidate.fileno())
        except io.UnsupportedOperation:
            log.warning("Unsupported operation on file: %r", candidate)
            return
    source_fd, target_fd = descriptors
    os.dup2(source_fd, target_fd)
131
132
def daemonize_if(opts):
    """
    Daemonize (without redirecting output) unless one of these holds:

    * the program was started as ``salt-call``
    * ``opts["multiprocessing"]`` is set to a false value (it defaults to True)
    * the platform name starts with ``win``
    """
    skip_daemonize = (
        "salt-call" in sys.argv[0]
        or not opts.get("multiprocessing", True)
        or sys.platform.startswith("win")
    )
    if not skip_daemonize:
        daemonize(False)
145
146
def systemd_notify_call(action):
    """
    Run ``systemd-notify`` with the single argument ``action``.

    Whatever the command prints on stdout and stderr is captured and
    thrown away.

    Return: bool: True when the command exits with status 0
    """
    command = ["systemd-notify", action]
    notifier = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    notifier.communicate()
    exit_status = notifier.poll()
    return exit_status == 0
154
155
def notify_systemd():
    """
    Notify systemd that this process has started

    When the ``systemd.daemon`` python bindings can be imported they are
    used. Otherwise, if the ``systemd-notify`` binary is available and reports
    a systemd-booted system, ``READY=1`` is written to the socket named by
    the ``NOTIFY_SOCKET`` environment variable, falling back to
    ``systemd-notify --ready`` if the socket cannot be used.

    Return: True when the socket notification was sent; the result of the
    fallback command or of ``systemd.daemon.notify`` when those were used;
    False when notification is not possible without the bindings; None when
    the bindings are present but the system was not booted by systemd or the
    daemon was not started by it.
    """
    try:
        import systemd.daemon  # pylint: disable=no-name-in-module
    except ImportError:
        if not (
            salt.utils.path.which("systemd-notify")
            and systemd_notify_call("--booted")
        ):
            return False

        # Notify systemd synchronously
        notify_socket = os.getenv("NOTIFY_SOCKET")
        if not notify_socket:
            return False

        # Handle abstract namespace socket
        if notify_socket.startswith("@"):
            notify_socket = "\0{}".format(notify_socket[1:])
        try:
            # The context manager closes the socket even when connect() or
            # sendall() fail, so the descriptor is never leaked.
            with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
                sock.connect(notify_socket)
                sock.sendall(b"READY=1")
        except OSError:
            return systemd_notify_call("--ready")
        return True

    if systemd.daemon.booted():
        try:
            return systemd.daemon.notify("READY=1")
        except SystemError:
            # Daemon was not started by systemd
            pass
    return None
186
187
def get_process_info(pid=None):
    """
    Gets basic info about a process.

    Requires ``psutil``.

    pid: None, or int: None will get the current process pid
    Return: None or Dict: a dict with the keys ``pid``, ``name`` and
        ``start_time``, or None when the process does not exist (or
        disappears while it is being inspected)
    """
    if pid is None:
        pid = os.getpid()
    elif not psutil.pid_exists(pid):
        return None

    # pid_exists can have false positives
    # for example Windows reserves PID 5 in a hack way
    # another reason is that the process requires kernel permissions.
    # The process may also exit at any point between the calls below, so
    # the construction of the Process object and every accessor are guarded.
    try:
        raw_process_info = psutil.Process(pid)
        raw_process_info.status()
        return {
            "pid": raw_process_info.pid,
            "name": raw_process_info.name(),
            "start_time": raw_process_info.create_time(),
        }
    except psutil.NoSuchProcess:
        return None
214
215
def claim_mantle_of_responsibility(file_name):
    """
    Try to become the process recorded in the pidfile ``file_name``.

    The claim succeeds when the file is missing, corrupted, already
    describes this process, or describes a process that is no longer
    running. On success the info of this process is written to the file.

    file_name: str
    Return: bool: True when this process holds the responsibility, False
        when another live process already holds it
    """
    # all OSs supported by salt has psutil
    if not HAS_PSUTIL:
        log.critical(
            "Assuming no other Process has this responsibility! pidfile: %s", file_name
        )
        return True

    # make sure the directory of the pidfile exists
    parent_dir = os.path.dirname(file_name)
    if parent_dir and not os.path.isdir(parent_dir):
        os.makedirs(parent_dir)

    # read what the pidfile currently records, if anything
    recorded_info = None
    try:
        with salt.utils.files.fopen(file_name, "r") as pid_fh:
            recorded_info = json.load(pid_fh)
    except FileNotFoundError:
        log.info("pidfile: %s not found", file_name)
    except json.decoder.JSONDecodeError:
        log.error("pidfile: %s is corrupted", file_name)

    current_info = get_process_info()

    # this process already holds the responsibility
    if recorded_info == current_info:
        return True

    # only a dict carrying an integer pid can describe a live owner
    recorded_is_usable = isinstance(recorded_info, dict) and isinstance(
        recorded_info.get("pid"), int
    )
    if recorded_is_usable and recorded_info == get_process_info(
        recorded_info.get("pid")
    ):
        # the recorded process is still alive
        return False

    # nobody else holds it: record this process as the owner
    with salt.utils.files.fopen(file_name, "w") as pid_fh:
        json.dump(current_info, pid_fh)
    return True
267
268
def check_mantle_of_responsibility(file_name):
    """
    Look up which live process is recorded in the pidfile ``file_name``.

    file_name: str
    Return: None or int: the pid stored in the file when the stored info
        still matches a running process, otherwise None
    """
    # all OSs supported by salt has psutil
    if not HAS_PSUTIL:
        log.critical(
            "Assuming no other Process has this responsibility! pidfile: %s", file_name
        )
        return None

    # read what the pidfile records
    try:
        with salt.utils.files.fopen(file_name, "r") as pid_fh:
            recorded_info = json.load(pid_fh)
    except FileNotFoundError:
        log.info("pidfile: %s not found", file_name)
        return None
    except json.decoder.JSONDecodeError:
        log.error("pidfile: %s is corrupted", file_name)
        return None

    if isinstance(recorded_info, dict) and isinstance(recorded_info.get("pid"), int):
        owner_pid = recorded_info["pid"]
        if recorded_info == get_process_info(owner_pid):
            return owner_pid
    return None
301
302
def set_pidfile(pidfile, user):
    """
    Save the pidfile

    Writes the current pid to ``pidfile`` (creating its directory when
    needed) and, on non-Windows platforms, hands ownership of the file to
    ``user`` unless the current user already is ``user``.

    pidfile: str: path of the pidfile
    user: str: name of the user that should own the pidfile
    Return: True on Windows, None otherwise

    The process exits with EX_NOUSER when ``user`` does not exist and with
    the errno of the failure when the ownership cannot be changed.
    """
    pdir = os.path.dirname(pidfile)
    if pdir and not os.path.isdir(pdir):
        os.makedirs(pdir)
    try:
        with salt.utils.files.fopen(pidfile, "w+") as ofile:
            ofile.write(str(os.getpid()))
    except OSError as exc:
        # Writing the pidfile stays best-effort, but the failure is reported
        # instead of claiming that the file was created.
        log.warning("Failed to write pidfile %s: %s", pidfile, exc)
    else:
        log.debug("Created pidfile: %s", pidfile)

    if salt.utils.platform.is_windows():
        return True

    import pwd  # after confirming not running Windows

    try:
        pwnam = pwd.getpwnam(user)
        uid = pwnam[2]
        gid = pwnam[3]
    except (KeyError, IndexError):
        sys.stderr.write(
            "Failed to set the pid to user: {}. The user is not available.\n".format(
                user
            )
        )
        sys.exit(salt.defaults.exitcodes.EX_NOUSER)

    if os.getuid() == uid:
        # The current user already owns the pidfile. Return!
        return

    try:
        os.chown(pidfile, uid, gid)
    except OSError as err:
        msg = "Failed to set the ownership of PID file {} to user {}.".format(
            pidfile, user
        )
        log.debug("%s Traceback follows:", msg, exc_info=True)
        sys.stderr.write("{}\n".format(msg))
        sys.exit(err.errno)
    log.debug("Chowned pidfile: %s to user: %s", pidfile, user)
350
351
def check_pidfile(pidfile):
    """
    Report whether ``pidfile`` exists on disk as a regular file.
    """
    pidfile_present = os.path.isfile(pidfile)
    return pidfile_present
357
358
def get_pidfile(pidfile):
    """
    Read the pid stored in ``pidfile``.

    Return: int: the pid, or -1 when the file cannot be read or its
        content is not an integer
    """
    try:
        with salt.utils.files.fopen(pidfile) as pid_fh:
            stored_pid = pid_fh.read().strip()
        return int(stored_pid)
    except (OSError, TypeError, ValueError):
        return -1
369
370
def clean_proc(proc, wait_for_kill=10):
    """
    Stop a multiprocessing process, escalating to SIGKILL when needed.

    ``terminate()`` is called every 0.1 seconds for as long as the process
    is alive. Once ``wait_for_kill`` such attempts have been made and the
    process is still alive, it is additionally sent SIGKILL on every
    attempt.

    proc: the process object; falsy values are ignored
    wait_for_kill: int: number of terminate attempts before SIGKILL is used
    """
    # NoneType and other fun stuff need not apply
    if not proc:
        return
    attempts = 0
    try:
        while True:
            if not proc.is_alive():
                break
            proc.terminate()
            attempts += 1
            time.sleep(0.1)
            still_running = proc.is_alive()
            if still_running and attempts >= wait_for_kill:
                log.error("Process did not die with terminate(): %s", proc.pid)
                os.kill(proc.pid, signal.SIGKILL)
    except (AssertionError, AttributeError):
        # AssertionError: the proc is being evaluated inside the child
        # AttributeError: the process died between proc.is_alive() and
        # proc.terminate() and turned into a NoneType
        pass
392
393
def os_is_running(pid):
    """
    Use OS facilities to determine if a process is running

    pid: int or str: the process id; strings are converted with ``int``
    Return: bool

    ``psutil`` is used when available. Otherwise signal 0 is sent to the
    process: nothing is delivered, the OS only performs its existence and
    permission checks.
    """
    if isinstance(pid, str):
        pid = int(pid)
    if HAS_PSUTIL:
        return psutil.pid_exists(pid)
    try:
        os.kill(pid, 0)  # SIG 0 is the "are you alive?" signal
    except PermissionError:
        # The process exists but belongs to somebody else: we are merely
        # not allowed to signal it, which still means it is running.
        return True
    except OSError:
        return False
    return True
408
409
class ThreadPool:
    """
    A deliberately minimal pool of daemon threads fed from a bounded queue.

    The multiprocessing ThreadPool from the stdlib is not used because it
    exposes neither a maximum queue size nor daemonized threads, and both
    are wanted here.

    Only daemonized threads are supported and the results of the submitted
    functions are *not* returned.

    TODO: if this is found to be more generally useful it would be nice to pull
    in the majority of code from upstream or from http://bit.ly/1wTeJtM
    """

    def __init__(self, num_threads=None, queue_size=0):
        """
        num_threads: None or int: number of worker threads; None means one
            per CPU
        queue_size: int: maximum number of queued jobs, passed to
            ``queue.Queue``
        """
        self.num_threads = (
            multiprocessing.cpu_count() if num_threads is None else num_threads
        )

        # jobs wait here until a worker picks them up
        self._job_queue = queue.Queue(queue_size)

        # the workers are started right away
        self._workers = []
        for _ in range(self.num_threads):
            self._workers.append(self._start_worker())

    def _start_worker(self):
        """
        Start one daemon thread running the worker loop and return it.
        """
        worker = threading.Thread(target=self._thread_target, daemon=True)
        worker.start()
        return worker

    # intentionally not called "apply_async" since the return value of the
    # job is not tracked at all; should this API ever be made compatible with
    # the multiprocessing threadpool, there will be no name collision
    def fire_async(self, func, args=None, kwargs=None):
        """
        Queue ``func(*args, **kwargs)`` for execution by a worker.

        Return: bool: True when the job was queued, False when the queue is
            full
        """
        job = (
            func,
            [] if args is None else args,
            {} if kwargs is None else kwargs,
        )
        try:
            self._job_queue.put_nowait(job)
        except queue.Full:
            return False
        return True

    def _thread_target(self):
        """
        Worker loop: fetch jobs forever and run them, logging any failure.
        """
        while True:
            # 1s timeout so that if the parent dies this thread will die within 1s
            try:
                try:
                    func, args, kwargs = self._job_queue.get(timeout=1)
                    # the job counts as done as soon as it has been fetched
                    self._job_queue.task_done()
                except queue.Empty:
                    continue
            except AttributeError:
                # During shutdown, `queue` may not have an `Empty` attribute
                # any more, so looking it up in the handler above can raise.
                # Catch that as well in order to avoid an unclean shutdown.
                continue

            try:
                log.debug(
                    "ThreadPool executing func: %s with args=%s kwargs=%s",
                    func,
                    args,
                    kwargs,
                )
                func(*args, **kwargs)
            except Exception as err:  # pylint: disable=broad-except
                log.debug(err, exc_info=True)
480
481
class ProcessManager:
    """
    A class which will manage processes that should be running

    Every process started through ``add_process`` is recorded in
    ``self._process_map`` under its pid, together with the target and the
    arguments it was created with, so that it can be restarted when it dies
    and signalled or killed on shutdown.
    """

    def __init__(self, name=None, wait_for_kill=1):
        """
        name: None or str: appended to the process title by ``run``;
            defaults to the class name
        wait_for_kill: number: seconds ``kill_children`` waits for the
            children to exit before it resorts to SIGKILL
        """
        # pid -> {tgt: foo, Process: object, args: args, kwargs: kwargs}
        self._process_map = {}

        self.name = name
        if self.name is None:
            self.name = self.__class__.__name__

        self.wait_for_kill = wait_for_kill

        # store some pointers for the SIGTERM handler
        self._pid = os.getpid()
        self._sigterm_handler = signal.getsignal(signal.SIGTERM)
        self._restart_processes = True

    def add_process(self, tgt, args=None, kwargs=None, name=None):
        """
        Create and start a process from ``tgt`` with args + kwargs and
        register it in the process map.

        When ``tgt`` is a subclass of ``multiprocessing.Process`` it is
        instantiated with the arguments, otherwise it is assumed to be a
        callable and is used as the target of a new
        ``multiprocessing.Process``.

        tgt: a ``multiprocessing.Process`` subclass or a callable
        args: None or list: positional arguments
        kwargs: None or dict: keyword arguments
        name: None or str: name used in the debug log and for processes
            created from a callable; derived from ``tgt`` when omitted
        Return: the started process object
        """
        if args is None:
            args = []

        if kwargs is None:
            kwargs = {}

        if salt.utils.platform.is_windows():
            # Need to ensure that 'log_queue' and 'log_queue_level' is
            # correctly transferred to processes that inherit from
            # 'Process'.
            need_log_queue = type(Process) is type(tgt) and issubclass(tgt, Process)

            if need_log_queue:
                if "log_queue" not in kwargs:
                    if hasattr(self, "log_queue"):
                        kwargs["log_queue"] = self.log_queue
                    else:
                        kwargs[
                            "log_queue"
                        ] = salt.log.setup.get_multiprocessing_logging_queue()
                if "log_queue_level" not in kwargs:
                    if hasattr(self, "log_queue_level"):
                        kwargs["log_queue_level"] = self.log_queue_level
                    else:
                        kwargs[
                            "log_queue_level"
                        ] = salt.log.setup.get_multiprocessing_logging_level()

        # create a nicer name for the debug log
        if name is None:
            if isinstance(tgt, types.FunctionType):
                name = "{}.{}".format(
                    tgt.__module__,
                    tgt.__name__,
                )
            else:
                # Only mention the metaclass when it is not plain ``type``.
                # (The former comparison against the Python 2 repr
                # "<type 'type'>" never matched on Python 3.)
                name = "{}{}.{}".format(
                    tgt.__module__,
                    ".{}".format(tgt.__class__) if tgt.__class__ is not type else "",
                    tgt.__name__,
                )

        if type(multiprocessing.Process) is type(tgt) and issubclass(
            tgt, multiprocessing.Process
        ):
            process = tgt(*args, **kwargs)
        else:
            process = multiprocessing.Process(
                target=tgt, args=args, kwargs=kwargs, name=name
            )

        if isinstance(process, SignalHandlingProcess):
            with default_signals(signal.SIGINT, signal.SIGTERM):
                process.start()
        else:
            process.start()
        log.debug("Started '%s' with pid %s", name, process.pid)
        self._process_map[process.pid] = {
            "tgt": tgt,
            "args": args,
            "kwargs": kwargs,
            "Process": process,
        }
        return process

    def restart_process(self, pid):
        """
        Create new process (assuming this one is dead), then remove the old one

        Does nothing once ``stop_restarting`` has been called.
        """
        if self._restart_processes is False:
            return
        mapping = self._process_map[pid]
        old_process = mapping["Process"]
        exitcode = old_process.exitcode
        # a positive exit status is reported more prominently
        if exitcode is not None and exitcode > 0:
            log_restart = log.info
        else:
            log_restart = log.debug
        log_restart(
            "Process %s (%s) died with exit status %s, restarting...",
            mapping["tgt"],
            pid,
            exitcode,
        )
        # don't block, the process is already dead
        old_process.join(1)

        new_process = self.add_process(
            mapping["tgt"],
            mapping["args"],
            mapping["kwargs"],
        )

        # Drop the entry of the dead process, unless the OS handed its pid
        # to the replacement, in which case the entry now belongs to the
        # new process and must be kept.
        if new_process.pid != pid:
            self._process_map.pop(pid, None)

    def stop_restarting(self):
        """
        Stop restarting processes that die
        """
        self._restart_processes = False

    def send_signal_to_processes(self, signal_):
        """
        Send ``signal_`` to every managed process.

        Processes that turn out to be gone (or that may not be signalled)
        are removed from the process map. On Windows SIGTERM and SIGINT are
        not forwarded at all.
        """
        if salt.utils.platform.is_windows() and signal_ in (
            signal.SIGTERM,
            signal.SIGINT,
        ):
            # On Windows, the subprocesses automatically have their signal
            # handlers invoked. If you send one of these signals while the
            # signal handler is running, it will kill the process where it
            # is currently running and the signal handler will not finish.
            # This will also break the process tree: children of killed
            # children will become parentless and not findable when trying
            # to kill the process tree (they don't inherit their parent's
            # parent). Hence the 'MWorker' processes would be left over if
            # the 'ReqServer' process is killed this way since 'taskkill'
            # with the tree option will not be able to find them.
            return

        for pid in list(self._process_map):
            try:
                os.kill(pid, signal_)
            except OSError as exc:
                if exc.errno not in (errno.ESRCH, errno.EACCES):
                    # Neither "No such process" nor "Permission denied":
                    # this is unexpected, raise it
                    raise
                # Otherwise, treat it as a dead process and forget about it
                self._process_map.pop(pid, None)

    @gen.coroutine
    def run(self, asynchronous=False):
        """
        Supervise the managed processes.

        Every 10 seconds the children are checked and dead ones restarted.
        The loop ends when no managed process is left or when an OSError is
        raised (for example because a signal handler ran during the sleep).

        asynchronous: bool: sleep with ``gen.sleep`` (yielding to the
            event loop) instead of ``time.sleep``
        """
        log.debug("Process Manager starting!")
        appendproctitle(self.name)

        # make sure to kill the subprocesses if the parent is killed
        if signal.getsignal(signal.SIGTERM) is signal.SIG_DFL:
            # There are no SIGTERM handlers installed, install ours
            signal.signal(signal.SIGTERM, self._handle_signals)
        if signal.getsignal(signal.SIGINT) is signal.SIG_DFL:
            # There are no SIGINT handlers installed, install ours
            signal.signal(signal.SIGINT, self._handle_signals)

        while True:
            log.trace("Process manager iteration")
            try:
                # in case someone died while we were waiting...
                self.check_children()
                # The event-based subprocesses management code was removed from here
                # because os.wait() conflicts with the subprocesses management logic
                # implemented in `multiprocessing` package. See #35480 for details.
                if asynchronous:
                    yield gen.sleep(10)
                else:
                    time.sleep(10)
                if not self._process_map:
                    break
            except OSError:
                # Raised when a signal handler is called (SIGTERM) while
                # waiting; this also covers the EINTR error time.sleep() may
                # raise on Windows, since IOError is an alias of OSError.
                break

    def check_children(self):
        """
        Check the children once
        """
        if self._restart_processes is True:
            for pid, mapping in self._process_map.copy().items():
                if not mapping["Process"].is_alive():
                    log.trace("Process restart of %s", pid)
                    self.restart_process(pid)

    def kill_children(self, *args, **kwargs):
        """
        Kill all of the children

        The children are first terminated, then given ``wait_for_kill``
        seconds to exit, and whatever is left is sent SIGKILL. If processes
        still remain afterwards the whole procedure is retried.

        args: when given, ``args[0]`` is sent as a signal to every child
            before it is terminated (not on Windows)
        kwargs: ``retry``: number of retries left, defaults to 3
        """
        if salt.utils.platform.is_windows():
            if multiprocessing.current_process().name != "MainProcess":
                # Since the main process will kill subprocesses by tree,
                # no need to do anything in the subprocesses.
                # Sometimes, when both a subprocess and the main process
                # call 'taskkill', it will leave a 'taskkill' zombie process.
                # We want to avoid this.
                return
            with salt.utils.files.fopen(os.devnull, "wb") as devnull:
                for pid, p_map in self._process_map.items():
                    # On Windows, we need to explicitly terminate sub-processes
                    # because the processes don't have a sigterm handler.
                    subprocess.call(
                        ["taskkill", "/F", "/T", "/PID", str(pid)],
                        stdout=devnull,
                        stderr=devnull,
                    )
                    p_map["Process"].terminate()
        else:
            for pid, p_map in self._process_map.copy().items():
                log.trace("Terminating pid %s: %s", pid, p_map["Process"])
                if args:
                    # escalate the signal to the process
                    try:
                        os.kill(pid, args[0])
                    except OSError:
                        pass
                try:
                    p_map["Process"].terminate()
                except OSError as exc:
                    if exc.errno not in (errno.ESRCH, errno.EACCES):
                        raise
                if not p_map["Process"].is_alive():
                    # pop() tolerates the entry having been removed already
                    # (race with a signal passed to all children)
                    self._process_map.pop(pid, None)

        end_time = time.time() + self.wait_for_kill  # when to die

        log.trace("Waiting to kill process manager children")
        while self._process_map and time.time() < end_time:
            for pid, p_map in self._process_map.copy().items():
                log.trace("Joining pid %s: %s", pid, p_map["Process"])
                p_map["Process"].join(0)

                if not p_map["Process"].is_alive():
                    # The process is no longer alive, forget about it
                    self._process_map.pop(pid, None)

        # if any managed processes still remain to be handled, let's kill them
        for _ in range(3):
            for pid, p_map in self._process_map.copy().items():
                if not p_map["Process"].is_alive():
                    # The process is no longer alive, forget about it
                    self._process_map.pop(pid, None)
                    continue
                log.trace("Killing pid %s: %s", pid, p_map["Process"])
                try:
                    os.kill(pid, signal.SIGKILL)
                except OSError as exc:
                    log.exception(exc)
                    # in case the process has since decided to die, os.kill returns OSError
                    if not p_map["Process"].is_alive():
                        self._process_map.pop(pid, None)

        if self._process_map:
            # Some processes disrespected the KILL signal!!!!
            survivors = "; ".join(
                "Process: {} (Pid: {})".format(p_map["Process"], pid)
                for pid, p_map in self._process_map.items()
            )
            available_retries = kwargs.get("retry", 3)
            if available_retries >= 0:
                log.info(
                    "Some processes failed to respect the KILL signal: %s",
                    survivors,
                )
                log.info("kill_children retries left: %s", available_retries)
                kwargs["retry"] = available_retries - 1
                return self.kill_children(*args, **kwargs)
            else:
                log.warning(
                    "Failed to kill the following processes: %s",
                    survivors,
                )
                log.warning(
                    "Salt will either fail to terminate now or leave some "
                    "zombie processes behind"
                )

    def terminate(self):
        """
        Properly terminate this process manager instance
        """
        self.stop_restarting()
        self.send_signal_to_processes(signal.SIGTERM)
        self.kill_children()

    def _handle_signals(self, *args, **kwargs):
        """
        Signal handler installed by ``run`` for SIGTERM and SIGINT.

        Stops restarting processes and forwards SIGTERM to the children. In
        the process that created the manager the children are then killed;
        in any other process (children inherit this handler) the SIGTERM
        handler that was active when the manager was created is run instead.
        """
        # first ignore further SIGTERM/SIGINT to prevent running this twice
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        self.stop_restarting()
        self.send_signal_to_processes(signal.SIGTERM)

        # check that this is the correct process, children inherit this
        # handler, if we are in a child lets just run the original handler
        if os.getpid() != self._pid:
            if callable(self._sigterm_handler):
                return self._sigterm_handler(*args)
            elif self._sigterm_handler is not None:
                # NOTE(review): reached when the saved handler is SIG_DFL or
                # SIG_IGN; calling default_int_handler raises rather than
                # returning a callable -- confirm this is the intended way
                # to abort here.
                return signal.default_int_handler(signal.SIGTERM)(*args)
            else:
                return

        # Terminate child processes
        self.kill_children(*args, **kwargs)
837
838
class Process(multiprocessing.Process):
    """
    Salt relies on this custom implementation of :py:class:`~multiprocessing.Process` to
    simplify/automate some common procedures, for example, logging in the new process is
    configured for "free" for every new process.
    This is most important in platforms which default to ``spawn`` instead of ``fork`` for
    new processes.

    This is achieved by some dunder methods in the class:

    * ``__new__``:

        This method ensures that any arguments and/or keyword arguments that are passed to
        ``__init__`` are captured.

        By having this information captured, we can define ``__setstate__`` and ``__getstate__``
        to automatically take care of reconstructing the object state on spawned processes.

    * ``__getstate__``:

        This method should return a dictionary which will be used as the ``state`` argument to
        :py:meth:`salt.utils.process.Process.__setstate__`.
        Usually, when subclassing, this method does not need to be implemented, however,
        if implemented, ``super()`` **must** be called.

    * ``__setstate__``:

        This method reconstructs the object on the spawned process.
        The ``state`` argument is constructed by the
        :py:meth:`salt.utils.process.Process.__getstate__` method.
        Usually, when subclassing, this method does not need to be implemented, however,
        if implemented, ``super()`` **must** be called.


    An example of where ``__setstate__`` and ``__getstate__`` needed to be subclassed can be
    seen in :py:class:`salt.master.MWorker`.

    The gist of it is something like, if there are internal attributes which need to maintain
    their state on spawned processes, then, subclasses must implement ``__getstate__`` and
    ``__setstate__`` to ensure that.


    For example:


    .. code-block:: python

        import salt.utils.process

        class MyCustomProcess(salt.utils.process.Process):

            def __init__(self, opts, **kwargs):
                super().__init__(**kwargs)
                self.opts = opts

                # This attribute, counter, should only start at 0 on the initial(parent) process.
                # Any child processes, need to carry the current value of the counter(instead of
                # starting at zero).
                self.counter = 0

            def __getstate__(self):
                state = super().__getstate__()
                state.update(
                    {
                        "counter": self.counter,
                    }
                )
                return state

            def __setstate__(self, state):
                super().__setstate__(state)
                self.counter = state["counter"]
    """

    def __new__(cls, *args, **kwargs):
        """
        This method ensures that any arguments and/or keyword arguments that are passed to
        ``__init__`` are captured.

        By having this information captured, we can define ``__setstate__`` and ``__getstate__``
        to automatically take care of object pickling which is required for platforms that
        spawn processes instead of forking them.

        The callback lists used by ``register_after_fork_method`` and
        ``register_finalize_method`` are also created here, so they exist before
        ``__init__`` runs.
        """
        # We implement __new__ because we want to capture the passed in *args and **kwargs
        # in order to remove the need for each class to implement __getstate__ and __setstate__
        # which is required on spawning platforms
        instance = super().__new__(cls)
        instance._after_fork_methods = []
        instance._finalize_methods = []

        if salt.utils.platform.spawning_platform():
            # On spawning platforms, subclasses should call super if they define
            # __setstate__ and/or __getstate__
            #
            # Only shallow copies are stored; the individual argument objects are
            # shared with the caller.
            instance._args_for_getstate = copy.copy(args)
            instance._kwargs_for_getstate = copy.copy(kwargs)
        return instance

    def __init__(self, *args, **kwargs):
        """
        Initialize the process.

        ``log_queue`` and ``log_queue_level`` are removed from ``kwargs`` before the
        remaining arguments are handed to :py:class:`multiprocessing.Process`. When
        either of them is missing or ``None``, the value is taken from
        ``salt.log.setup``.
        """
        log_queue = kwargs.pop("log_queue", None)
        log_queue_level = kwargs.pop("log_queue_level", None)
        super().__init__(*args, **kwargs)
        self.log_queue = log_queue
        if self.log_queue is None:
            self.log_queue = salt.log.setup.get_multiprocessing_logging_queue()

        self.log_queue_level = log_queue_level
        if self.log_queue_level is None:
            self.log_queue_level = salt.log.setup.get_multiprocessing_logging_level()

        # Because we need to enforce our after fork and finalize routines,
        # we must wrap this class run method to allow for these extra steps
        # to be executed pre and post calling the actual run method,
        # having subclasses call super would just not work.
        #
        # We use setattr here to fool pylint not to complain that we're
        # overriding run from the subclass here
        #
        # Note that the wrapper is stored on the instance, and that every call to
        # __init__ wraps whatever ``self.run`` currently resolves to once more.
        setattr(self, "run", self.__decorate_run(self.run))

    # __setstate__ and __getstate__ are only used on spawning platforms.
    def __setstate__(self, state):
        """
        This method reconstructs the object on the spawned process.
        The ``state`` argument is constructed by :py:meth:`salt.utils.process.Process.__getstate__`.

        Usually, when subclassing, this method does not need to be implemented, however,
        if implemented, ``super()`` **must** be called.
        """
        args = state["args"]
        kwargs = state["kwargs"]
        # This will invoke __init__ of the most derived class.
        self.__init__(*args, **kwargs)
        # The loops below re-use the names ``args`` and ``kwargs``; this is safe
        # because __init__ has already consumed the constructor arguments above.
        for (function, args, kwargs) in state["after_fork_methods"]:
            self.register_after_fork_method(function, *args, **kwargs)
        for (function, args, kwargs) in state["finalize_methods"]:
            self.register_finalize_method(function, *args, **kwargs)

    def __getstate__(self):
        """
        This method should return a dictionary which will be used as the ``state`` argument to
        :py:meth:`salt.utils.process.Process.__setstate__`.
        Usually, when subclassing, this method does not need to be implemented, however,
        if implemented, ``super()`` **must** be called.

        The returned dictionary has the keys ``args``, ``kwargs``,
        ``after_fork_methods`` and ``finalize_methods``.
        """
        # These attributes are only set by __new__ on spawning platforms.
        args = self._args_for_getstate
        kwargs = self._kwargs_for_getstate
        # Make sure the logging settings travel with the pickled state.
        # NOTE: this updates the dictionary stored on the instance in place, so the
        # values stay there for any later call.
        if "log_queue" not in kwargs:
            kwargs["log_queue"] = self.log_queue
        if "log_queue_level" not in kwargs:
            kwargs["log_queue_level"] = self.log_queue_level
        return {
            "args": args,
            "kwargs": kwargs,
            "after_fork_methods": self._after_fork_methods,
            "finalize_methods": self._finalize_methods,
        }

    def __decorate_run(self, run_func):
        """
        Return a wrapper around ``run_func`` which, in order:

        1. configures multiprocessing logging from ``self.log_queue`` and
           ``self.log_queue_level``;
        2. calls every registered after fork callback;
        3. calls ``run_func`` and returns its result;
        4. calls every registered finalize callback and finally shuts down
           multiprocessing logging, whether or not ``run_func`` raised.

        Failures in the logging setup steps and in ``Exception``-raising callbacks
        are logged and do not stop the remaining steps.
        """

        @functools.wraps(run_func)
        def wrapped_run_func():
            # Static after fork method, always needs to happen first
            try:
                salt.log.setup.set_multiprocessing_logging_queue(self.log_queue)
            except Exception:  # pylint: disable=broad-except
                log.exception(
                    "Failed to run salt.log.setup.set_multiprocessing_logging_queue() on %s",
                    self,
                )
            try:
                salt.log.setup.set_multiprocessing_logging_level(self.log_queue_level)
            except Exception:  # pylint: disable=broad-except
                log.exception(
                    "Failed to run salt.log.setup.set_multiprocessing_logging_level() on %s",
                    self,
                )
            try:
                salt.log.setup.setup_multiprocessing_logging(self.log_queue)
            except Exception:  # pylint: disable=broad-except
                log.exception(
                    "Failed to run salt.log.setup.setup_multiprocessing_logging() on %s",
                    self,
                )

            # Registered after fork callbacks run in registration order; a failing
            # callback is logged and the next one still runs.
            for method, args, kwargs in self._after_fork_methods:
                try:
                    method(*args, **kwargs)
                except Exception:  # pylint: disable=broad-except
                    log.exception(
                        "Failed to run after fork callback on %s; method=%r; args=%r; and kwargs=%r",
                        self,
                        method,
                        args,
                        kwargs,
                    )
                    continue
            try:
                return run_func()
            except SystemExit:  # pylint: disable=try-except-raise
                # These are handled by multiprocessing.Process._bootstrap()
                raise
            except Exception:  # pylint: disable=broad-except
                log.error(
                    "An un-handled exception from the multiprocessing process "
                    "'%s' was caught:\n",
                    self.name,
                    exc_info=True,
                )
                # Re-raise the exception. multiprocessing.Process will write it to
                # sys.stderr and set the proper exitcode and we have already logged
                # it above.
                raise
            finally:
                # The nested try/finally guarantees the logging shutdown below runs
                # even when a finalize callback raises something that is not caught
                # by ``except Exception``.
                try:
                    for method, args, kwargs in self._finalize_methods:
                        try:
                            method(*args, **kwargs)
                        except Exception:  # pylint: disable=broad-except
                            log.exception(
                                "Failed to run finalize callback on %s; method=%r; args=%r; and kwargs=%r",
                                self,
                                method,
                                args,
                                kwargs,
                            )
                            continue
                finally:
                    # Static finalize method, should always run last
                    try:
                        salt.log.setup.shutdown_multiprocessing_logging()
                    except Exception:  # pylint: disable=broad-except
                        log.exception(
                            "Failed to run salt.log.setup.shutdown_multiprocessing_logging() on %s",
                            self,
                        )

        return wrapped_run_func

    def register_after_fork_method(self, function, *args, **kwargs):
        """
        Register a function to run after the process has forked

        The callback is invoked as ``function(*args, **kwargs)`` by the wrapped
        ``run`` method, after logging has been configured and before the original
        ``run`` is called. Registering an identical ``(function, args, kwargs)``
        combination again has no effect.
        """
        after_fork_method_tuple = (function, args, kwargs)
        if after_fork_method_tuple not in self._after_fork_methods:
            self._after_fork_methods.append(after_fork_method_tuple)

    def register_finalize_method(self, function, *args, **kwargs):
        """
        Register a function to run as process terminates

        The callback is invoked as ``function(*args, **kwargs)`` by the wrapped
        ``run`` method once the original ``run`` has returned or raised.
        Registering an identical ``(function, args, kwargs)`` combination again
        has no effect.
        """
        finalize_method_tuple = (function, args, kwargs)
        if finalize_method_tuple not in self._finalize_methods:
            self._finalize_methods.append(finalize_method_tuple)
1090
1091
class MultiprocessingProcess(Process):
    """
    Deprecated name for :py:class:`Process`.

    It only exists so existing callers keep working while being told, through a
    deprecation warning, to switch to ``Process``.
    """

    def __init__(self, *args, **kwargs):
        """
        Emit the deprecation warning, then initialize exactly like ``Process``.
        """
        # ``{{date}}`` survives this ``format`` call as ``{date}``, which is left
        # in the message that is handed to ``warn_until_date``.
        deprecation_message = (
            "Please stop using '{name}.MultiprocessingProcess' and instead use "
            "'{name}.Process'. '{name}.MultiprocessingProcess' will go away "
            "after {{date}}."
        ).format(name=__name__)
        salt.utils.versions.warn_until_date(
            "20220101", deprecation_message, stacklevel=3
        )
        super().__init__(*args, **kwargs)
1106
1107
class SignalHandlingProcess(Process):
    """
    A :py:class:`Process` which installs its own SIGINT/SIGTERM handler through
    an after fork callback.

    When one of those signals arrives, the handler records that fact, asks the
    children of the current process to terminate (only when ``psutil`` is
    available) and exits with ``EX_OK``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Set by _handle_signals, queried through signal_handled()
        self._signal_handled = multiprocessing.Event()
        # The unbound method plus ``self`` is registered so that the callback is
        # invoked as ``SignalHandlingProcess._setup_signals(self)``.
        self.register_after_fork_method(SignalHandlingProcess._setup_signals, self)

    def signal_handled(self):
        """
        Return ``True`` once ``_handle_signals`` has run.
        """
        return self._signal_handled.is_set()

    def _setup_signals(self):
        """
        Route SIGINT and SIGTERM to ``_handle_signals``.
        """
        signal.signal(signal.SIGINT, self._handle_signals)
        signal.signal(signal.SIGTERM, self._handle_signals)

    def _handle_signals(self, signum, sigframe):
        """
        Handle SIGINT/SIGTERM: flag the event, terminate children, exit.

        :param signum: Number of the received signal
        :param sigframe: Stack frame passed to the handler (unused)
        :raises SystemExit: always, with ``salt.defaults.exitcodes.EX_OK``
        """
        self._signal_handled.set()
        # Ignore repeated signals while we are shutting down
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if signum == signal.SIGINT:
            signal_name = "SIGINT"
        elif signum == signal.SIGTERM:
            signal_name = "SIGTERM"
        else:
            # Only SIGINT and SIGTERM are registered by this class; name any
            # other signal by number rather than logging "received a . Exiting".
            signal_name = "signal {}".format(signum)
        log.debug("%s received a %s. Exiting", self.__class__.__name__, signal_name)

        if HAS_PSUTIL:
            try:
                process = psutil.Process(os.getpid())
                # Keep the attribute check so that a psutil without
                # ``Process.children`` is skipped rather than failing.
                if hasattr(process, "children"):
                    for child in process.children(recursive=True):
                        try:
                            if child.is_running():
                                child.terminate()
                        except psutil.NoSuchProcess:
                            log.warning(
                                "Unable to kill child of process %d, it does "
                                "not exist. My pid is %d",
                                self.pid,
                                os.getpid(),
                            )
            except psutil.NoSuchProcess:
                log.warning(
                    "Unable to kill children of process %d, it does not exist. "
                    "My pid is %d",
                    self.pid,
                    os.getpid(),
                )
        sys.exit(salt.defaults.exitcodes.EX_OK)

    def start(self):
        """
        Start the process while SIGINT and SIGTERM are temporarily reset through
        ``default_signals``.
        """
        with default_signals(signal.SIGINT, signal.SIGTERM):
            super().start()
1159
1160
class SignalHandlingMultiprocessingProcess(SignalHandlingProcess):
    """
    Deprecated name for :py:class:`SignalHandlingProcess`.

    It only exists so existing callers keep working while being told, through a
    deprecation warning, to switch to ``SignalHandlingProcess``.
    """

    def __init__(self, *args, **kwargs):
        """
        Emit the deprecation warning, then initialize exactly like
        ``SignalHandlingProcess``.
        """
        # ``{{date}}`` survives this ``format`` call as ``{date}``, which is left
        # in the message that is handed to ``warn_until_date``.
        deprecation_message = (
            "Please stop using '{name}.SignalHandlingMultiprocessingProcess' and"
            " instead use '{name}.SignalHandlingProcess'."
            " '{name}.SignalHandlingMultiprocessingProcess' will go away after"
            " {{date}}."
        ).format(name=__name__)
        salt.utils.versions.warn_until_date(
            "20220101", deprecation_message, stacklevel=3
        )
        super().__init__(*args, **kwargs)
1176
1177
@contextlib.contextmanager
def default_signals(*signals):
    """
    Context manager which resets the given signals to ``signal.SIG_DFL`` for
    the duration of the ``with`` block and restores the previous handlers on
    exit.

    :param signals: Signal numbers to reset. A signal listed more than once is
        only handled the first time. Signals whose handler can not be changed
        (``ValueError``) are left untouched.
    """
    old_signals = {}
    for signum in signals:
        if signum in old_signals:
            # Already reset above. Looking the handler up again would return
            # SIG_DFL and overwrite the handler we have to restore.
            continue
        try:
            saved_signal = signal.getsignal(signum)
            signal.signal(signum, signal.SIG_DFL)
        except ValueError as exc:
            # This happens when a netapi module attempts to run a function
            # using wheel_async, because the process trying to register signals
            # will not be the main PID.
            log.trace("Failed to register signal for signum %d: %s", signum, exc)
        else:
            old_signals[signum] = saved_signal

    try:
        # Do whatever is needed with the reset signals
        yield
    finally:
        # Restore signals
        for signum, saved_signal in old_signals.items():
            if saved_signal is None:
                # signal.getsignal() returns None for handlers which were not
                # installed from Python. signal.signal() rejects None, so
                # there is nothing we are able to put back.
                continue
            signal.signal(signum, saved_signal)
1202
1203
class SubprocessList:
    """
    A list of process objects guarded by a lock, with a counter of the
    processes added through :py:meth:`add` which have not been cleaned up yet.

    :param processes: Optional list to use as storage. It is used as is (not
        copied); ``count`` still starts at zero.
    :param lock: Optional lock guarding the list. Defaults to a new
        :py:class:`multiprocessing.Lock`.
    """

    def __init__(self, processes=None, lock=None):
        if processes is None:
            self.processes = []
        else:
            self.processes = processes
        if lock is None:
            self.lock = multiprocessing.Lock()
        else:
            self.lock = lock
        self.count = 0

    def add(self, proc):
        """
        Track ``proc`` and increase the counter.
        """
        with self.lock:
            self.processes.append(proc)
            log.debug("Subprocess %s added", proc.name)
            self.count += 1

    def cleanup(self):
        """
        Join and forget every tracked process which is no longer alive.
        """
        with self.lock:
            # Iterate over a snapshot: removing from the list that is being
            # iterated makes the loop skip the entry following each removal,
            # leaving finished processes behind.
            for proc in list(self.processes):
                if proc.is_alive():
                    continue
                proc.join()
                self.processes.remove(proc)
                self.count -= 1
                log.debug("Subprocess %s cleaned up", proc.name)
1231