1# See doc/topics/jobs/index.rst
2"""
3Scheduling routines are located here. To activate the scheduler make the
4``schedule`` option available to the master or minion configurations (master
5config file or for the minion via config or pillar).
6
7Detailed tutorial about scheduling jobs can be found :ref:`here
8<scheduling-jobs>`.
9"""
10
11
12import copy
13import datetime
14import errno
15import itertools
16import logging
17import os
18import random
19import signal
20import sys
21import threading
22import time
23import weakref
24
25import salt.config
26import salt.defaults.exitcodes
27import salt.exceptions
28import salt.loader
29import salt.minion
30import salt.payload
31import salt.syspaths
32import salt.utils.args
33import salt.utils.error
34import salt.utils.event
35import salt.utils.files
36import salt.utils.jid
37import salt.utils.master
38import salt.utils.minion
39import salt.utils.platform
40import salt.utils.process
41import salt.utils.stringutils
42import salt.utils.user
43import salt.utils.yaml
44from salt.exceptions import SaltInvocationError
45from salt.utils.odict import OrderedDict
46
47# pylint: disable=import-error
48try:
49    import dateutil.parser as dateutil_parser
50
51    _WHEN_SUPPORTED = True
52    _RANGE_SUPPORTED = True
53except ImportError:
54    _WHEN_SUPPORTED = False
55    _RANGE_SUPPORTED = False
56
57try:
58    import croniter
59
60    _CRON_SUPPORTED = True
61except ImportError:
62    _CRON_SUPPORTED = False
63# pylint: enable=import-error
64
65log = logging.getLogger(__name__)
66
67
68class Schedule:
69    """
70    Create a Schedule object, pass in the opts and the functions dict to use
71    """
72
73    instance = None
74
75    def __new__(
76        cls,
77        opts,
78        functions,
79        returners=None,
80        intervals=None,
81        cleanup=None,
82        proxy=None,
83        standalone=False,
84        new_instance=False,
85        utils=None,
86        _subprocess_list=None,
87    ):
88        """
89        Only create one instance of Schedule
90        """
91        if cls.instance is None or new_instance is True:
92            log.debug("Initializing new Schedule")
93            # we need to make a local variable for this, as we are going to store
94            # it in a WeakValueDictionary-- which will remove the item if no one
95            # references it-- this forces a reference while we return to the caller
96            instance = object.__new__(cls)
97            instance.__singleton_init__(
98                opts,
99                functions,
100                returners=returners,
101                intervals=intervals,
102                cleanup=cleanup,
103                proxy=proxy,
104                standalone=standalone,
105                utils=utils,
106                _subprocess_list=_subprocess_list,
107            )
108            if new_instance is True:
109                return instance
110            cls.instance = instance
111        else:
112            log.debug("Re-using Schedule")
113        return cls.instance
114
115    # has to remain empty for singletons, since __init__ will *always* be called
116    def __init__(
117        self,
118        opts,
119        functions,
120        returners=None,
121        intervals=None,
122        cleanup=None,
123        proxy=None,
124        standalone=False,
125        new_instance=False,
126        utils=None,
127        _subprocess_list=None,
128    ):
129        pass
130
131    # an init for the singleton instance to call
132    def __singleton_init__(
133        self,
134        opts,
135        functions,
136        returners=None,
137        intervals=None,
138        cleanup=None,
139        proxy=None,
140        standalone=False,
141        utils=None,
142        _subprocess_list=None,
143    ):
144        self.opts = opts
145        self.proxy = proxy
146        self.functions = functions
147        self.utils = utils or salt.loader.utils(opts)
148        self.standalone = standalone
149        self.skip_function = None
150        self.skip_during_range = None
151        self.splay = None
152        self.enabled = True
153        if isinstance(intervals, dict):
154            self.intervals = intervals
155        else:
156            self.intervals = {}
157        if not self.standalone:
158            if hasattr(returners, "__getitem__"):
159                self.returners = returners
160            else:
161                self.returners = returners.loader.gen_functions()
162        try:
163            self.time_offset = self.functions.get(
164                "timezone.get_offset", lambda: "0000"
165            )()
166        except Exception:  # pylint: disable=W0703
167            # get_offset can fail, if that happens, default to 0000
168            log.warning(
169                "Unable to obtain correct timezone offset, defaulting to 0000",
170                exc_info_on_loglevel=logging.DEBUG,
171            )
172            self.time_offset = "0000"
173
174        self.schedule_returner = self.option("schedule_returner")
175        # Keep track of the lowest loop interval needed in this variable
176        self.loop_interval = sys.maxsize
177        if not self.standalone:
178            clean_proc_dir(opts)
179        if cleanup:
180            for prefix in cleanup:
181                self.delete_job_prefix(prefix)
182        if _subprocess_list is None:
183            self._subprocess_list = salt.utils.process.SubprocessList()
184        else:
185            self._subprocess_list = _subprocess_list
186
187    def __getnewargs__(self):
188        return self.opts, self.functions, self.returners, self.intervals, None
189
190    def option(self, opt):
191        """
192        Return options merged from config and pillar
193        """
194        if "config.merge" in self.functions:
195            return self.functions["config.merge"](opt, {}, omit_master=True)
196        return self.opts.get(opt, {})
197
198    def _get_schedule(
199        self, include_opts=True, include_pillar=True, remove_hidden=False
200    ):
201        """
202        Return the schedule data structure
203        """
204        schedule = {}
205        if include_pillar:
206            pillar_schedule = self.opts.get("pillar", {}).get("schedule", {})
207            if not isinstance(pillar_schedule, dict):
208                raise ValueError("Schedule must be of type dict.")
209            schedule.update(pillar_schedule)
210        if include_opts:
211            opts_schedule = self.opts.get("schedule", {})
212            if not isinstance(opts_schedule, dict):
213                raise ValueError("Schedule must be of type dict.")
214            schedule.update(opts_schedule)
215
216        if remove_hidden:
217            _schedule = copy.deepcopy(schedule)
218            for job in _schedule:
219                if isinstance(_schedule[job], dict):
220                    for item in _schedule[job]:
221                        if item.startswith("_"):
222                            del schedule[job][item]
223        return schedule
224
225    def _check_max_running(self, func, data, opts, now):
226        """
227        Return the schedule data structure
228        """
229        # Check to see if there are other jobs with this
230        # signature running.  If there are more than maxrunning
231        # jobs present then don't start another.
232        # If jid_include is False for this job we can ignore all this
233        # NOTE--jid_include defaults to True, thus if it is missing from the data
234        # dict we treat it like it was there and is True
235
236        # Check if we're able to run
237        if "run" not in data or not data["run"]:
238            return data
239        if "jid_include" not in data or data["jid_include"]:
240            jobcount = 0
241            if self.opts["__role"] == "master":
242                current_jobs = salt.utils.master.get_running_jobs(self.opts)
243            else:
244                current_jobs = salt.utils.minion.running(self.opts)
245            for job in current_jobs:
246                if "schedule" in job:
247                    log.debug(
248                        "schedule.handle_func: Checking job against fun %s: %s",
249                        func,
250                        job,
251                    )
252                    if data["name"] == job[
253                        "schedule"
254                    ] and salt.utils.process.os_is_running(job["pid"]):
255                        jobcount += 1
256                        log.debug(
257                            "schedule.handle_func: Incrementing jobcount, "
258                            "now %s, maxrunning is %s",
259                            jobcount,
260                            data["maxrunning"],
261                        )
262                        if jobcount >= data["maxrunning"]:
263                            log.debug(
264                                "schedule.handle_func: The scheduled job "
265                                "%s was not started, %s already running",
266                                data["name"],
267                                data["maxrunning"],
268                            )
269                            data["_skip_reason"] = "maxrunning"
270                            data["_skipped"] = True
271                            data["_skipped_time"] = now
272                            data["run"] = False
273                            return data
274        return data
275
276    def persist(self):
277        """
278        Persist the modified schedule into <<configdir>>/<<default_include>>/_schedule.conf
279        """
280        config_dir = self.opts.get("conf_dir", None)
281        if config_dir is None and "conf_file" in self.opts:
282            config_dir = os.path.dirname(self.opts["conf_file"])
283        if config_dir is None:
284            config_dir = salt.syspaths.CONFIG_DIR
285
286        minion_d_dir = os.path.join(
287            config_dir,
288            os.path.dirname(
289                self.opts.get(
290                    "default_include",
291                    salt.config.DEFAULT_MINION_OPTS["default_include"],
292                )
293            ),
294        )
295
296        if not os.path.isdir(minion_d_dir):
297            os.makedirs(minion_d_dir)
298
299        schedule_conf = os.path.join(minion_d_dir, "_schedule.conf")
300        log.debug("Persisting schedule")
301        schedule_data = self._get_schedule(include_pillar=False, remove_hidden=True)
302        try:
303            with salt.utils.files.fopen(schedule_conf, "wb+") as fp_:
304                fp_.write(
305                    salt.utils.stringutils.to_bytes(
306                        salt.utils.yaml.safe_dump({"schedule": schedule_data})
307                    )
308                )
309        except OSError:
310            log.error(
311                "Failed to persist the updated schedule",
312                exc_info_on_loglevel=logging.DEBUG,
313            )
314
315    def delete_job(self, name, persist=True):
316        """
317        Deletes a job from the scheduler. Ignore jobs from pillar
318        """
319        # ensure job exists, then delete it
320        if name in self.opts["schedule"]:
321            del self.opts["schedule"][name]
322        elif name in self._get_schedule(include_opts=False):
323            log.warning("Cannot delete job %s, it's in the pillar!", name)
324
325        # Fire the complete event back along with updated list of schedule
326        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
327            evt.fire_event(
328                {"complete": True, "schedule": self._get_schedule()},
329                tag="/salt/minion/minion_schedule_delete_complete",
330            )
331
332        # remove from self.intervals
333        if name in self.intervals:
334            del self.intervals[name]
335
336        if persist:
337            self.persist()
338
339    def reset(self):
340        """
341        Reset the scheduler to defaults
342        """
343        self.skip_function = None
344        self.skip_during_range = None
345        self.enabled = True
346        self.splay = None
347        self.opts["schedule"] = {}
348
349    def delete_job_prefix(self, name, persist=True):
350        """
351        Deletes a job from the scheduler. Ignores jobs from pillar
352        """
353        # ensure job exists, then delete it
354        for job in list(self.opts["schedule"].keys()):
355            if job.startswith(name):
356                del self.opts["schedule"][job]
357        for job in self._get_schedule(include_opts=False):
358            if job.startswith(name):
359                log.warning("Cannot delete job %s, it's in the pillar!", job)
360
361        # Fire the complete event back along with updated list of schedule
362        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
363            evt.fire_event(
364                {"complete": True, "schedule": self._get_schedule()},
365                tag="/salt/minion/minion_schedule_delete_complete",
366            )
367
368        # remove from self.intervals
369        for job in list(self.intervals.keys()):
370            if job.startswith(name):
371                del self.intervals[job]
372
373        if persist:
374            self.persist()
375
376    def add_job(self, data, persist=True):
377        """
378        Adds a new job to the scheduler. The format is the same as required in
379        the configuration file. See the docs on how YAML is interpreted into
380        python data-structures to make sure, you pass correct dictionaries.
381        """
382
383        # we don't do any checking here besides making sure its a dict.
384        # eval() already does for us and raises errors accordingly
385        if not isinstance(data, dict):
386            raise ValueError("Scheduled jobs have to be of type dict.")
387        if not len(data) == 1:
388            raise ValueError("You can only schedule one new job at a time.")
389
390        # if enabled is not included in the job,
391        # assume job is enabled.
392        for job in data:
393            if "enabled" not in data[job]:
394                data[job]["enabled"] = True
395
396        new_job = next(iter(data.keys()))
397
398        if new_job in self._get_schedule(include_opts=False):
399            log.warning("Cannot update job %s, it's in the pillar!", new_job)
400
401        elif new_job in self.opts["schedule"]:
402            log.info("Updating job settings for scheduled job: %s", new_job)
403            self.opts["schedule"].update(data)
404
405        else:
406            log.info("Added new job %s to scheduler", new_job)
407            self.opts["schedule"].update(data)
408
409        # Fire the complete event back along with updated list of schedule
410        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
411            evt.fire_event(
412                {"complete": True, "schedule": self._get_schedule()},
413                tag="/salt/minion/minion_schedule_add_complete",
414            )
415
416        if persist:
417            self.persist()
418
419    def enable_job(self, name, persist=True):
420        """
421        Enable a job in the scheduler. Ignores jobs from pillar
422        """
423        # ensure job exists, then enable it
424        if name in self.opts["schedule"]:
425            self.opts["schedule"][name]["enabled"] = True
426            log.info("Enabling job %s in scheduler", name)
427        elif name in self._get_schedule(include_opts=False):
428            log.warning("Cannot modify job %s, it's in the pillar!", name)
429
430        # Fire the complete event back along with updated list of schedule
431        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
432            evt.fire_event(
433                {"complete": True, "schedule": self._get_schedule()},
434                tag="/salt/minion/minion_schedule_enabled_job_complete",
435            )
436
437        if persist:
438            self.persist()
439
440    def disable_job(self, name, persist=True):
441        """
442        Disable a job in the scheduler. Ignores jobs from pillar
443        """
444        # ensure job exists, then disable it
445        if name in self.opts["schedule"]:
446            self.opts["schedule"][name]["enabled"] = False
447            log.info("Disabling job %s in scheduler", name)
448        elif name in self._get_schedule(include_opts=False):
449            log.warning("Cannot modify job %s, it's in the pillar!", name)
450
451        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
452            # Fire the complete event back along with updated list of schedule
453            evt.fire_event(
454                {"complete": True, "schedule": self._get_schedule()},
455                tag="/salt/minion/minion_schedule_disabled_job_complete",
456            )
457
458        if persist:
459            self.persist()
460
461    def modify_job(self, name, schedule, persist=True):
462        """
463        Modify a job in the scheduler. Ignores jobs from pillar
464        """
465        # ensure job exists, then replace it
466        if name in self.opts["schedule"]:
467            self.delete_job(name, persist)
468        elif name in self._get_schedule(include_opts=False):
469            log.warning("Cannot modify job %s, it's in the pillar!", name)
470            return
471
472        self.opts["schedule"][name] = schedule
473
474        if persist:
475            self.persist()
476
477    def run_job(self, name):
478        """
479        Run a schedule job now
480        """
481        data = self._get_schedule().get(name, {})
482
483        if "function" in data:
484            func = data["function"]
485        elif "func" in data:
486            func = data["func"]
487        elif "fun" in data:
488            func = data["fun"]
489        else:
490            func = None
491        if func not in self.functions:
492            log.info("Invalid function: %s in scheduled job %s.", func, name)
493
494        if "name" not in data:
495            data["name"] = name
496
497        # Assume run should be True until we check max_running
498        if "run" not in data:
499            data["run"] = True
500
501        if not self.standalone:
502            data = self._check_max_running(
503                func, data, self.opts, datetime.datetime.now()
504            )
505
506        # Grab run, assume True
507        if data.get("run"):
508            log.info("Running Job: %s", name)
509            self._run_job(func, data)
510
511    def enable_schedule(self, persist=True):
512        """
513        Enable the scheduler.
514        """
515        self.opts["schedule"]["enabled"] = True
516
517        # Fire the complete event back along with updated list of schedule
518        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
519            evt.fire_event(
520                {"complete": True, "schedule": self._get_schedule()},
521                tag="/salt/minion/minion_schedule_enabled_complete",
522            )
523
524        if persist:
525            self.persist()
526
527    def disable_schedule(self, persist=True):
528        """
529        Disable the scheduler.
530        """
531        self.opts["schedule"]["enabled"] = False
532
533        # Fire the complete event back along with updated list of schedule
534        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
535            evt.fire_event(
536                {"complete": True, "schedule": self._get_schedule()},
537                tag="/salt/minion/minion_schedule_disabled_complete",
538            )
539
540        if persist:
541            self.persist()
542
543    def reload(self, schedule):
544        """
545        Reload the schedule from saved schedule file.
546        """
547        # Remove all jobs from self.intervals
548        self.intervals = {}
549
550        if "schedule" in schedule:
551            schedule = schedule["schedule"]
552        self.opts.setdefault("schedule", {}).update(schedule)
553
554    def list(self, where):
555        """
556        List the current schedule items
557        """
558        if where == "pillar":
559            schedule = self._get_schedule(include_opts=False)
560        elif where == "opts":
561            schedule = self._get_schedule(include_pillar=False)
562        else:
563            schedule = self._get_schedule()
564
565        # Fire the complete event back along with the list of schedule
566        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
567            evt.fire_event(
568                {"complete": True, "schedule": schedule},
569                tag="/salt/minion/minion_schedule_list_complete",
570            )
571
572    def save_schedule(self):
573        """
574        Save the current schedule
575        """
576        self.persist()
577
578        # Fire the complete event back along with the list of schedule
579        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
580            evt.fire_event({"complete": True}, tag="/salt/minion/minion_schedule_saved")
581
582    def postpone_job(self, name, data):
583        """
584        Postpone a job in the scheduler.
585        Ignores jobs from pillar
586        """
587        time = data["time"]
588        new_time = data["new_time"]
589        time_fmt = data.get("time_fmt", "%Y-%m-%dT%H:%M:%S")
590
591        # ensure job exists, then disable it
592        if name in self.opts["schedule"]:
593            if "skip_explicit" not in self.opts["schedule"][name]:
594                self.opts["schedule"][name]["skip_explicit"] = []
595            self.opts["schedule"][name]["skip_explicit"].append(
596                {"time": time, "time_fmt": time_fmt}
597            )
598
599            if "run_explicit" not in self.opts["schedule"][name]:
600                self.opts["schedule"][name]["run_explicit"] = []
601            self.opts["schedule"][name]["run_explicit"].append(
602                {"time": new_time, "time_fmt": time_fmt}
603            )
604
605        elif name in self._get_schedule(include_opts=False):
606            log.warning("Cannot modify job %s, it's in the pillar!", name)
607
608        # Fire the complete event back along with updated list of schedule
609        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
610            evt.fire_event(
611                {"complete": True, "schedule": self._get_schedule()},
612                tag="/salt/minion/minion_schedule_postpone_job_complete",
613            )
614
615    def skip_job(self, name, data):
616        """
617        Skip a job at a specific time in the scheduler.
618        Ignores jobs from pillar
619        """
620        time = data["time"]
621        time_fmt = data.get("time_fmt", "%Y-%m-%dT%H:%M:%S")
622
623        # ensure job exists, then disable it
624        if name in self.opts["schedule"]:
625            if "skip_explicit" not in self.opts["schedule"][name]:
626                self.opts["schedule"][name]["skip_explicit"] = []
627            self.opts["schedule"][name]["skip_explicit"].append(
628                {"time": time, "time_fmt": time_fmt}
629            )
630
631        elif name in self._get_schedule(include_opts=False):
632            log.warning("Cannot modify job %s, it's in the pillar!", name)
633
634        # Fire the complete event back along with updated list of schedule
635        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
636            evt.fire_event(
637                {"complete": True, "schedule": self._get_schedule()},
638                tag="/salt/minion/minion_schedule_skip_job_complete",
639            )
640
641    def get_next_fire_time(self, name, fmt="%Y-%m-%dT%H:%M:%S"):
642        """
643        Return the next fire time for the specified job
644        """
645
646        schedule = self._get_schedule()
647        _next_fire_time = None
648        if schedule:
649            _next_fire_time = schedule.get(name, {}).get("_next_fire_time", None)
650            if _next_fire_time:
651                _next_fire_time = _next_fire_time.strftime(fmt)
652
653        # Fire the complete event back along with updated list of schedule
654        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
655            evt.fire_event(
656                {"complete": True, "next_fire_time": _next_fire_time},
657                tag="/salt/minion/minion_schedule_next_fire_time_complete",
658            )
659
660    def job_status(self, name, fire_event=False):
661        """
662        Return the specified schedule item
663        """
664
665        if fire_event:
666            schedule = self._get_schedule()
667            data = schedule.get(name, {})
668
669            # Fire the complete event back along with updated list of schedule
670            with salt.utils.event.get_event(
671                "minion", opts=self.opts, listen=False
672            ) as evt:
673                evt.fire_event(
674                    {"complete": True, "data": data},
675                    tag="/salt/minion/minion_schedule_job_status_complete",
676                )
677        else:
678            schedule = self._get_schedule()
679            return schedule.get(name, {})
680
681    def handle_func(self, multiprocessing_enabled, func, data, jid=None):
682        """
683        Execute this method in a multiprocess or thread
684        """
685        if salt.utils.platform.is_windows() or self.opts.get("transport") == "zeromq":
686            # Since function references can't be pickled and pickling
687            # is required when spawning new processes on Windows, regenerate
688            # the functions and returners.
689            # This also needed for ZeroMQ transport to reset all functions
690            # context data that could keep paretns connections. ZeroMQ will
691            # hang on polling parents connections from the child process.
692            self.utils = salt.loader.utils(self.opts)
693            if self.opts["__role"] == "master":
694                self.functions = salt.loader.runner(self.opts, utils=self.utils)
695            else:
696                self.functions = salt.loader.minion_mods(
697                    self.opts, proxy=self.proxy, utils=self.utils
698                )
699            self.returners = salt.loader.returners(
700                self.opts, self.functions, proxy=self.proxy
701            )
702        if jid is None:
703            jid = salt.utils.jid.gen_jid(self.opts)
704        ret = {
705            "id": self.opts.get("id", "master"),
706            "fun": func,
707            "fun_args": [],
708            "schedule": data["name"],
709            "jid": jid,
710        }
711
712        if "metadata" in data:
713            if isinstance(data["metadata"], dict):
714                ret["metadata"] = data["metadata"]
715                ret["metadata"]["_TOS"] = self.time_offset
716                ret["metadata"]["_TS"] = time.ctime()
717                ret["metadata"]["_TT"] = time.strftime(
718                    "%Y %B %d %a %H %m", time.gmtime()
719                )
720            else:
721                log.warning(
722                    "schedule: The metadata parameter must be "
723                    "specified as a dictionary.  Ignoring."
724                )
725
726        if multiprocessing_enabled:
727            # We just want to modify the process name if we're on a different process
728            salt.utils.process.appendproctitle(
729                "{} {}".format(self.__class__.__name__, ret["jid"])
730            )
731        data_returner = data.get("returner", None)
732
733        if not self.standalone:
734            proc_fn = os.path.join(
735                salt.minion.get_proc_dir(self.opts["cachedir"]), ret["jid"]
736            )
737
738        # TODO: Make it readable! Splt to funcs, remove nested try-except-finally sections.
739        try:
740
741            minion_blackout_violation = False
742            if self.opts.get("pillar", {}).get("minion_blackout", False):
743                whitelist = self.opts.get("pillar", {}).get(
744                    "minion_blackout_whitelist", []
745                )
746                # this minion is blacked out. Only allow saltutil.refresh_pillar and the whitelist
747                if func != "saltutil.refresh_pillar" and func not in whitelist:
748                    minion_blackout_violation = True
749            elif self.opts.get("grains", {}).get("minion_blackout", False):
750                whitelist = self.opts.get("grains", {}).get(
751                    "minion_blackout_whitelist", []
752                )
753                if func != "saltutil.refresh_pillar" and func not in whitelist:
754                    minion_blackout_violation = True
755            if minion_blackout_violation:
756                raise SaltInvocationError(
757                    "Minion in blackout mode. Set 'minion_blackout' "
758                    "to False in pillar or grains to resume operations. Only "
759                    "saltutil.refresh_pillar allowed in blackout mode."
760                )
761
762            ret["pid"] = os.getpid()
763
764            args = tuple()
765            if "args" in data:
766                args = copy.deepcopy(data["args"])
767                ret["fun_args"].extend(data["args"])
768
769            kwargs = {}
770            if "kwargs" in data:
771                kwargs = copy.deepcopy(data["kwargs"])
772                ret["fun_args"].append(copy.deepcopy(kwargs))
773
774            if func not in self.functions:
775                ret["return"] = self.functions.missing_fun_string(func)
776                salt.utils.error.raise_error(
777                    message=self.functions.missing_fun_string(func)
778                )
779
780            if not self.standalone:
781                if "jid_include" not in data or data["jid_include"]:
782                    log.debug(
783                        "schedule.handle_func: adding this job to the "
784                        "jobcache with data %s",
785                        ret,
786                    )
787                    # write this to /var/cache/salt/minion/proc
788                    with salt.utils.files.fopen(proc_fn, "w+b") as fp_:
789                        fp_.write(salt.payload.dumps(ret))
790
791            # if the func support **kwargs, lets pack in the pub data we have
792            # TODO: pack the *same* pub data as a minion?
793            argspec = salt.utils.args.get_function_argspec(self.functions[func])
794            if argspec.keywords:
795                # this function accepts **kwargs, pack in the publish data
796                for key, val in ret.items():
797                    if key != "kwargs":
798                        kwargs["__pub_{}".format(key)] = copy.deepcopy(val)
799
800            # Only include these when running runner modules
801            if self.opts["__role"] == "master":
802                jid = salt.utils.jid.gen_jid(self.opts)
803                tag = salt.utils.event.tagify(jid, prefix="salt/scheduler/")
804
805                namespaced_event = salt.utils.event.NamespacedEvent(
806                    salt.utils.event.get_event(
807                        self.opts["__role"],
808                        self.opts["sock_dir"],
809                        self.opts["transport"],
810                        opts=self.opts,
811                        listen=False,
812                    ),
813                    tag,
814                    print_func=None,
815                )
816
817                func_globals = {
818                    "__jid__": jid,
819                    "__user__": salt.utils.user.get_user(),
820                    "__tag__": tag,
821                    "__jid_event__": weakref.proxy(namespaced_event),
822                }
823                self_functions = copy.copy(self.functions)
824                salt.utils.lazy.verify_fun(self_functions, func)
825
826                # Inject some useful globals to *all* the function's global
827                # namespace only once per module-- not per func
828                completed_funcs = []
829
830                for mod_name in self_functions.keys():
831                    if "." not in mod_name:
832                        continue
833                    mod, _ = mod_name.split(".", 1)
834                    if mod in completed_funcs:
835                        continue
836                    completed_funcs.append(mod)
837                    for global_key, value in func_globals.items():
838                        self.functions[mod_name].__globals__[global_key] = value
839
840            self.functions.pack["__context__"]["retcode"] = 0
841
842            ret["return"] = self.functions[func](*args, **kwargs)
843
844            if not self.standalone:
845                # runners do not provide retcode
846                if "retcode" in self.functions.pack["__context__"]:
847                    ret["retcode"] = self.functions.pack["__context__"]["retcode"]
848
849                ret["success"] = True
850
851                if data_returner or self.schedule_returner:
852                    if "return_config" in data:
853                        ret["ret_config"] = data["return_config"]
854                    if "return_kwargs" in data:
855                        ret["ret_kwargs"] = data["return_kwargs"]
856                    rets = []
857                    for returner in [data_returner, self.schedule_returner]:
858                        if isinstance(returner, str):
859                            rets.append(returner)
860                        elif isinstance(returner, list):
861                            rets.extend(returner)
862                    # simple de-duplication with order retained
863                    for returner in OrderedDict.fromkeys(rets):
864                        ret_str = "{}.returner".format(returner)
865                        if ret_str in self.returners:
866                            self.returners[ret_str](ret)
867                        else:
868                            log.info(
869                                "Job %s using invalid returner: %s. Ignoring.",
870                                func,
871                                returner,
872                            )
873
874        except Exception:  # pylint: disable=broad-except
875            log.exception("Unhandled exception running %s", ret["fun"])
876            # Although catch-all exception handlers are bad, the exception here
877            # is to let the exception bubble up to the top of the thread context,
878            # where the thread will die silently, which is worse.
879            if "return" not in ret:
880                ret["return"] = "Unhandled exception running {}".format(ret["fun"])
881            ret["success"] = False
882            ret["retcode"] = 254
883        finally:
884            # Only attempt to return data to the master if the scheduled job is running
885            # on a master itself or a minion.
886            if "__role" in self.opts and self.opts["__role"] in ("master", "minion"):
887                # The 'return_job' option is enabled by default even if not set
888                if "return_job" in data and not data["return_job"]:
889                    pass
890                else:
891                    # Send back to master so the job is included in the job list
892                    mret = ret.copy()
893                    # No returners defined, so we're only sending back to the master
894                    if not data_returner and not self.schedule_returner:
895                        mret["jid"] = "req"
896                        if data.get("return_job") == "nocache":
897                            # overwrite 'req' to signal to master that
898                            # this job shouldn't be stored
899                            mret["jid"] = "nocache"
900                    load = {"cmd": "_return", "id": self.opts["id"]}
901                    for key, value in mret.items():
902                        load[key] = value
903
904                    if "__role" in self.opts and self.opts["__role"] == "minion":
905                        event = salt.utils.event.get_event(
906                            "minion", opts=self.opts, listen=False
907                        )
908                    elif "__role" in self.opts and self.opts["__role"] == "master":
909                        event = salt.utils.event.get_master_event(
910                            self.opts, self.opts["sock_dir"]
911                        )
912                    try:
913                        event.fire_event(load, "__schedule_return")
914                    except Exception as exc:  # pylint: disable=broad-except
915                        log.exception(
916                            "Unhandled exception firing __schedule_return event"
917                        )
918                    finally:
919                        event.destroy()
920
921            if self.opts["__role"] == "master":
922                namespaced_event.destroy()
923
924            if not self.standalone:
925                log.debug("schedule.handle_func: Removing %s", proc_fn)
926
927                try:
928                    os.unlink(proc_fn)
929                except OSError as exc:
930                    if exc.errno == errno.EEXIST or exc.errno == errno.ENOENT:
931                        # EEXIST and ENOENT are OK because the file is gone and that's what
932                        # we wanted
933                        pass
934                    else:
935                        log.error("Failed to delete '%s': %s", proc_fn, exc.errno)
936                        # Otherwise, failing to delete this file is not something
937                        # we can cleanly handle.
938                        raise
939                finally:
940                    if multiprocessing_enabled:
941                        # Let's make sure we exit the process!
942                        sys.exit(salt.defaults.exitcodes.EX_GENERIC)
943
944    def eval(self, now=None):
945        """
946        Evaluate and execute the schedule
947
948        :param datetime now: Override current time with a datetime object instance``
949
950        """
951
952        log.trace("==== evaluating schedule now %s =====", now)
953
954        jids = []
955        loop_interval = self.opts["loop_interval"]
956        if not isinstance(loop_interval, datetime.timedelta):
957            loop_interval = datetime.timedelta(seconds=loop_interval)
958
959        def _splay(splaytime):
960            """
961            Calculate splaytime
962            """
963            splay_ = None
964            if isinstance(splaytime, dict):
965                if splaytime["end"] >= splaytime["start"]:
966                    splay_ = random.randint(splaytime["start"], splaytime["end"])
967                else:
968                    log.error(
969                        "schedule.handle_func: Invalid Splay, "
970                        "end must be larger than start. Ignoring splay."
971                    )
972            else:
973                splay_ = random.randint(1, splaytime)
974            return splay_
975
976        def _handle_time_elements(data):
977            """
978            Handle schedule item with time elements
979            seconds, minutes, hours, days
980            """
981            if "_seconds" not in data:
982                interval = int(data.get("seconds", 0))
983                interval += int(data.get("minutes", 0)) * 60
984                interval += int(data.get("hours", 0)) * 3600
985                interval += int(data.get("days", 0)) * 86400
986
987                data["_seconds"] = interval
988
989                if not data["_next_fire_time"]:
990                    data["_next_fire_time"] = now + datetime.timedelta(
991                        seconds=data["_seconds"]
992                    )
993
994                if interval < self.loop_interval:
995                    self.loop_interval = interval
996
997                data["_next_scheduled_fire_time"] = now + datetime.timedelta(
998                    seconds=data["_seconds"]
999                )
1000
1001        def _handle_once(data, loop_interval):
1002            """
1003            Handle schedule item with once
1004            """
1005            if data["_next_fire_time"]:
1006                if (
1007                    data["_next_fire_time"] < now - loop_interval
1008                    or data["_next_fire_time"] > now
1009                    and not data["_splay"]
1010                ):
1011                    data["_continue"] = True
1012
1013            if not data["_next_fire_time"] and not data["_splay"]:
1014                once = data["once"]
1015                if not isinstance(once, datetime.datetime):
1016                    once_fmt = data.get("once_fmt", "%Y-%m-%dT%H:%M:%S")
1017                    try:
1018                        once = datetime.datetime.strptime(data["once"], once_fmt)
1019                    except (TypeError, ValueError):
1020                        data["_error"] = (
1021                            "Date string could not "
1022                            "be parsed: {}, {}. "
1023                            "Ignoring job {}.".format(
1024                                data["once"], once_fmt, data["name"]
1025                            )
1026                        )
1027                        log.error(data["_error"])
1028                        return
1029                data["_next_fire_time"] = once
1030                data["_next_scheduled_fire_time"] = once
1031                # If _next_fire_time is less than now, continue
1032                if once < now - loop_interval:
1033                    data["_continue"] = True
1034
1035        def _handle_when(data, loop_interval):
1036            """
1037            Handle schedule item with when
1038            """
1039            if not _WHEN_SUPPORTED:
1040                data["_error"] = "Missing python-dateutil. Ignoring job {}.".format(
1041                    data["name"]
1042                )
1043                log.error(data["_error"])
1044                return
1045
1046            if not isinstance(data["when"], list):
1047                _when_data = [data["when"]]
1048            else:
1049                _when_data = data["when"]
1050
1051            _when = []
1052            for i in _when_data:
1053                if (
1054                    "pillar" in self.opts
1055                    and "whens" in self.opts["pillar"]
1056                    and i in self.opts["pillar"]["whens"]
1057                ):
1058                    if not isinstance(self.opts["pillar"]["whens"], dict):
1059                        data["_error"] = (
1060                            'Pillar item "whens" '
1061                            "must be a dict. "
1062                            "Ignoring job {}.".format(data["name"])
1063                        )
1064                        log.error(data["_error"])
1065                        return
1066                    when_ = self.opts["pillar"]["whens"][i]
1067                elif (
1068                    "whens" in self.opts["grains"] and i in self.opts["grains"]["whens"]
1069                ):
1070                    if not isinstance(self.opts["grains"]["whens"], dict):
1071                        data[
1072                            "_error"
1073                        ] = 'Grain "whens" must be a dict. Ignoring job {}.'.format(
1074                            data["name"]
1075                        )
1076                        log.error(data["_error"])
1077                        return
1078                    when_ = self.opts["grains"]["whens"][i]
1079                else:
1080                    when_ = i
1081
1082                if not isinstance(when_, datetime.datetime):
1083                    try:
1084                        when_ = dateutil_parser.parse(when_)
1085                    except ValueError:
1086                        data[
1087                            "_error"
1088                        ] = "Invalid date string {}. Ignoring job {}.".format(
1089                            i, data["name"]
1090                        )
1091                        log.error(data["_error"])
1092                        return
1093
1094                _when.append(when_)
1095
1096            if data["_splay"]:
1097                _when.append(data["_splay"])
1098
1099            # Sort the list of "whens" from earlier to later schedules
1100            _when.sort()
1101
1102            # Copy the list so we can loop through it
1103            for i in copy.deepcopy(_when):
1104                if len(_when) > 1:
1105                    if i < now - loop_interval:
1106                        # Remove all missed schedules except the latest one.
1107                        # We need it to detect if it was triggered previously.
1108                        _when.remove(i)
1109
1110            if _when:
1111                # Grab the first element, which is the next run time or
1112                # last scheduled time in the past.
1113                when = _when[0]
1114
1115                if (
1116                    when < now - loop_interval
1117                    and not data.get("_run", False)
1118                    and not run
1119                    and not data["_splay"]
1120                ):
1121                    data["_next_fire_time"] = None
1122                    data["_continue"] = True
1123                    return
1124
1125                if "_run" not in data:
1126                    # Prevent run of jobs from the past
1127                    data["_run"] = bool(when >= now - loop_interval)
1128
1129                if not data["_next_fire_time"]:
1130                    data["_next_fire_time"] = when
1131
1132                data["_next_scheduled_fire_time"] = when
1133
1134                if data["_next_fire_time"] < when and not run and not data["_run"]:
1135                    data["_next_fire_time"] = when
1136                    data["_run"] = True
1137
1138            elif not data.get("_run", False):
1139                data["_next_fire_time"] = None
1140                data["_continue"] = True
1141
1142        def _handle_cron(data, loop_interval):
1143            """
1144            Handle schedule item with cron
1145            """
1146            if not _CRON_SUPPORTED:
1147                data["_error"] = "Missing python-croniter. Ignoring job {}.".format(
1148                    data["name"]
1149                )
1150                log.error(data["_error"])
1151                return
1152
1153            if data["_next_fire_time"] is None:
1154                # Get next time frame for a "cron" job if it has been never
1155                # executed before or already executed in the past.
1156                try:
1157                    data["_next_fire_time"] = croniter.croniter(
1158                        data["cron"], now
1159                    ).get_next(datetime.datetime)
1160                    data["_next_scheduled_fire_time"] = croniter.croniter(
1161                        data["cron"], now
1162                    ).get_next(datetime.datetime)
1163                except (ValueError, KeyError):
1164                    data["_error"] = "Invalid cron string. Ignoring job {}.".format(
1165                        data["name"]
1166                    )
1167                    log.error(data["_error"])
1168                    return
1169
1170                # If next job run is scheduled more than 1 minute ahead and
1171                # configured loop interval is longer than that, we should
1172                # shorten it to get our job executed closer to the beginning
1173                # of desired time.
1174                interval = (now - data["_next_fire_time"]).total_seconds()
1175                if interval >= 60 and interval < self.loop_interval:
1176                    self.loop_interval = interval
1177
1178        def _handle_run_explicit(data, loop_interval):
1179            """
1180            Handle schedule item with run_explicit
1181            """
1182            _run_explicit = []
1183            for _run_time in data["run_explicit"]:
1184                if isinstance(_run_time, datetime.datetime):
1185                    _run_explicit.append(_run_time)
1186                else:
1187                    _run_explicit.append(
1188                        datetime.datetime.strptime(
1189                            _run_time["time"], _run_time["time_fmt"]
1190                        )
1191                    )
1192            data["run"] = False
1193
1194            # Copy the list so we can loop through it
1195            for i in copy.deepcopy(_run_explicit):
1196                if len(_run_explicit) > 1:
1197                    if i < now - loop_interval:
1198                        _run_explicit.remove(i)
1199
1200            if _run_explicit:
1201                if _run_explicit[0] <= now < _run_explicit[0] + loop_interval:
1202                    data["run"] = True
1203                    data["_next_fire_time"] = _run_explicit[0]
1204
1205        def _handle_skip_explicit(data, loop_interval):
1206            """
1207            Handle schedule item with skip_explicit
1208            """
1209            data["run"] = False
1210
1211            _skip_explicit = []
1212            for _skip_time in data["skip_explicit"]:
1213                if isinstance(_skip_time, datetime.datetime):
1214                    _skip_explicit.append(_skip_time)
1215                else:
1216                    _skip_explicit.append(
1217                        datetime.datetime.strptime(
1218                            _skip_time["time"], _skip_time["time_fmt"]
1219                        )
1220                    )
1221
1222            # Copy the list so we can loop through it
1223            for i in copy.deepcopy(_skip_explicit):
1224                if i < now - loop_interval:
1225                    _skip_explicit.remove(i)
1226
1227            if _skip_explicit:
1228                if _skip_explicit[0] <= now <= (_skip_explicit[0] + loop_interval):
1229                    if self.skip_function:
1230                        data["run"] = True
1231                        data["func"] = self.skip_function
1232                    else:
1233                        data["_skip_reason"] = "skip_explicit"
1234                        data["_skipped_time"] = now
1235                        data["_skipped"] = True
1236                        data["run"] = False
1237            else:
1238                data["run"] = True
1239
1240        def _handle_skip_during_range(data, loop_interval):
1241            """
1242            Handle schedule item with skip_explicit
1243            """
1244            if not _RANGE_SUPPORTED:
1245                data["_error"] = "Missing python-dateutil. Ignoring job {}.".format(
1246                    data["name"]
1247                )
1248                log.error(data["_error"])
1249                return
1250
1251            if not isinstance(data["skip_during_range"], dict):
1252                data["_error"] = (
1253                    "schedule.handle_func: Invalid, range "
1254                    "must be specified as a dictionary. "
1255                    "Ignoring job {}.".format(data["name"])
1256                )
1257                log.error(data["_error"])
1258                return
1259
1260            start = data["skip_during_range"]["start"]
1261            end = data["skip_during_range"]["end"]
1262            if not isinstance(start, datetime.datetime):
1263                try:
1264                    start = dateutil_parser.parse(start)
1265                except ValueError:
1266                    data["_error"] = (
1267                        "Invalid date string for start in "
1268                        "skip_during_range. Ignoring "
1269                        "job {}.".format(data["name"])
1270                    )
1271                    log.error(data["_error"])
1272                    return
1273
1274            if not isinstance(end, datetime.datetime):
1275                try:
1276                    end = dateutil_parser.parse(end)
1277                except ValueError:
1278                    data["_error"] = (
1279                        "Invalid date string for end in "
1280                        "skip_during_range. Ignoring "
1281                        "job {}.".format(data["name"])
1282                    )
1283                    log.error(data["_error"])
1284                    return
1285
1286            # Check to see if we should run the job immediately
1287            # after the skip_during_range is over
1288            if "run_after_skip_range" in data and data["run_after_skip_range"]:
1289                if "run_explicit" not in data:
1290                    data["run_explicit"] = []
1291                # Add a run_explicit for immediately after the
1292                # skip_during_range ends
1293                _run_immediate = (end + loop_interval).strftime("%Y-%m-%dT%H:%M:%S")
1294                if _run_immediate not in data["run_explicit"]:
1295                    data["run_explicit"].append(
1296                        {"time": _run_immediate, "time_fmt": "%Y-%m-%dT%H:%M:%S"}
1297                    )
1298
1299            if end > start:
1300                if start <= now <= end:
1301                    if self.skip_function:
1302                        data["run"] = True
1303                        data["func"] = self.skip_function
1304                    else:
1305                        data["_skip_reason"] = "in_skip_range"
1306                        data["_skipped_time"] = now
1307                        data["_skipped"] = True
1308                        data["run"] = False
1309                else:
1310                    data["run"] = True
1311            else:
1312                data["_error"] = (
1313                    "schedule.handle_func: Invalid "
1314                    "range, end must be larger than "
1315                    "start. Ignoring job {}.".format(data["name"])
1316                )
1317                log.error(data["_error"])
1318
1319        def _handle_range(data):
1320            """
1321            Handle schedule item with skip_explicit
1322            """
1323            if not _RANGE_SUPPORTED:
1324                data["_error"] = "Missing python-dateutil. Ignoring job {}".format(
1325                    data["name"]
1326                )
1327                log.error(data["_error"])
1328                return
1329
1330            if not isinstance(data["range"], dict):
1331                data["_error"] = (
1332                    "schedule.handle_func: Invalid, range "
1333                    "must be specified as a dictionary."
1334                    "Ignoring job {}.".format(data["name"])
1335                )
1336                log.error(data["_error"])
1337                return
1338
1339            start = data["range"]["start"]
1340            end = data["range"]["end"]
1341            if not isinstance(start, datetime.datetime):
1342                try:
1343                    start = dateutil_parser.parse(start)
1344                except ValueError:
1345                    data[
1346                        "_error"
1347                    ] = "Invalid date string for start. Ignoring job {}.".format(
1348                        data["name"]
1349                    )
1350                    log.error(data["_error"])
1351                    return
1352
1353            if not isinstance(end, datetime.datetime):
1354                try:
1355                    end = dateutil_parser.parse(end)
1356                except ValueError:
1357                    data[
1358                        "_error"
1359                    ] = "Invalid date string for end. Ignoring job {}.".format(
1360                        data["name"]
1361                    )
1362                    log.error(data["_error"])
1363                    return
1364
1365            if end > start:
1366                if "invert" in data["range"] and data["range"]["invert"]:
1367                    if now <= start or now >= end:
1368                        data["run"] = True
1369                    else:
1370                        data["_skip_reason"] = "in_skip_range"
1371                        data["run"] = False
1372                else:
1373                    if start <= now <= end:
1374                        data["run"] = True
1375                    else:
1376                        if self.skip_function:
1377                            data["run"] = True
1378                            data["func"] = self.skip_function
1379                        else:
1380                            data["_skip_reason"] = "not_in_range"
1381                            data["run"] = False
1382            else:
1383                data["_error"] = (
1384                    "schedule.handle_func: Invalid "
1385                    "range, end must be larger "
1386                    "than start. Ignoring job {}.".format(data["name"])
1387                )
1388                log.error(data["_error"])
1389
1390        def _handle_after(data):
1391            """
1392            Handle schedule item with after
1393            """
1394            if not _WHEN_SUPPORTED:
1395                data["_error"] = "Missing python-dateutil. Ignoring job {}".format(
1396                    data["name"]
1397                )
1398                log.error(data["_error"])
1399                return
1400
1401            after = data["after"]
1402            if not isinstance(after, datetime.datetime):
1403                after = dateutil_parser.parse(after)
1404
1405            if after >= now:
1406                log.debug("After time has not passed skipping job: %s.", data["name"])
1407                data["_skip_reason"] = "after_not_passed"
1408                data["_skipped_time"] = now
1409                data["_skipped"] = True
1410                data["run"] = False
1411            else:
1412                data["run"] = True
1413
1414        def _handle_until(data):
1415            """
1416            Handle schedule item with until
1417            """
1418            if not _WHEN_SUPPORTED:
1419                data["_error"] = "Missing python-dateutil. Ignoring job {}".format(
1420                    data["name"]
1421                )
1422                log.error(data["_error"])
1423                return
1424
1425            until = data["until"]
1426            if not isinstance(until, datetime.datetime):
1427                until = dateutil_parser.parse(until)
1428
1429            if until <= now:
1430                log.debug("Until time has passed skipping job: %s.", data["name"])
1431                data["_skip_reason"] = "until_passed"
1432                data["_skipped_time"] = now
1433                data["_skipped"] = True
1434                data["run"] = False
1435            else:
1436                data["run"] = True
1437
1438        def _chop_ms(dt):
1439            """
1440            Remove the microseconds from a datetime object
1441            """
1442            return dt - datetime.timedelta(microseconds=dt.microsecond)
1443
1444        schedule = self._get_schedule()
1445        if not isinstance(schedule, dict):
1446            raise ValueError("Schedule must be of type dict.")
1447        if "skip_function" in schedule:
1448            self.skip_function = schedule["skip_function"]
1449        if "skip_during_range" in schedule:
1450            self.skip_during_range = schedule["skip_during_range"]
1451        if "enabled" in schedule:
1452            self.enabled = schedule["enabled"]
1453        if "splay" in schedule:
1454            self.splay = schedule["splay"]
1455
1456        _hidden = ["enabled", "skip_function", "skip_during_range", "splay"]
1457        for job, data in schedule.items():
1458
1459            # Skip anything that is a global setting
1460            if job in _hidden:
1461                continue
1462
1463            # Clear these out between runs
1464            for item in [
1465                "_continue",
1466                "_error",
1467                "_enabled",
1468                "_skipped",
1469                "_skip_reason",
1470                "_skipped_time",
1471            ]:
1472                if item in data:
1473                    del data[item]
1474            run = False
1475
1476            if "name" in data:
1477                job_name = data["name"]
1478            else:
1479                job_name = data["name"] = job
1480
1481            if not isinstance(data, dict):
1482                log.error(
1483                    'Scheduled job "%s" should have a dict value, not %s',
1484                    job_name,
1485                    type(data),
1486                )
1487                continue
1488
1489            if "function" in data:
1490                func = data["function"]
1491            elif "func" in data:
1492                func = data["func"]
1493            elif "fun" in data:
1494                func = data["fun"]
1495            else:
1496                func = None
1497
1498            if func not in self.functions:
1499                log.info("Invalid function: %s in scheduled job %s.", func, job_name)
1500
1501            if "_next_fire_time" not in data:
1502                data["_next_fire_time"] = None
1503
1504            if "_splay" not in data:
1505                data["_splay"] = None
1506
1507            if (
1508                "run_on_start" in data
1509                and data["run_on_start"]
1510                and "_run_on_start" not in data
1511            ):
1512                data["_run_on_start"] = True
1513
1514            if not now:
1515                now = datetime.datetime.now()
1516
1517            # Used for quick lookups when detecting invalid option
1518            # combinations.
1519            schedule_keys = set(data.keys())
1520
1521            time_elements = ("seconds", "minutes", "hours", "days")
1522            scheduling_elements = ("when", "cron", "once")
1523
1524            invalid_sched_combos = [
1525                set(i) for i in itertools.combinations(scheduling_elements, 2)
1526            ]
1527
1528            if any(i <= schedule_keys for i in invalid_sched_combos):
1529                log.error(
1530                    'Unable to use "%s" options together. Ignoring.',
1531                    '", "'.join(scheduling_elements),
1532                )
1533                continue
1534
1535            invalid_time_combos = []
1536            for item in scheduling_elements:
1537                all_items = itertools.chain([item], time_elements)
1538                invalid_time_combos.append(set(itertools.combinations(all_items, 2)))
1539
1540            if any(set(x) <= schedule_keys for x in invalid_time_combos):
1541                log.error(
1542                    'Unable to use "%s" with "%s" options. Ignoring',
1543                    '", "'.join(time_elements),
1544                    '", "'.join(scheduling_elements),
1545                )
1546                continue
1547
1548            if "run_explicit" in data:
1549                _handle_run_explicit(data, loop_interval)
1550                run = data["run"]
1551
1552            if True in [True for item in time_elements if item in data]:
1553                _handle_time_elements(data)
1554            elif "once" in data:
1555                _handle_once(data, loop_interval)
1556            elif "when" in data:
1557                _handle_when(data, loop_interval)
1558            elif "cron" in data:
1559                _handle_cron(data, loop_interval)
1560            else:
1561                continue
1562
1563            # Something told us to continue, so we continue
1564            if "_continue" in data and data["_continue"]:
1565                continue
1566
1567            # An error occurred so we bail out
1568            if "_error" in data and data["_error"]:
1569                continue
1570
1571            seconds = int(
1572                (_chop_ms(data["_next_fire_time"]) - _chop_ms(now)).total_seconds()
1573            )
1574
1575            # If there is no job specific splay available,
1576            # grab the global which defaults to None.
1577            if "splay" not in data:
1578                data["splay"] = self.splay
1579
1580            if "splay" in data and data["splay"]:
1581                # Got "splay" configured, make decision to run a job based on that
1582                if not data["_splay"]:
1583                    # Try to add "splay" time only if next job fire time is
1584                    # still in the future. We should trigger job run
1585                    # immediately otherwise.
1586                    splay = _splay(data["splay"])
1587                    if now < data["_next_fire_time"] + datetime.timedelta(
1588                        seconds=splay
1589                    ):
1590                        log.debug(
1591                            "schedule.handle_func: Adding splay of "
1592                            "%s seconds to next run.",
1593                            splay,
1594                        )
1595                        data["_splay"] = data["_next_fire_time"] + datetime.timedelta(
1596                            seconds=splay
1597                        )
1598                        if "when" in data:
1599                            data["_run"] = True
1600                    else:
1601                        run = True
1602
1603                if data["_splay"]:
1604                    # The "splay" configuration has been already processed, just use it
1605                    seconds = (data["_splay"] - now).total_seconds()
1606                    if "when" in data:
1607                        data["_next_fire_time"] = data["_splay"]
1608
1609            if "_seconds" in data:
1610                if seconds <= 0:
1611                    run = True
1612            elif "when" in data and data["_run"]:
1613                if (
1614                    data["_next_fire_time"]
1615                    <= now
1616                    <= (data["_next_fire_time"] + loop_interval)
1617                ):
1618                    data["_run"] = False
1619                    run = True
1620            elif "cron" in data:
1621                # Reset next scheduled time because it is in the past now,
1622                # and we should trigger the job run, then wait for the next one.
1623                if seconds <= 0:
1624                    data["_next_fire_time"] = None
1625                    run = True
1626            elif "once" in data:
1627                if (
1628                    data["_next_fire_time"]
1629                    <= now
1630                    <= (data["_next_fire_time"] + loop_interval)
1631                ):
1632                    run = True
1633            elif seconds == 0:
1634                run = True
1635
1636            if "_run_on_start" in data and data["_run_on_start"]:
1637                run = True
1638                data["_run_on_start"] = False
1639            elif run:
1640                if "range" in data:
1641                    _handle_range(data)
1642
1643                    # An error occurred so we bail out
1644                    if "_error" in data and data["_error"]:
1645                        continue
1646
1647                    run = data["run"]
1648                    # Override the functiton if passed back
1649                    if "func" in data:
1650                        func = data["func"]
1651
1652                # If there is no job specific skip_during_range available,
1653                # grab the global which defaults to None.
1654                if "skip_during_range" not in data and self.skip_during_range:
1655                    data["skip_during_range"] = self.skip_during_range
1656
1657                if "skip_during_range" in data and data["skip_during_range"]:
1658                    _handle_skip_during_range(data, loop_interval)
1659
1660                    # An error occurred so we bail out
1661                    if "_error" in data and data["_error"]:
1662                        continue
1663
1664                    run = data["run"]
1665                    # Override the functiton if passed back
1666                    if "func" in data:
1667                        func = data["func"]
1668
1669                if "skip_explicit" in data:
1670                    _handle_skip_explicit(data, loop_interval)
1671
1672                    # An error occurred so we bail out
1673                    if "_error" in data and data["_error"]:
1674                        continue
1675
1676                    run = data["run"]
1677                    # Override the functiton if passed back
1678                    if "func" in data:
1679                        func = data["func"]
1680
1681                if "until" in data:
1682                    _handle_until(data)
1683
1684                    # An error occurred so we bail out
1685                    if "_error" in data and data["_error"]:
1686                        continue
1687
1688                    run = data["run"]
1689
1690                if "after" in data:
1691                    _handle_after(data)
1692
1693                    # An error occurred so we bail out
1694                    if "_error" in data and data["_error"]:
1695                        continue
1696
1697                    run = data["run"]
1698
1699            # If the job item has continue, then we set run to False
1700            # so the job does not run but we still get the important
1701            # information calculated, eg. _next_fire_time
1702            if "_continue" in data and data["_continue"]:
1703                run = False
1704
1705            # If globally disabled or job
1706            # is diabled skip the job
1707            if not self.enabled or not data.get("enabled", True):
1708                log.trace("Job: %s is disabled", job_name)
1709                data["_skip_reason"] = "disabled"
1710                data["_skipped_time"] = now
1711                data["_skipped"] = True
1712                run = False
1713
1714            miss_msg = ""
1715            if seconds < 0:
1716                miss_msg = " (runtime missed by {} seconds)".format(abs(seconds))
1717
1718            try:
1719                if run:
1720                    if "jid_include" not in data or data["jid_include"]:
1721                        data["jid_include"] = True
1722                        log.debug(
1723                            "schedule: Job %s was scheduled with jid_include, "
1724                            "adding to cache (jid_include defaults to True)",
1725                            job_name,
1726                        )
1727                        if "maxrunning" in data:
1728                            log.debug(
1729                                "schedule: Job %s was scheduled with a max "
1730                                "number of %s",
1731                                job_name,
1732                                data["maxrunning"],
1733                            )
1734                        else:
1735                            log.info(
1736                                "schedule: maxrunning parameter was not specified for "
1737                                "job %s, defaulting to 1.",
1738                                job_name,
1739                            )
1740                            data["maxrunning"] = 1
1741
1742                    if not self.standalone:
1743                        data["run"] = run
1744                        data = self._check_max_running(func, data, self.opts, now)
1745                        run = data["run"]
1746
1747                # Check run again, just in case _check_max_running
1748                # set run to False
1749                if run:
1750                    jid = salt.utils.jid.gen_jid(self.opts)
1751                    jids.append(jid)
1752                    log.info(
1753                        "Running scheduled job: %s%s with jid %s",
1754                        job_name,
1755                        miss_msg,
1756                        jid,
1757                    )
1758                    self._run_job(func, data, jid=jid)
1759
1760            finally:
1761                # Only set _last_run if the job ran
1762                if run:
1763                    data["_last_run"] = now
1764                    data["_splay"] = None
1765                if "_seconds" in data:
1766                    if self.standalone:
1767                        data["_next_fire_time"] = now + datetime.timedelta(
1768                            seconds=data["_seconds"]
1769                        )
1770                    elif "_skipped" in data and data["_skipped"]:
1771                        data["_next_fire_time"] = now + datetime.timedelta(
1772                            seconds=data["_seconds"]
1773                        )
1774                    elif run:
1775                        data["_next_fire_time"] = now + datetime.timedelta(
1776                            seconds=data["_seconds"]
1777                        )
1778        return jids
1779
1780    def _run_job(self, func, data, jid=None):
1781        job_dry_run = data.get("dry_run", False)
1782        if job_dry_run:
1783            log.debug("Job %s has 'dry_run' set to True. Not running it.", data["name"])
1784            return
1785
1786        multiprocessing_enabled = self.opts.get("multiprocessing", True)
1787        run_schedule_jobs_in_background = self.opts.get(
1788            "run_schedule_jobs_in_background", True
1789        )
1790
1791        if run_schedule_jobs_in_background is False:
1792            # Explicitly pass False for multiprocessing_enabled
1793            self.handle_func(False, func, data, jid)
1794            return
1795
1796        if multiprocessing_enabled and salt.utils.platform.is_windows():
1797            # Temporarily stash our function references.
1798            # You can't pickle function references, and pickling is
1799            # required when spawning new processes on Windows.
1800            functions = self.functions
1801            self.functions = {}
1802            returners = self.returners
1803            self.returners = {}
1804            utils = self.utils
1805            self.utils = {}
1806
1807        try:
1808            if multiprocessing_enabled:
1809                thread_cls = salt.utils.process.SignalHandlingProcess
1810            else:
1811                thread_cls = threading.Thread
1812
1813            if multiprocessing_enabled:
1814                with salt.utils.process.default_signals(signal.SIGINT, signal.SIGTERM):
1815                    proc = thread_cls(
1816                        target=self.handle_func,
1817                        args=(multiprocessing_enabled, func, data, jid),
1818                    )
1819                    # Reset current signals before starting the process in
1820                    # order not to inherit the current signal handlers
1821                    proc.start()
1822                    proc.name = "{}-Schedule-{}".format(proc.name, data["name"])
1823                    self._subprocess_list.add(proc)
1824            else:
1825                proc = thread_cls(
1826                    target=self.handle_func,
1827                    args=(multiprocessing_enabled, func, data, jid),
1828                )
1829                proc.start()
1830                proc.name = "{}-Schedule-{}".format(proc.name, data["name"])
1831                self._subprocess_list.add(proc)
1832        finally:
1833            if multiprocessing_enabled and salt.utils.platform.is_windows():
1834                # Restore our function references.
1835                self.functions = functions
1836                self.returners = returners
1837                self.utils = utils
1838
1839    def cleanup_subprocesses(self):
1840        self._subprocess_list.cleanup()
1841
1842
1843def clean_proc_dir(opts):
1844
1845    """
1846    Loop through jid files in the minion proc directory (default /var/cache/salt/minion/proc)
1847    and remove any that refer to processes that no longer exist
1848    """
1849
1850    for basefilename in os.listdir(salt.minion.get_proc_dir(opts["cachedir"])):
1851        fn_ = os.path.join(salt.minion.get_proc_dir(opts["cachedir"]), basefilename)
1852        with salt.utils.files.fopen(fn_, "rb") as fp_:
1853            job = None
1854            try:
1855                job = salt.payload.load(fp_)
1856            except Exception:  # pylint: disable=broad-except
1857                # It's corrupted
1858                # Windows cannot delete an open file
1859                if salt.utils.platform.is_windows():
1860                    fp_.close()
1861                try:
1862                    os.unlink(fn_)
1863                    continue
1864                except OSError:
1865                    continue
1866            log.debug(
1867                "schedule.clean_proc_dir: checking job %s for process existence", job
1868            )
1869            if job is not None and "pid" in job:
1870                if salt.utils.process.os_is_running(job["pid"]):
1871                    log.debug(
1872                        "schedule.clean_proc_dir: Cleaning proc dir, pid %s "
1873                        "still exists.",
1874                        job["pid"],
1875                    )
1876                else:
1877                    # Windows cannot delete an open file
1878                    if salt.utils.platform.is_windows():
1879                        fp_.close()
1880                    # Maybe the file is already gone
1881                    try:
1882                        os.unlink(fn_)
1883                    except OSError:
1884                        pass
1885