1# See doc/topics/jobs/index.rst 2""" 3Scheduling routines are located here. To activate the scheduler make the 4``schedule`` option available to the master or minion configurations (master 5config file or for the minion via config or pillar). 6 7Detailed tutorial about scheduling jobs can be found :ref:`here 8<scheduling-jobs>`. 9""" 10 11 12import copy 13import datetime 14import errno 15import itertools 16import logging 17import os 18import random 19import signal 20import sys 21import threading 22import time 23import weakref 24 25import salt.config 26import salt.defaults.exitcodes 27import salt.exceptions 28import salt.loader 29import salt.minion 30import salt.payload 31import salt.syspaths 32import salt.utils.args 33import salt.utils.error 34import salt.utils.event 35import salt.utils.files 36import salt.utils.jid 37import salt.utils.master 38import salt.utils.minion 39import salt.utils.platform 40import salt.utils.process 41import salt.utils.stringutils 42import salt.utils.user 43import salt.utils.yaml 44from salt.exceptions import SaltInvocationError 45from salt.utils.odict import OrderedDict 46 47# pylint: disable=import-error 48try: 49 import dateutil.parser as dateutil_parser 50 51 _WHEN_SUPPORTED = True 52 _RANGE_SUPPORTED = True 53except ImportError: 54 _WHEN_SUPPORTED = False 55 _RANGE_SUPPORTED = False 56 57try: 58 import croniter 59 60 _CRON_SUPPORTED = True 61except ImportError: 62 _CRON_SUPPORTED = False 63# pylint: enable=import-error 64 65log = logging.getLogger(__name__) 66 67 68class Schedule: 69 """ 70 Create a Schedule object, pass in the opts and the functions dict to use 71 """ 72 73 instance = None 74 75 def __new__( 76 cls, 77 opts, 78 functions, 79 returners=None, 80 intervals=None, 81 cleanup=None, 82 proxy=None, 83 standalone=False, 84 new_instance=False, 85 utils=None, 86 _subprocess_list=None, 87 ): 88 """ 89 Only create one instance of Schedule 90 """ 91 if cls.instance is None or new_instance is True: 92 log.debug("Initializing new Schedule") 93 # we need to make a local variable for this, as we are going to store 94 # it in a WeakValueDictionary-- which will remove the item if no one 95 # references it-- this forces a reference while we return to the caller 96 instance = object.__new__(cls) 97 instance.__singleton_init__( 98 opts, 99 functions, 100 returners=returners, 101 intervals=intervals, 102 cleanup=cleanup, 103 proxy=proxy, 104 standalone=standalone, 105 utils=utils, 106 _subprocess_list=_subprocess_list, 107 ) 108 if new_instance is True: 109 return instance 110 cls.instance = instance 111 else: 112 log.debug("Re-using Schedule") 113 return cls.instance 114 115 # has to remain empty for singletons, since __init__ will *always* be called 116 def __init__( 117 self, 118 opts, 119 functions, 120 returners=None, 121 intervals=None, 122 cleanup=None, 123 proxy=None, 124 standalone=False, 125 new_instance=False, 126 utils=None, 127 _subprocess_list=None, 128 ): 129 pass 130 131 # an init for the singleton instance to call 132 def __singleton_init__( 133 self, 134 opts, 135 functions, 136 returners=None, 137 intervals=None, 138 cleanup=None, 139 proxy=None, 140 standalone=False, 141 utils=None, 142 _subprocess_list=None, 143 ): 144 self.opts = opts 145 self.proxy = proxy 146 self.functions = functions 147 self.utils = utils or salt.loader.utils(opts) 148 self.standalone = standalone 149 self.skip_function = None 150 self.skip_during_range = None 151 self.splay = None 152 self.enabled = True 153 if isinstance(intervals, dict): 154 self.intervals = intervals 155 else: 156 self.intervals = {} 157 if not self.standalone: 158 if hasattr(returners, "__getitem__"): 159 self.returners = returners 160 else: 161 self.returners = returners.loader.gen_functions() 162 try: 163 self.time_offset = self.functions.get( 164 "timezone.get_offset", lambda: "0000" 165 )() 166 except Exception: # pylint: disable=W0703 167 # get_offset can fail, if that happens, default to 0000 168 log.warning( 169 "Unable to obtain correct timezone offset, defaulting to 0000", 170 exc_info_on_loglevel=logging.DEBUG, 171 ) 172 self.time_offset = "0000" 173 174 self.schedule_returner = self.option("schedule_returner") 175 # Keep track of the lowest loop interval needed in this variable 176 self.loop_interval = sys.maxsize 177 if not self.standalone: 178 clean_proc_dir(opts) 179 if cleanup: 180 for prefix in cleanup: 181 self.delete_job_prefix(prefix) 182 if _subprocess_list is None: 183 self._subprocess_list = salt.utils.process.SubprocessList() 184 else: 185 self._subprocess_list = _subprocess_list 186 187 def __getnewargs__(self): 188 return self.opts, self.functions, self.returners, self.intervals, None 189 190 def option(self, opt): 191 """ 192 Return options merged from config and pillar 193 """ 194 if "config.merge" in self.functions: 195 return self.functions["config.merge"](opt, {}, omit_master=True) 196 return self.opts.get(opt, {}) 197 198 def _get_schedule( 199 self, include_opts=True, include_pillar=True, remove_hidden=False 200 ): 201 """ 202 Return the schedule data structure 203 """ 204 schedule = {} 205 if include_pillar: 206 pillar_schedule = self.opts.get("pillar", {}).get("schedule", {}) 207 if not isinstance(pillar_schedule, dict): 208 raise ValueError("Schedule must be of type dict.") 209 schedule.update(pillar_schedule) 210 if include_opts: 211 opts_schedule = self.opts.get("schedule", {}) 212 if not isinstance(opts_schedule, dict): 213 raise ValueError("Schedule must be of type dict.") 214 schedule.update(opts_schedule) 215 216 if remove_hidden: 217 _schedule = copy.deepcopy(schedule) 218 for job in _schedule: 219 if isinstance(_schedule[job], dict): 220 for item in _schedule[job]: 221 if item.startswith("_"): 222 del schedule[job][item] 223 return schedule 224 225 def _check_max_running(self, func, data, opts, now): 226 """ 227 Return the schedule data structure 228 """ 229 # Check to see if there are other jobs with this 230 # signature running. If there are more than maxrunning 231 # jobs present then don't start another. 232 # If jid_include is False for this job we can ignore all this 233 # NOTE--jid_include defaults to True, thus if it is missing from the data 234 # dict we treat it like it was there and is True 235 236 # Check if we're able to run 237 if "run" not in data or not data["run"]: 238 return data 239 if "jid_include" not in data or data["jid_include"]: 240 jobcount = 0 241 if self.opts["__role"] == "master": 242 current_jobs = salt.utils.master.get_running_jobs(self.opts) 243 else: 244 current_jobs = salt.utils.minion.running(self.opts) 245 for job in current_jobs: 246 if "schedule" in job: 247 log.debug( 248 "schedule.handle_func: Checking job against fun %s: %s", 249 func, 250 job, 251 ) 252 if data["name"] == job[ 253 "schedule" 254 ] and salt.utils.process.os_is_running(job["pid"]): 255 jobcount += 1 256 log.debug( 257 "schedule.handle_func: Incrementing jobcount, " 258 "now %s, maxrunning is %s", 259 jobcount, 260 data["maxrunning"], 261 ) 262 if jobcount >= data["maxrunning"]: 263 log.debug( 264 "schedule.handle_func: The scheduled job " 265 "%s was not started, %s already running", 266 data["name"], 267 data["maxrunning"], 268 ) 269 data["_skip_reason"] = "maxrunning" 270 data["_skipped"] = True 271 data["_skipped_time"] = now 272 data["run"] = False 273 return data 274 return data 275 276 def persist(self): 277 """ 278 Persist the modified schedule into <<configdir>>/<<default_include>>/_schedule.conf 279 """ 280 config_dir = self.opts.get("conf_dir", None) 281 if config_dir is None and "conf_file" in self.opts: 282 config_dir = os.path.dirname(self.opts["conf_file"]) 283 if config_dir is None: 284 config_dir = salt.syspaths.CONFIG_DIR 285 286 minion_d_dir = os.path.join( 287 config_dir, 288 os.path.dirname( 289 self.opts.get( 290 "default_include", 291 salt.config.DEFAULT_MINION_OPTS["default_include"], 292 ) 293 ), 294 ) 295 296 if not os.path.isdir(minion_d_dir): 297 os.makedirs(minion_d_dir) 298 299 schedule_conf = os.path.join(minion_d_dir, "_schedule.conf") 300 log.debug("Persisting schedule") 301 schedule_data = self._get_schedule(include_pillar=False, remove_hidden=True) 302 try: 303 with salt.utils.files.fopen(schedule_conf, "wb+") as fp_: 304 fp_.write( 305 salt.utils.stringutils.to_bytes( 306 salt.utils.yaml.safe_dump({"schedule": schedule_data}) 307 ) 308 ) 309 except OSError: 310 log.error( 311 "Failed to persist the updated schedule", 312 exc_info_on_loglevel=logging.DEBUG, 313 ) 314 315 def delete_job(self, name, persist=True): 316 """ 317 Deletes a job from the scheduler. Ignore jobs from pillar 318 """ 319 # ensure job exists, then delete it 320 if name in self.opts["schedule"]: 321 del self.opts["schedule"][name] 322 elif name in self._get_schedule(include_opts=False): 323 log.warning("Cannot delete job %s, it's in the pillar!", name) 324 325 # Fire the complete event back along with updated list of schedule 326 with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt: 327 evt.fire_event( 328 {"complete": True, "schedule": self._get_schedule()}, 329 tag="/salt/minion/minion_schedule_delete_complete", 330 ) 331 332 # remove from self.intervals 333 if name in self.intervals: 334 del self.intervals[name] 335 336 if persist: 337 self.persist() 338 339 def reset(self): 340 """ 341 Reset the scheduler to defaults 342 """ 343 self.skip_function = None 344 self.skip_during_range = None 345 self.enabled = True 346 self.splay = None 347 self.opts["schedule"] = {} 348 349 def delete_job_prefix(self, name, persist=True): 350 """ 351 Deletes a job from the scheduler. Ignores jobs from pillar 352 """ 353 # ensure job exists, then delete it 354 for job in list(self.opts["schedule"].keys()): 355 if job.startswith(name): 356 del self.opts["schedule"][job] 357 for job in self._get_schedule(include_opts=False): 358 if job.startswith(name): 359 log.warning("Cannot delete job %s, it's in the pillar!", job) 360 361 # Fire the complete event back along with updated list of schedule 362 with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt: 363 evt.fire_event( 364 {"complete": True, "schedule": self._get_schedule()}, 365 tag="/salt/minion/minion_schedule_delete_complete", 366 ) 367 368 # remove from self.intervals 369 for job in list(self.intervals.keys()): 370 if job.startswith(name): 371 del self.intervals[job] 372 373 if persist: 374 self.persist() 375 376 def add_job(self, data, persist=True): 377 """ 378 Adds a new job to the scheduler. The format is the same as required in 379 the configuration file. See the docs on how YAML is interpreted into 380 python data-structures to make sure, you pass correct dictionaries. 381 """ 382 383 # we don't do any checking here besides making sure its a dict. 384 # eval() already does for us and raises errors accordingly 385 if not isinstance(data, dict): 386 raise ValueError("Scheduled jobs have to be of type dict.") 387 if not len(data) == 1: 388 raise ValueError("You can only schedule one new job at a time.") 389 390 # if enabled is not included in the job, 391 # assume job is enabled. 392 for job in data: 393 if "enabled" not in data[job]: 394 data[job]["enabled"] = True 395 396 new_job = next(iter(data.keys())) 397 398 if new_job in self._get_schedule(include_opts=False): 399 log.warning("Cannot update job %s, it's in the pillar!", new_job) 400 401 elif new_job in self.opts["schedule"]: 402 log.info("Updating job settings for scheduled job: %s", new_job) 403 self.opts["schedule"].update(data) 404 405 else: 406 log.info("Added new job %s to scheduler", new_job) 407 self.opts["schedule"].update(data) 408 409 # Fire the complete event back along with updated list of schedule 410 with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt: 411 evt.fire_event( 412 {"complete": True, "schedule": self._get_schedule()}, 413 tag="/salt/minion/minion_schedule_add_complete", 414 ) 415 416 if persist: 417 self.persist() 418 419 def enable_job(self, name, persist=True): 420 """ 421 Enable a job in the scheduler. Ignores jobs from pillar 422 """ 423 # ensure job exists, then enable it 424 if name in self.opts["schedule"]: 425 self.opts["schedule"][name]["enabled"] = True 426 log.info("Enabling job %s in scheduler", name) 427 elif name in self._get_schedule(include_opts=False): 428 log.warning("Cannot modify job %s, it's in the pillar!", name) 429 430 # Fire the complete event back along with updated list of schedule 431 with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt: 432 evt.fire_event( 433 {"complete": True, "schedule": self._get_schedule()}, 434 tag="/salt/minion/minion_schedule_enabled_job_complete", 435 ) 436 437 if persist: 438 self.persist() 439 440 def disable_job(self, name, persist=True): 441 """ 442 Disable a job in the scheduler. Ignores jobs from pillar 443 """ 444 # ensure job exists, then disable it 445 if name in self.opts["schedule"]: 446 self.opts["schedule"][name]["enabled"] = False 447 log.info("Disabling job %s in scheduler", name) 448 elif name in self._get_schedule(include_opts=False): 449 log.warning("Cannot modify job %s, it's in the pillar!", name) 450 451 with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt: 452 # Fire the complete event back along with updated list of schedule 453 evt.fire_event( 454 {"complete": True, "schedule": self._get_schedule()}, 455 tag="/salt/minion/minion_schedule_disabled_job_complete", 456 ) 457 458 if persist: 459 self.persist() 460 461 def modify_job(self, name, schedule, persist=True): 462 """ 463 Modify a job in the scheduler. Ignores jobs from pillar 464 """ 465 # ensure job exists, then replace it 466 if name in self.opts["schedule"]: 467 self.delete_job(name, persist) 468 elif name in self._get_schedule(include_opts=False): 469 log.warning("Cannot modify job %s, it's in the pillar!", name) 470 return 471 472 self.opts["schedule"][name] = schedule 473 474 if persist: 475 self.persist() 476 477 def run_job(self, name): 478 """ 479 Run a schedule job now 480 """ 481 data = self._get_schedule().get(name, {}) 482 483 if "function" in data: 484 func = data["function"] 485 elif "func" in data: 486 func = data["func"] 487 elif "fun" in data: 488 func = data["fun"] 489 else: 490 func = None 491 if func not in self.functions: 492 log.info("Invalid function: %s in scheduled job %s.", func, name) 493 494 if "name" not in data: 495 data["name"] = name 496 497 # Assume run should be True until we check max_running 498 if "run" not in data: 499 data["run"] = True 500 501 if not self.standalone: 502 data = self._check_max_running( 503 func, data, self.opts, datetime.datetime.now() 504 ) 505 506 # Grab run, assume True 507 if data.get("run"): 508 log.info("Running Job: %s", name) 509 self._run_job(func, data) 510 511 def enable_schedule(self, persist=True): 512 """ 513 Enable the scheduler. 514 """ 515 self.opts["schedule"]["enabled"] = True 516 517 # Fire the complete event back along with updated list of schedule 518 with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt: 519 evt.fire_event( 520 {"complete": True, "schedule": self._get_schedule()}, 521 tag="/salt/minion/minion_schedule_enabled_complete", 522 ) 523 524 if persist: 525 self.persist() 526 527 def disable_schedule(self, persist=True): 528 """ 529 Disable the scheduler. 530 """ 531 self.opts["schedule"]["enabled"] = False 532 533 # Fire the complete event back along with updated list of schedule 534 with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt: 535 evt.fire_event( 536 {"complete": True, "schedule": self._get_schedule()}, 537 tag="/salt/minion/minion_schedule_disabled_complete", 538 ) 539 540 if persist: 541 self.persist() 542 543 def reload(self, schedule): 544 """ 545 Reload the schedule from saved schedule file. 546 """ 547 # Remove all jobs from self.intervals 548 self.intervals = {} 549 550 if "schedule" in schedule: 551 schedule = schedule["schedule"] 552 self.opts.setdefault("schedule", {}).update(schedule) 553 554 def list(self, where): 555 """ 556 List the current schedule items 557 """ 558 if where == "pillar": 559 schedule = self._get_schedule(include_opts=False) 560 elif where == "opts": 561 schedule = self._get_schedule(include_pillar=False) 562 else: 563 schedule = self._get_schedule() 564 565 # Fire the complete event back along with the list of schedule 566 with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt: 567 evt.fire_event( 568 {"complete": True, "schedule": schedule}, 569 tag="/salt/minion/minion_schedule_list_complete", 570 ) 571 572 def save_schedule(self): 573 """ 574 Save the current schedule 575 """ 576 self.persist() 577 578 # Fire the complete event back along with the list of schedule 579 with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt: 580 evt.fire_event({"complete": True}, tag="/salt/minion/minion_schedule_saved") 581 582 def postpone_job(self, name, data): 583 """ 584 Postpone a job in the scheduler. 585 Ignores jobs from pillar 586 """ 587 time = data["time"] 588 new_time = data["new_time"] 589 time_fmt = data.get("time_fmt", "%Y-%m-%dT%H:%M:%S") 590 591 # ensure job exists, then disable it 592 if name in self.opts["schedule"]: 593 if "skip_explicit" not in self.opts["schedule"][name]: 594 self.opts["schedule"][name]["skip_explicit"] = [] 595 self.opts["schedule"][name]["skip_explicit"].append( 596 {"time": time, "time_fmt": time_fmt} 597 ) 598 599 if "run_explicit" not in self.opts["schedule"][name]: 600 self.opts["schedule"][name]["run_explicit"] = [] 601 self.opts["schedule"][name]["run_explicit"].append( 602 {"time": new_time, "time_fmt": time_fmt} 603 ) 604 605 elif name in self._get_schedule(include_opts=False): 606 log.warning("Cannot modify job %s, it's in the pillar!", name) 607 608 # Fire the complete event back along with updated list of schedule 609 with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt: 610 evt.fire_event( 611 {"complete": True, "schedule": self._get_schedule()}, 612 tag="/salt/minion/minion_schedule_postpone_job_complete", 613 ) 614 615 def skip_job(self, name, data): 616 """ 617 Skip a job at a specific time in the scheduler. 618 Ignores jobs from pillar 619 """ 620 time = data["time"] 621 time_fmt = data.get("time_fmt", "%Y-%m-%dT%H:%M:%S") 622 623 # ensure job exists, then disable it 624 if name in self.opts["schedule"]: 625 if "skip_explicit" not in self.opts["schedule"][name]: 626 self.opts["schedule"][name]["skip_explicit"] = [] 627 self.opts["schedule"][name]["skip_explicit"].append( 628 {"time": time, "time_fmt": time_fmt} 629 ) 630 631 elif name in self._get_schedule(include_opts=False): 632 log.warning("Cannot modify job %s, it's in the pillar!", name) 633 634 # Fire the complete event back along with updated list of schedule 635 with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt: 636 evt.fire_event( 637 {"complete": True, "schedule": self._get_schedule()}, 638 tag="/salt/minion/minion_schedule_skip_job_complete", 639 ) 640 641 def get_next_fire_time(self, name, fmt="%Y-%m-%dT%H:%M:%S"): 642 """ 643 Return the next fire time for the specified job 644 """ 645 646 schedule = self._get_schedule() 647 _next_fire_time = None 648 if schedule: 649 _next_fire_time = schedule.get(name, {}).get("_next_fire_time", None) 650 if _next_fire_time: 651 _next_fire_time = _next_fire_time.strftime(fmt) 652 653 # Fire the complete event back along with updated list of schedule 654 with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt: 655 evt.fire_event( 656 {"complete": True, "next_fire_time": _next_fire_time}, 657 tag="/salt/minion/minion_schedule_next_fire_time_complete", 658 ) 659 660 def job_status(self, name, fire_event=False): 661 """ 662 Return the specified schedule item 663 """ 664 665 if fire_event: 666 schedule = self._get_schedule() 667 data = schedule.get(name, {}) 668 669 # Fire the complete event back along with updated list of schedule 670 with salt.utils.event.get_event( 671 "minion", opts=self.opts, listen=False 672 ) as evt: 673 evt.fire_event( 674 {"complete": True, "data": data}, 675 tag="/salt/minion/minion_schedule_job_status_complete", 676 ) 677 else: 678 schedule = self._get_schedule() 679 return schedule.get(name, {}) 680 681 def handle_func(self, multiprocessing_enabled, func, data, jid=None): 682 """ 683 Execute this method in a multiprocess or thread 684 """ 685 if salt.utils.platform.is_windows() or self.opts.get("transport") == "zeromq": 686 # Since function references can't be pickled and pickling 687 # is required when spawning new processes on Windows, regenerate 688 # the functions and returners. 689 # This also needed for ZeroMQ transport to reset all functions 690 # context data that could keep paretns connections. ZeroMQ will 691 # hang on polling parents connections from the child process. 692 self.utils = salt.loader.utils(self.opts) 693 if self.opts["__role"] == "master": 694 self.functions = salt.loader.runner(self.opts, utils=self.utils) 695 else: 696 self.functions = salt.loader.minion_mods( 697 self.opts, proxy=self.proxy, utils=self.utils 698 ) 699 self.returners = salt.loader.returners( 700 self.opts, self.functions, proxy=self.proxy 701 ) 702 if jid is None: 703 jid = salt.utils.jid.gen_jid(self.opts) 704 ret = { 705 "id": self.opts.get("id", "master"), 706 "fun": func, 707 "fun_args": [], 708 "schedule": data["name"], 709 "jid": jid, 710 } 711 712 if "metadata" in data: 713 if isinstance(data["metadata"], dict): 714 ret["metadata"] = data["metadata"] 715 ret["metadata"]["_TOS"] = self.time_offset 716 ret["metadata"]["_TS"] = time.ctime() 717 ret["metadata"]["_TT"] = time.strftime( 718 "%Y %B %d %a %H %m", time.gmtime() 719 ) 720 else: 721 log.warning( 722 "schedule: The metadata parameter must be " 723 "specified as a dictionary. Ignoring." 724 ) 725 726 if multiprocessing_enabled: 727 # We just want to modify the process name if we're on a different process 728 salt.utils.process.appendproctitle( 729 "{} {}".format(self.__class__.__name__, ret["jid"]) 730 ) 731 data_returner = data.get("returner", None) 732 733 if not self.standalone: 734 proc_fn = os.path.join( 735 salt.minion.get_proc_dir(self.opts["cachedir"]), ret["jid"] 736 ) 737 738 # TODO: Make it readable! Splt to funcs, remove nested try-except-finally sections. 739 try: 740 741 minion_blackout_violation = False 742 if self.opts.get("pillar", {}).get("minion_blackout", False): 743 whitelist = self.opts.get("pillar", {}).get( 744 "minion_blackout_whitelist", [] 745 ) 746 # this minion is blacked out. Only allow saltutil.refresh_pillar and the whitelist 747 if func != "saltutil.refresh_pillar" and func not in whitelist: 748 minion_blackout_violation = True 749 elif self.opts.get("grains", {}).get("minion_blackout", False): 750 whitelist = self.opts.get("grains", {}).get( 751 "minion_blackout_whitelist", [] 752 ) 753 if func != "saltutil.refresh_pillar" and func not in whitelist: 754 minion_blackout_violation = True 755 if minion_blackout_violation: 756 raise SaltInvocationError( 757 "Minion in blackout mode. Set 'minion_blackout' " 758 "to False in pillar or grains to resume operations. Only " 759 "saltutil.refresh_pillar allowed in blackout mode." 760 ) 761 762 ret["pid"] = os.getpid() 763 764 args = tuple() 765 if "args" in data: 766 args = copy.deepcopy(data["args"]) 767 ret["fun_args"].extend(data["args"]) 768 769 kwargs = {} 770 if "kwargs" in data: 771 kwargs = copy.deepcopy(data["kwargs"]) 772 ret["fun_args"].append(copy.deepcopy(kwargs)) 773 774 if func not in self.functions: 775 ret["return"] = self.functions.missing_fun_string(func) 776 salt.utils.error.raise_error( 777 message=self.functions.missing_fun_string(func) 778 ) 779 780 if not self.standalone: 781 if "jid_include" not in data or data["jid_include"]: 782 log.debug( 783 "schedule.handle_func: adding this job to the " 784 "jobcache with data %s", 785 ret, 786 ) 787 # write this to /var/cache/salt/minion/proc 788 with salt.utils.files.fopen(proc_fn, "w+b") as fp_: 789 fp_.write(salt.payload.dumps(ret)) 790 791 # if the func support **kwargs, lets pack in the pub data we have 792 # TODO: pack the *same* pub data as a minion? 793 argspec = salt.utils.args.get_function_argspec(self.functions[func]) 794 if argspec.keywords: 795 # this function accepts **kwargs, pack in the publish data 796 for key, val in ret.items(): 797 if key != "kwargs": 798 kwargs["__pub_{}".format(key)] = copy.deepcopy(val) 799 800 # Only include these when running runner modules 801 if self.opts["__role"] == "master": 802 jid = salt.utils.jid.gen_jid(self.opts) 803 tag = salt.utils.event.tagify(jid, prefix="salt/scheduler/") 804 805 namespaced_event = salt.utils.event.NamespacedEvent( 806 salt.utils.event.get_event( 807 self.opts["__role"], 808 self.opts["sock_dir"], 809 self.opts["transport"], 810 opts=self.opts, 811 listen=False, 812 ), 813 tag, 814 print_func=None, 815 ) 816 817 func_globals = { 818 "__jid__": jid, 819 "__user__": salt.utils.user.get_user(), 820 "__tag__": tag, 821 "__jid_event__": weakref.proxy(namespaced_event), 822 } 823 self_functions = copy.copy(self.functions) 824 salt.utils.lazy.verify_fun(self_functions, func) 825 826 # Inject some useful globals to *all* the function's global 827 # namespace only once per module-- not per func 828 completed_funcs = [] 829 830 for mod_name in self_functions.keys(): 831 if "." not in mod_name: 832 continue 833 mod, _ = mod_name.split(".", 1) 834 if mod in completed_funcs: 835 continue 836 completed_funcs.append(mod) 837 for global_key, value in func_globals.items(): 838 self.functions[mod_name].__globals__[global_key] = value 839 840 self.functions.pack["__context__"]["retcode"] = 0 841 842 ret["return"] = self.functions[func](*args, **kwargs) 843 844 if not self.standalone: 845 # runners do not provide retcode 846 if "retcode" in self.functions.pack["__context__"]: 847 ret["retcode"] = self.functions.pack["__context__"]["retcode"] 848 849 ret["success"] = True 850 851 if data_returner or self.schedule_returner: 852 if "return_config" in data: 853 ret["ret_config"] = data["return_config"] 854 if "return_kwargs" in data: 855 ret["ret_kwargs"] = data["return_kwargs"] 856 rets = [] 857 for returner in [data_returner, self.schedule_returner]: 858 if isinstance(returner, str): 859 rets.append(returner) 860 elif isinstance(returner, list): 861 rets.extend(returner) 862 # simple de-duplication with order retained 863 for returner in OrderedDict.fromkeys(rets): 864 ret_str = "{}.returner".format(returner) 865 if ret_str in self.returners: 866 self.returners[ret_str](ret) 867 else: 868 log.info( 869 "Job %s using invalid returner: %s. Ignoring.", 870 func, 871 returner, 872 ) 873 874 except Exception: # pylint: disable=broad-except 875 log.exception("Unhandled exception running %s", ret["fun"]) 876 # Although catch-all exception handlers are bad, the exception here 877 # is to let the exception bubble up to the top of the thread context, 878 # where the thread will die silently, which is worse. 879 if "return" not in ret: 880 ret["return"] = "Unhandled exception running {}".format(ret["fun"]) 881 ret["success"] = False 882 ret["retcode"] = 254 883 finally: 884 # Only attempt to return data to the master if the scheduled job is running 885 # on a master itself or a minion. 886 if "__role" in self.opts and self.opts["__role"] in ("master", "minion"): 887 # The 'return_job' option is enabled by default even if not set 888 if "return_job" in data and not data["return_job"]: 889 pass 890 else: 891 # Send back to master so the job is included in the job list 892 mret = ret.copy() 893 # No returners defined, so we're only sending back to the master 894 if not data_returner and not self.schedule_returner: 895 mret["jid"] = "req" 896 if data.get("return_job") == "nocache": 897 # overwrite 'req' to signal to master that 898 # this job shouldn't be stored 899 mret["jid"] = "nocache" 900 load = {"cmd": "_return", "id": self.opts["id"]} 901 for key, value in mret.items(): 902 load[key] = value 903 904 if "__role" in self.opts and self.opts["__role"] == "minion": 905 event = salt.utils.event.get_event( 906 "minion", opts=self.opts, listen=False 907 ) 908 elif "__role" in self.opts and self.opts["__role"] == "master": 909 event = salt.utils.event.get_master_event( 910 self.opts, self.opts["sock_dir"] 911 ) 912 try: 913 event.fire_event(load, "__schedule_return") 914 except Exception as exc: # pylint: disable=broad-except 915 log.exception( 916 "Unhandled exception firing __schedule_return event" 917 ) 918 finally: 919 event.destroy() 920 921 if self.opts["__role"] == "master": 922 namespaced_event.destroy() 923 924 if not self.standalone: 925 log.debug("schedule.handle_func: Removing %s", proc_fn) 926 927 try: 928 os.unlink(proc_fn) 929 except OSError as exc: 930 if exc.errno == errno.EEXIST or exc.errno == errno.ENOENT: 931 # EEXIST and ENOENT are OK because the file is gone and that's what 932 # we wanted 933 pass 934 else: 935 log.error("Failed to delete '%s': %s", proc_fn, exc.errno) 936 # Otherwise, failing to delete this file is not something 937 # we can cleanly handle. 938 raise 939 finally: 940 if multiprocessing_enabled: 941 # Let's make sure we exit the process! 942 sys.exit(salt.defaults.exitcodes.EX_GENERIC) 943 944 def eval(self, now=None): 945 """ 946 Evaluate and execute the schedule 947 948 :param datetime now: Override current time with a datetime object instance`` 949 950 """ 951 952 log.trace("==== evaluating schedule now %s =====", now) 953 954 jids = [] 955 loop_interval = self.opts["loop_interval"] 956 if not isinstance(loop_interval, datetime.timedelta): 957 loop_interval = datetime.timedelta(seconds=loop_interval) 958 959 def _splay(splaytime): 960 """ 961 Calculate splaytime 962 """ 963 splay_ = None 964 if isinstance(splaytime, dict): 965 if splaytime["end"] >= splaytime["start"]: 966 splay_ = random.randint(splaytime["start"], splaytime["end"]) 967 else: 968 log.error( 969 "schedule.handle_func: Invalid Splay, " 970 "end must be larger than start. Ignoring splay." 971 ) 972 else: 973 splay_ = random.randint(1, splaytime) 974 return splay_ 975 976 def _handle_time_elements(data): 977 """ 978 Handle schedule item with time elements 979 seconds, minutes, hours, days 980 """ 981 if "_seconds" not in data: 982 interval = int(data.get("seconds", 0)) 983 interval += int(data.get("minutes", 0)) * 60 984 interval += int(data.get("hours", 0)) * 3600 985 interval += int(data.get("days", 0)) * 86400 986 987 data["_seconds"] = interval 988 989 if not data["_next_fire_time"]: 990 data["_next_fire_time"] = now + datetime.timedelta( 991 seconds=data["_seconds"] 992 ) 993 994 if interval < self.loop_interval: 995 self.loop_interval = interval 996 997 data["_next_scheduled_fire_time"] = now + datetime.timedelta( 998 seconds=data["_seconds"] 999 ) 1000 1001 def _handle_once(data, loop_interval): 1002 """ 1003 Handle schedule item with once 1004 """ 1005 if data["_next_fire_time"]: 1006 if ( 1007 data["_next_fire_time"] < now - loop_interval 1008 or data["_next_fire_time"] > now 1009 and not data["_splay"] 1010 ): 1011 data["_continue"] = True 1012 1013 if not data["_next_fire_time"] and not data["_splay"]: 1014 once = data["once"] 1015 if not isinstance(once, datetime.datetime): 1016 once_fmt = data.get("once_fmt", "%Y-%m-%dT%H:%M:%S") 1017 try: 1018 once = datetime.datetime.strptime(data["once"], once_fmt) 1019 except (TypeError, ValueError): 1020 data["_error"] = ( 1021 "Date string could not " 1022 "be parsed: {}, {}. " 1023 "Ignoring job {}.".format( 1024 data["once"], once_fmt, data["name"] 1025 ) 1026 ) 1027 log.error(data["_error"]) 1028 return 1029 data["_next_fire_time"] = once 1030 data["_next_scheduled_fire_time"] = once 1031 # If _next_fire_time is less than now, continue 1032 if once < now - loop_interval: 1033 data["_continue"] = True 1034 1035 def _handle_when(data, loop_interval): 1036 """ 1037 Handle schedule item with when 1038 """ 1039 if not _WHEN_SUPPORTED: 1040 data["_error"] = "Missing python-dateutil. Ignoring job {}.".format( 1041 data["name"] 1042 ) 1043 log.error(data["_error"]) 1044 return 1045 1046 if not isinstance(data["when"], list): 1047 _when_data = [data["when"]] 1048 else: 1049 _when_data = data["when"] 1050 1051 _when = [] 1052 for i in _when_data: 1053 if ( 1054 "pillar" in self.opts 1055 and "whens" in self.opts["pillar"] 1056 and i in self.opts["pillar"]["whens"] 1057 ): 1058 if not isinstance(self.opts["pillar"]["whens"], dict): 1059 data["_error"] = ( 1060 'Pillar item "whens" ' 1061 "must be a dict. " 1062 "Ignoring job {}.".format(data["name"]) 1063 ) 1064 log.error(data["_error"]) 1065 return 1066 when_ = self.opts["pillar"]["whens"][i] 1067 elif ( 1068 "whens" in self.opts["grains"] and i in self.opts["grains"]["whens"] 1069 ): 1070 if not isinstance(self.opts["grains"]["whens"], dict): 1071 data[ 1072 "_error" 1073 ] = 'Grain "whens" must be a dict. Ignoring job {}.'.format( 1074 data["name"] 1075 ) 1076 log.error(data["_error"]) 1077 return 1078 when_ = self.opts["grains"]["whens"][i] 1079 else: 1080 when_ = i 1081 1082 if not isinstance(when_, datetime.datetime): 1083 try: 1084 when_ = dateutil_parser.parse(when_) 1085 except ValueError: 1086 data[ 1087 "_error" 1088 ] = "Invalid date string {}. Ignoring job {}.".format( 1089 i, data["name"] 1090 ) 1091 log.error(data["_error"]) 1092 return 1093 1094 _when.append(when_) 1095 1096 if data["_splay"]: 1097 _when.append(data["_splay"]) 1098 1099 # Sort the list of "whens" from earlier to later schedules 1100 _when.sort() 1101 1102 # Copy the list so we can loop through it 1103 for i in copy.deepcopy(_when): 1104 if len(_when) > 1: 1105 if i < now - loop_interval: 1106 # Remove all missed schedules except the latest one. 1107 # We need it to detect if it was triggered previously. 1108 _when.remove(i) 1109 1110 if _when: 1111 # Grab the first element, which is the next run time or 1112 # last scheduled time in the past. 1113 when = _when[0] 1114 1115 if ( 1116 when < now - loop_interval 1117 and not data.get("_run", False) 1118 and not run 1119 and not data["_splay"] 1120 ): 1121 data["_next_fire_time"] = None 1122 data["_continue"] = True 1123 return 1124 1125 if "_run" not in data: 1126 # Prevent run of jobs from the past 1127 data["_run"] = bool(when >= now - loop_interval) 1128 1129 if not data["_next_fire_time"]: 1130 data["_next_fire_time"] = when 1131 1132 data["_next_scheduled_fire_time"] = when 1133 1134 if data["_next_fire_time"] < when and not run and not data["_run"]: 1135 data["_next_fire_time"] = when 1136 data["_run"] = True 1137 1138 elif not data.get("_run", False): 1139 data["_next_fire_time"] = None 1140 data["_continue"] = True 1141 1142 def _handle_cron(data, loop_interval): 1143 """ 1144 Handle schedule item with cron 1145 """ 1146 if not _CRON_SUPPORTED: 1147 data["_error"] = "Missing python-croniter. Ignoring job {}.".format( 1148 data["name"] 1149 ) 1150 log.error(data["_error"]) 1151 return 1152 1153 if data["_next_fire_time"] is None: 1154 # Get next time frame for a "cron" job if it has been never 1155 # executed before or already executed in the past. 1156 try: 1157 data["_next_fire_time"] = croniter.croniter( 1158 data["cron"], now 1159 ).get_next(datetime.datetime) 1160 data["_next_scheduled_fire_time"] = croniter.croniter( 1161 data["cron"], now 1162 ).get_next(datetime.datetime) 1163 except (ValueError, KeyError): 1164 data["_error"] = "Invalid cron string. Ignoring job {}.".format( 1165 data["name"] 1166 ) 1167 log.error(data["_error"]) 1168 return 1169 1170 # If next job run is scheduled more than 1 minute ahead and 1171 # configured loop interval is longer than that, we should 1172 # shorten it to get our job executed closer to the beginning 1173 # of desired time. 1174 interval = (now - data["_next_fire_time"]).total_seconds() 1175 if interval >= 60 and interval < self.loop_interval: 1176 self.loop_interval = interval 1177 1178 def _handle_run_explicit(data, loop_interval): 1179 """ 1180 Handle schedule item with run_explicit 1181 """ 1182 _run_explicit = [] 1183 for _run_time in data["run_explicit"]: 1184 if isinstance(_run_time, datetime.datetime): 1185 _run_explicit.append(_run_time) 1186 else: 1187 _run_explicit.append( 1188 datetime.datetime.strptime( 1189 _run_time["time"], _run_time["time_fmt"] 1190 ) 1191 ) 1192 data["run"] = False 1193 1194 # Copy the list so we can loop through it 1195 for i in copy.deepcopy(_run_explicit): 1196 if len(_run_explicit) > 1: 1197 if i < now - loop_interval: 1198 _run_explicit.remove(i) 1199 1200 if _run_explicit: 1201 if _run_explicit[0] <= now < _run_explicit[0] + loop_interval: 1202 data["run"] = True 1203 data["_next_fire_time"] = _run_explicit[0] 1204 1205 def _handle_skip_explicit(data, loop_interval): 1206 """ 1207 Handle schedule item with skip_explicit 1208 """ 1209 data["run"] = False 1210 1211 _skip_explicit = [] 1212 for _skip_time in data["skip_explicit"]: 1213 if isinstance(_skip_time, datetime.datetime): 1214 _skip_explicit.append(_skip_time) 1215 else: 1216 _skip_explicit.append( 1217 datetime.datetime.strptime( 1218 _skip_time["time"], _skip_time["time_fmt"] 1219 ) 1220 ) 1221 1222 # Copy the list so we can loop through it 1223 for i in copy.deepcopy(_skip_explicit): 1224 if i < now - loop_interval: 1225 _skip_explicit.remove(i) 1226 1227 if _skip_explicit: 1228 if _skip_explicit[0] <= now <= (_skip_explicit[0] + loop_interval): 1229 if self.skip_function: 1230 data["run"] = True 1231 data["func"] = self.skip_function 1232 else: 1233 data["_skip_reason"] = "skip_explicit" 1234 data["_skipped_time"] = now 1235 data["_skipped"] = True 1236 data["run"] = False 1237 else: 1238 data["run"] = True 1239 1240 def _handle_skip_during_range(data, loop_interval): 1241 """ 1242 Handle schedule item with skip_explicit 1243 """ 1244 if not _RANGE_SUPPORTED: 1245 data["_error"] = "Missing python-dateutil. Ignoring job {}.".format( 1246 data["name"] 1247 ) 1248 log.error(data["_error"]) 1249 return 1250 1251 if not isinstance(data["skip_during_range"], dict): 1252 data["_error"] = ( 1253 "schedule.handle_func: Invalid, range " 1254 "must be specified as a dictionary. " 1255 "Ignoring job {}.".format(data["name"]) 1256 ) 1257 log.error(data["_error"]) 1258 return 1259 1260 start = data["skip_during_range"]["start"] 1261 end = data["skip_during_range"]["end"] 1262 if not isinstance(start, datetime.datetime): 1263 try: 1264 start = dateutil_parser.parse(start) 1265 except ValueError: 1266 data["_error"] = ( 1267 "Invalid date string for start in " 1268 "skip_during_range. Ignoring " 1269 "job {}.".format(data["name"]) 1270 ) 1271 log.error(data["_error"]) 1272 return 1273 1274 if not isinstance(end, datetime.datetime): 1275 try: 1276 end = dateutil_parser.parse(end) 1277 except ValueError: 1278 data["_error"] = ( 1279 "Invalid date string for end in " 1280 "skip_during_range. Ignoring " 1281 "job {}.".format(data["name"]) 1282 ) 1283 log.error(data["_error"]) 1284 return 1285 1286 # Check to see if we should run the job immediately 1287 # after the skip_during_range is over 1288 if "run_after_skip_range" in data and data["run_after_skip_range"]: 1289 if "run_explicit" not in data: 1290 data["run_explicit"] = [] 1291 # Add a run_explicit for immediately after the 1292 # skip_during_range ends 1293 _run_immediate = (end + loop_interval).strftime("%Y-%m-%dT%H:%M:%S") 1294 if _run_immediate not in data["run_explicit"]: 1295 data["run_explicit"].append( 1296 {"time": _run_immediate, "time_fmt": "%Y-%m-%dT%H:%M:%S"} 1297 ) 1298 1299 if end > start: 1300 if start <= now <= end: 1301 if self.skip_function: 1302 data["run"] = True 1303 data["func"] = self.skip_function 1304 else: 1305 data["_skip_reason"] = "in_skip_range" 1306 data["_skipped_time"] = now 1307 data["_skipped"] = True 1308 data["run"] = False 1309 else: 1310 data["run"] = True 1311 else: 1312 data["_error"] = ( 1313 "schedule.handle_func: Invalid " 1314 "range, end must be larger than " 1315 "start. Ignoring job {}.".format(data["name"]) 1316 ) 1317 log.error(data["_error"]) 1318 1319 def _handle_range(data): 1320 """ 1321 Handle schedule item with skip_explicit 1322 """ 1323 if not _RANGE_SUPPORTED: 1324 data["_error"] = "Missing python-dateutil. Ignoring job {}".format( 1325 data["name"] 1326 ) 1327 log.error(data["_error"]) 1328 return 1329 1330 if not isinstance(data["range"], dict): 1331 data["_error"] = ( 1332 "schedule.handle_func: Invalid, range " 1333 "must be specified as a dictionary." 1334 "Ignoring job {}.".format(data["name"]) 1335 ) 1336 log.error(data["_error"]) 1337 return 1338 1339 start = data["range"]["start"] 1340 end = data["range"]["end"] 1341 if not isinstance(start, datetime.datetime): 1342 try: 1343 start = dateutil_parser.parse(start) 1344 except ValueError: 1345 data[ 1346 "_error" 1347 ] = "Invalid date string for start. Ignoring job {}.".format( 1348 data["name"] 1349 ) 1350 log.error(data["_error"]) 1351 return 1352 1353 if not isinstance(end, datetime.datetime): 1354 try: 1355 end = dateutil_parser.parse(end) 1356 except ValueError: 1357 data[ 1358 "_error" 1359 ] = "Invalid date string for end. Ignoring job {}.".format( 1360 data["name"] 1361 ) 1362 log.error(data["_error"]) 1363 return 1364 1365 if end > start: 1366 if "invert" in data["range"] and data["range"]["invert"]: 1367 if now <= start or now >= end: 1368 data["run"] = True 1369 else: 1370 data["_skip_reason"] = "in_skip_range" 1371 data["run"] = False 1372 else: 1373 if start <= now <= end: 1374 data["run"] = True 1375 else: 1376 if self.skip_function: 1377 data["run"] = True 1378 data["func"] = self.skip_function 1379 else: 1380 data["_skip_reason"] = "not_in_range" 1381 data["run"] = False 1382 else: 1383 data["_error"] = ( 1384 "schedule.handle_func: Invalid " 1385 "range, end must be larger " 1386 "than start. Ignoring job {}.".format(data["name"]) 1387 ) 1388 log.error(data["_error"]) 1389 1390 def _handle_after(data): 1391 """ 1392 Handle schedule item with after 1393 """ 1394 if not _WHEN_SUPPORTED: 1395 data["_error"] = "Missing python-dateutil. Ignoring job {}".format( 1396 data["name"] 1397 ) 1398 log.error(data["_error"]) 1399 return 1400 1401 after = data["after"] 1402 if not isinstance(after, datetime.datetime): 1403 after = dateutil_parser.parse(after) 1404 1405 if after >= now: 1406 log.debug("After time has not passed skipping job: %s.", data["name"]) 1407 data["_skip_reason"] = "after_not_passed" 1408 data["_skipped_time"] = now 1409 data["_skipped"] = True 1410 data["run"] = False 1411 else: 1412 data["run"] = True 1413 1414 def _handle_until(data): 1415 """ 1416 Handle schedule item with until 1417 """ 1418 if not _WHEN_SUPPORTED: 1419 data["_error"] = "Missing python-dateutil. Ignoring job {}".format( 1420 data["name"] 1421 ) 1422 log.error(data["_error"]) 1423 return 1424 1425 until = data["until"] 1426 if not isinstance(until, datetime.datetime): 1427 until = dateutil_parser.parse(until) 1428 1429 if until <= now: 1430 log.debug("Until time has passed skipping job: %s.", data["name"]) 1431 data["_skip_reason"] = "until_passed" 1432 data["_skipped_time"] = now 1433 data["_skipped"] = True 1434 data["run"] = False 1435 else: 1436 data["run"] = True 1437 1438 def _chop_ms(dt): 1439 """ 1440 Remove the microseconds from a datetime object 1441 """ 1442 return dt - datetime.timedelta(microseconds=dt.microsecond) 1443 1444 schedule = self._get_schedule() 1445 if not isinstance(schedule, dict): 1446 raise ValueError("Schedule must be of type dict.") 1447 if "skip_function" in schedule: 1448 self.skip_function = schedule["skip_function"] 1449 if "skip_during_range" in schedule: 1450 self.skip_during_range = schedule["skip_during_range"] 1451 if "enabled" in schedule: 1452 self.enabled = schedule["enabled"] 1453 if "splay" in schedule: 1454 self.splay = schedule["splay"] 1455 1456 _hidden = ["enabled", "skip_function", "skip_during_range", "splay"] 1457 for job, data in schedule.items(): 1458 1459 # Skip anything that is a global setting 1460 if job in _hidden: 1461 continue 1462 1463 # Clear these out between runs 1464 for item in [ 1465 "_continue", 1466 "_error", 1467 "_enabled", 1468 "_skipped", 1469 "_skip_reason", 1470 "_skipped_time", 1471 ]: 1472 if item in data: 1473 del data[item] 1474 run = False 1475 1476 if "name" in data: 1477 job_name = data["name"] 1478 else: 1479 job_name = data["name"] = job 1480 1481 if not isinstance(data, dict): 1482 log.error( 1483 'Scheduled job "%s" should have a dict value, not %s', 1484 job_name, 1485 type(data), 1486 ) 1487 continue 1488 1489 if "function" in data: 1490 func = data["function"] 1491 elif "func" in data: 1492 func = data["func"] 1493 elif "fun" in data: 1494 func = data["fun"] 1495 else: 1496 func = None 1497 1498 if func not in self.functions: 1499 log.info("Invalid function: %s in scheduled job %s.", func, job_name) 1500 1501 if "_next_fire_time" not in data: 1502 data["_next_fire_time"] = None 1503 1504 if "_splay" not in data: 1505 data["_splay"] = None 1506 1507 if ( 1508 "run_on_start" in data 1509 and data["run_on_start"] 1510 and "_run_on_start" not in data 1511 ): 1512 data["_run_on_start"] = True 1513 1514 if not now: 1515 now = datetime.datetime.now() 1516 1517 # Used for quick lookups when detecting invalid option 1518 # combinations. 1519 schedule_keys = set(data.keys()) 1520 1521 time_elements = ("seconds", "minutes", "hours", "days") 1522 scheduling_elements = ("when", "cron", "once") 1523 1524 invalid_sched_combos = [ 1525 set(i) for i in itertools.combinations(scheduling_elements, 2) 1526 ] 1527 1528 if any(i <= schedule_keys for i in invalid_sched_combos): 1529 log.error( 1530 'Unable to use "%s" options together. Ignoring.', 1531 '", "'.join(scheduling_elements), 1532 ) 1533 continue 1534 1535 invalid_time_combos = [] 1536 for item in scheduling_elements: 1537 all_items = itertools.chain([item], time_elements) 1538 invalid_time_combos.append(set(itertools.combinations(all_items, 2))) 1539 1540 if any(set(x) <= schedule_keys for x in invalid_time_combos): 1541 log.error( 1542 'Unable to use "%s" with "%s" options. Ignoring', 1543 '", "'.join(time_elements), 1544 '", "'.join(scheduling_elements), 1545 ) 1546 continue 1547 1548 if "run_explicit" in data: 1549 _handle_run_explicit(data, loop_interval) 1550 run = data["run"] 1551 1552 if True in [True for item in time_elements if item in data]: 1553 _handle_time_elements(data) 1554 elif "once" in data: 1555 _handle_once(data, loop_interval) 1556 elif "when" in data: 1557 _handle_when(data, loop_interval) 1558 elif "cron" in data: 1559 _handle_cron(data, loop_interval) 1560 else: 1561 continue 1562 1563 # Something told us to continue, so we continue 1564 if "_continue" in data and data["_continue"]: 1565 continue 1566 1567 # An error occurred so we bail out 1568 if "_error" in data and data["_error"]: 1569 continue 1570 1571 seconds = int( 1572 (_chop_ms(data["_next_fire_time"]) - _chop_ms(now)).total_seconds() 1573 ) 1574 1575 # If there is no job specific splay available, 1576 # grab the global which defaults to None. 1577 if "splay" not in data: 1578 data["splay"] = self.splay 1579 1580 if "splay" in data and data["splay"]: 1581 # Got "splay" configured, make decision to run a job based on that 1582 if not data["_splay"]: 1583 # Try to add "splay" time only if next job fire time is 1584 # still in the future. We should trigger job run 1585 # immediately otherwise. 1586 splay = _splay(data["splay"]) 1587 if now < data["_next_fire_time"] + datetime.timedelta( 1588 seconds=splay 1589 ): 1590 log.debug( 1591 "schedule.handle_func: Adding splay of " 1592 "%s seconds to next run.", 1593 splay, 1594 ) 1595 data["_splay"] = data["_next_fire_time"] + datetime.timedelta( 1596 seconds=splay 1597 ) 1598 if "when" in data: 1599 data["_run"] = True 1600 else: 1601 run = True 1602 1603 if data["_splay"]: 1604 # The "splay" configuration has been already processed, just use it 1605 seconds = (data["_splay"] - now).total_seconds() 1606 if "when" in data: 1607 data["_next_fire_time"] = data["_splay"] 1608 1609 if "_seconds" in data: 1610 if seconds <= 0: 1611 run = True 1612 elif "when" in data and data["_run"]: 1613 if ( 1614 data["_next_fire_time"] 1615 <= now 1616 <= (data["_next_fire_time"] + loop_interval) 1617 ): 1618 data["_run"] = False 1619 run = True 1620 elif "cron" in data: 1621 # Reset next scheduled time because it is in the past now, 1622 # and we should trigger the job run, then wait for the next one. 1623 if seconds <= 0: 1624 data["_next_fire_time"] = None 1625 run = True 1626 elif "once" in data: 1627 if ( 1628 data["_next_fire_time"] 1629 <= now 1630 <= (data["_next_fire_time"] + loop_interval) 1631 ): 1632 run = True 1633 elif seconds == 0: 1634 run = True 1635 1636 if "_run_on_start" in data and data["_run_on_start"]: 1637 run = True 1638 data["_run_on_start"] = False 1639 elif run: 1640 if "range" in data: 1641 _handle_range(data) 1642 1643 # An error occurred so we bail out 1644 if "_error" in data and data["_error"]: 1645 continue 1646 1647 run = data["run"] 1648 # Override the functiton if passed back 1649 if "func" in data: 1650 func = data["func"] 1651 1652 # If there is no job specific skip_during_range available, 1653 # grab the global which defaults to None. 1654 if "skip_during_range" not in data and self.skip_during_range: 1655 data["skip_during_range"] = self.skip_during_range 1656 1657 if "skip_during_range" in data and data["skip_during_range"]: 1658 _handle_skip_during_range(data, loop_interval) 1659 1660 # An error occurred so we bail out 1661 if "_error" in data and data["_error"]: 1662 continue 1663 1664 run = data["run"] 1665 # Override the functiton if passed back 1666 if "func" in data: 1667 func = data["func"] 1668 1669 if "skip_explicit" in data: 1670 _handle_skip_explicit(data, loop_interval) 1671 1672 # An error occurred so we bail out 1673 if "_error" in data and data["_error"]: 1674 continue 1675 1676 run = data["run"] 1677 # Override the functiton if passed back 1678 if "func" in data: 1679 func = data["func"] 1680 1681 if "until" in data: 1682 _handle_until(data) 1683 1684 # An error occurred so we bail out 1685 if "_error" in data and data["_error"]: 1686 continue 1687 1688 run = data["run"] 1689 1690 if "after" in data: 1691 _handle_after(data) 1692 1693 # An error occurred so we bail out 1694 if "_error" in data and data["_error"]: 1695 continue 1696 1697 run = data["run"] 1698 1699 # If the job item has continue, then we set run to False 1700 # so the job does not run but we still get the important 1701 # information calculated, eg. _next_fire_time 1702 if "_continue" in data and data["_continue"]: 1703 run = False 1704 1705 # If globally disabled or job 1706 # is diabled skip the job 1707 if not self.enabled or not data.get("enabled", True): 1708 log.trace("Job: %s is disabled", job_name) 1709 data["_skip_reason"] = "disabled" 1710 data["_skipped_time"] = now 1711 data["_skipped"] = True 1712 run = False 1713 1714 miss_msg = "" 1715 if seconds < 0: 1716 miss_msg = " (runtime missed by {} seconds)".format(abs(seconds)) 1717 1718 try: 1719 if run: 1720 if "jid_include" not in data or data["jid_include"]: 1721 data["jid_include"] = True 1722 log.debug( 1723 "schedule: Job %s was scheduled with jid_include, " 1724 "adding to cache (jid_include defaults to True)", 1725 job_name, 1726 ) 1727 if "maxrunning" in data: 1728 log.debug( 1729 "schedule: Job %s was scheduled with a max " 1730 "number of %s", 1731 job_name, 1732 data["maxrunning"], 1733 ) 1734 else: 1735 log.info( 1736 "schedule: maxrunning parameter was not specified for " 1737 "job %s, defaulting to 1.", 1738 job_name, 1739 ) 1740 data["maxrunning"] = 1 1741 1742 if not self.standalone: 1743 data["run"] = run 1744 data = self._check_max_running(func, data, self.opts, now) 1745 run = data["run"] 1746 1747 # Check run again, just in case _check_max_running 1748 # set run to False 1749 if run: 1750 jid = salt.utils.jid.gen_jid(self.opts) 1751 jids.append(jid) 1752 log.info( 1753 "Running scheduled job: %s%s with jid %s", 1754 job_name, 1755 miss_msg, 1756 jid, 1757 ) 1758 self._run_job(func, data, jid=jid) 1759 1760 finally: 1761 # Only set _last_run if the job ran 1762 if run: 1763 data["_last_run"] = now 1764 data["_splay"] = None 1765 if "_seconds" in data: 1766 if self.standalone: 1767 data["_next_fire_time"] = now + datetime.timedelta( 1768 seconds=data["_seconds"] 1769 ) 1770 elif "_skipped" in data and data["_skipped"]: 1771 data["_next_fire_time"] = now + datetime.timedelta( 1772 seconds=data["_seconds"] 1773 ) 1774 elif run: 1775 data["_next_fire_time"] = now + datetime.timedelta( 1776 seconds=data["_seconds"] 1777 ) 1778 return jids 1779 1780 def _run_job(self, func, data, jid=None): 1781 job_dry_run = data.get("dry_run", False) 1782 if job_dry_run: 1783 log.debug("Job %s has 'dry_run' set to True. Not running it.", data["name"]) 1784 return 1785 1786 multiprocessing_enabled = self.opts.get("multiprocessing", True) 1787 run_schedule_jobs_in_background = self.opts.get( 1788 "run_schedule_jobs_in_background", True 1789 ) 1790 1791 if run_schedule_jobs_in_background is False: 1792 # Explicitly pass False for multiprocessing_enabled 1793 self.handle_func(False, func, data, jid) 1794 return 1795 1796 if multiprocessing_enabled and salt.utils.platform.is_windows(): 1797 # Temporarily stash our function references. 1798 # You can't pickle function references, and pickling is 1799 # required when spawning new processes on Windows. 1800 functions = self.functions 1801 self.functions = {} 1802 returners = self.returners 1803 self.returners = {} 1804 utils = self.utils 1805 self.utils = {} 1806 1807 try: 1808 if multiprocessing_enabled: 1809 thread_cls = salt.utils.process.SignalHandlingProcess 1810 else: 1811 thread_cls = threading.Thread 1812 1813 if multiprocessing_enabled: 1814 with salt.utils.process.default_signals(signal.SIGINT, signal.SIGTERM): 1815 proc = thread_cls( 1816 target=self.handle_func, 1817 args=(multiprocessing_enabled, func, data, jid), 1818 ) 1819 # Reset current signals before starting the process in 1820 # order not to inherit the current signal handlers 1821 proc.start() 1822 proc.name = "{}-Schedule-{}".format(proc.name, data["name"]) 1823 self._subprocess_list.add(proc) 1824 else: 1825 proc = thread_cls( 1826 target=self.handle_func, 1827 args=(multiprocessing_enabled, func, data, jid), 1828 ) 1829 proc.start() 1830 proc.name = "{}-Schedule-{}".format(proc.name, data["name"]) 1831 self._subprocess_list.add(proc) 1832 finally: 1833 if multiprocessing_enabled and salt.utils.platform.is_windows(): 1834 # Restore our function references. 1835 self.functions = functions 1836 self.returners = returners 1837 self.utils = utils 1838 1839 def cleanup_subprocesses(self): 1840 self._subprocess_list.cleanup() 1841 1842 1843def clean_proc_dir(opts): 1844 1845 """ 1846 Loop through jid files in the minion proc directory (default /var/cache/salt/minion/proc) 1847 and remove any that refer to processes that no longer exist 1848 """ 1849 1850 for basefilename in os.listdir(salt.minion.get_proc_dir(opts["cachedir"])): 1851 fn_ = os.path.join(salt.minion.get_proc_dir(opts["cachedir"]), basefilename) 1852 with salt.utils.files.fopen(fn_, "rb") as fp_: 1853 job = None 1854 try: 1855 job = salt.payload.load(fp_) 1856 except Exception: # pylint: disable=broad-except 1857 # It's corrupted 1858 # Windows cannot delete an open file 1859 if salt.utils.platform.is_windows(): 1860 fp_.close() 1861 try: 1862 os.unlink(fn_) 1863 continue 1864 except OSError: 1865 continue 1866 log.debug( 1867 "schedule.clean_proc_dir: checking job %s for process existence", job 1868 ) 1869 if job is not None and "pid" in job: 1870 if salt.utils.process.os_is_running(job["pid"]): 1871 log.debug( 1872 "schedule.clean_proc_dir: Cleaning proc dir, pid %s " 1873 "still exists.", 1874 job["pid"], 1875 ) 1876 else: 1877 # Windows cannot delete an open file 1878 if salt.utils.platform.is_windows(): 1879 fp_.close() 1880 # Maybe the file is already gone 1881 try: 1882 os.unlink(fn_) 1883 except OSError: 1884 pass 1885