#!/usr/bin/python3 -OO
# Copyright 2007-2021 The SABnzbd-Team <team@sabnzbd.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""
sabnzbd.scheduler - Event Scheduler
"""

import random
import logging
import time
from typing import Optional

import sabnzbd.utils.kronos as kronos
import sabnzbd.rss
import sabnzbd.downloader
import sabnzbd.dirscanner
import sabnzbd.misc
import sabnzbd.config as config
import sabnzbd.cfg as cfg
from sabnzbd.filesystem import diskspace
from sabnzbd.constants import LOW_PRIORITY, NORMAL_PRIORITY, HIGH_PRIORITY

# Day numbers 1..7, used for tasks that have to run on every day of the week
DAILY_RANGE = list(range(1, 8))


class Scheduler:
    """Wrapper around a kronos ThreadedScheduler that plans the user-defined
    schedules from the configuration plus a number of fixed housekeeping tasks
    """

    def __init__(self):
        self.scheduler = kronos.ThreadedScheduler()
        self.pause_end: Optional[float] = None  # Moment when pause will end
        self.resume_task: Optional[kronos.Task] = None
        self.restart_scheduler = False
        self.pp_pause_event = False
        self.load_schedules()

    def start(self):
        """Start the scheduler"""
        self.scheduler.start()

    def stop(self):
        """Stop the scheduler, destroy instance"""
        logging.debug("Stopping scheduler")
        self.scheduler.stop()

    def restart(self, plan_restart=True):
        """Stop and start scheduler
        `plan_restart=True`: only set the flag that a restart is wanted
        `plan_restart=False`: perform the restart, but only if one was planned
        """
        if plan_restart:
            self.restart_scheduler = True
        elif self.restart_scheduler:
            logging.debug("Restarting scheduler")
            self.restart_scheduler = False
            self.scheduler.stop()
            self.scheduler.start()
            self.analyse(sabnzbd.Downloader.paused)
            self.load_schedules()

    def abort(self):
        """Emergency stop, just set the running attribute false so we don't
        have to wait the full scheduler-check cycle before it really stops"""
        self.scheduler.running = False

    def is_alive(self):
        """Thread-like check if we are doing fine"""
        if self.scheduler.thread:
            return self.scheduler.thread.is_alive()
        return False

    def load_schedules(self):
        """Add all configured schedules and the fixed housekeeping tasks to the scheduler

        Each schedule is a whitespace separated string:
            enabled minute hour days action [arguments]
        Entries that cannot be parsed are skipped.
        """
        rss_planned = False

        for schedule in cfg.schedules():
            arguments = []
            argument_list = None

            # Unpacking raises ValueError when the number of fields does not match
            try:
                enabled, m, h, d, action_name = schedule.split()
            except ValueError:
                try:
                    enabled, m, h, d, action_name, argument_list = schedule.split(None, 5)
                except ValueError:
                    continue  # Bad schedule, ignore

            if argument_list:
                arguments = argument_list.split()

            action_name = action_name.lower()
            try:
                m = int(m)
                h = int(h)
            except ValueError:
                logging.warning(T("Bad schedule %s at %s:%s"), action_name, m, h)
                continue

            # Days are given as a string of digits, anything else means every day
            if d.isdigit():
                d = [int(i) for i in d]
            else:
                d = DAILY_RANGE

            # Keep this an if/elif chain: the target is only looked up for
            # the action that is actually requested
            if action_name == "resume":
                action = self.scheduled_resume
                arguments = []
            elif action_name == "pause":
                action = sabnzbd.Downloader.pause
                arguments = []
            elif action_name == "pause_all":
                action = sabnzbd.pause_all
                arguments = []
            elif action_name == "shutdown":
                action = sabnzbd.shutdown_program
                arguments = []
            elif action_name == "restart":
                action = sabnzbd.restart_program
                arguments = []
            elif action_name == "pause_post":
                action = pp_pause
            elif action_name == "resume_post":
                action = pp_resume
            elif action_name == "speedlimit" and arguments:
                action = sabnzbd.Downloader.limit_speed
            elif action_name == "enable_server" and arguments:
                action = sabnzbd.enable_server
            elif action_name == "disable_server" and arguments:
                action = sabnzbd.disable_server
            elif action_name == "scan_folder":
                action = sabnzbd.DirScanner.scan
            elif action_name == "rss_scan":
                action = sabnzbd.RSSReader.run
                rss_planned = True
            elif action_name == "remove_failed":
                action = sabnzbd.api.history_remove_failed
            elif action_name == "remove_completed":
                action = sabnzbd.api.history_remove_completed
            elif action_name == "enable_quota":
                action = sabnzbd.BPSMeter.set_status
                arguments = [True]
            elif action_name == "disable_quota":
                action = sabnzbd.BPSMeter.set_status
                arguments = [False]
            elif action_name == "pause_all_low":
                action = sabnzbd.NzbQueue.pause_on_prio
                arguments = [LOW_PRIORITY]
            elif action_name == "pause_all_normal":
                action = sabnzbd.NzbQueue.pause_on_prio
                arguments = [NORMAL_PRIORITY]
            elif action_name == "pause_all_high":
                action = sabnzbd.NzbQueue.pause_on_prio
                arguments = [HIGH_PRIORITY]
            elif action_name == "resume_all_low":
                action = sabnzbd.NzbQueue.resume_on_prio
                arguments = [LOW_PRIORITY]
            elif action_name == "resume_all_normal":
                action = sabnzbd.NzbQueue.resume_on_prio
                arguments = [NORMAL_PRIORITY]
            elif action_name == "resume_all_high":
                action = sabnzbd.NzbQueue.resume_on_prio
                arguments = [HIGH_PRIORITY]
            elif action_name == "pause_cat":
                # The unsplit argument is used: a category name can contain spaces
                action = sabnzbd.NzbQueue.pause_on_cat
                arguments = [argument_list]
            elif action_name == "resume_cat":
                action = sabnzbd.NzbQueue.resume_on_cat
                arguments = [argument_list]
            else:
                logging.warning(T("Unknown action: %s"), action_name)
                continue

            if enabled == "1":
                logging.info("Scheduling %s(%s) on days %s at %02d:%02d", action_name, arguments, d, h, m)
                self.scheduler.add_daytime_task(action, action_name, d, None, (h, m), args=arguments)
            else:
                logging.debug("Skipping %s(%s) on days %s at %02d:%02d", action_name, arguments, d, h, m)

        # Set RSS check interval
        if not rss_planned:
            interval = cfg.rss_rate()
            delay = random.randint(0, interval - 1)
            logging.info("Scheduling RSS interval task every %s min (delay=%s)", interval, delay)
            sabnzbd.RSSReader.next_run = time.time() + delay * 60
            self.scheduler.add_interval_task(sabnzbd.RSSReader.run, "RSS", delay * 60, interval * 60)
            self.scheduler.add_single_task(sabnzbd.RSSReader.run, "RSS", 15)

        if cfg.version_check():
            # Check for new release, once per week on random time
            m = random.randint(0, 59)
            h = random.randint(0, 23)
            d = (random.randint(1, 7),)

            logging.info("Scheduling VersionCheck on day %s at %s:%s", d[0], h, m)
            self.scheduler.add_daytime_task(sabnzbd.misc.check_latest_version, "VerCheck", d, None, (h, m))

        action, hour, minute = sabnzbd.BPSMeter.get_quota()
        if action:
            logging.info("Setting schedule for quota check daily at %s:%s", hour, minute)
            self.scheduler.add_daytime_task(action, "quota_reset", DAILY_RANGE, None, (hour, minute))

        if sabnzbd.misc.int_conv(cfg.history_retention()) > 0:
            logging.info("Setting schedule for midnight auto history-purge")
            self.scheduler.add_daytime_task(
                sabnzbd.database.midnight_history_purge, "midnight_history_purge", DAILY_RANGE, None, (0, 0)
            )

        logging.info("Setting schedule for midnight BPS reset")
        self.scheduler.add_daytime_task(sabnzbd.BPSMeter.update, "midnight_bps", DAILY_RANGE, None, (0, 0))

        logging.info("Setting schedule for server expiration check")
        self.scheduler.add_daytime_task(
            sabnzbd.downloader.check_server_expiration, "check_server_expiration", DAILY_RANGE, None, (0, 0)
        )

        logging.info("Setting scheduler for server quota check")
        self.scheduler.add_interval_task(
            sabnzbd.downloader.check_server_quota,
            "check_server_quota",
            0,
            10 * 60,
        )

        # Subscribe to special schedule changes
        cfg.rss_rate.callback(self.scheduler_restart_guard)

    def analyse(self, was_paused=False, priority=None):
        """Determine what pause/resume state we would have now.
        'was_paused': when True, the download pause state is left untouched
        'priority': evaluate only effect for given priority, return True for paused

        The events are processed in the order given by sort_schedules(),
        so for every kind of setting the event that is handled last wins.
        """
        self.pp_pause_event = False
        paused = None
        paused_all = False
        pause_post = False
        pause_low = pause_normal = pause_high = False
        speedlimit = None
        quota = True
        servers = {}

        # Every event is a tuple (dif, action, parms, schedule, enabled)
        for ev in sort_schedules(all_events=True):
            if priority is None:
                logging.debug("Schedule check result = %s", ev)

            _, action, value, _, enabled = ev

            # Skip if disabled
            if enabled == "0":
                continue

            if action == "pause":
                paused = True
            elif action == "pause_all":
                paused_all = True
                self.pp_pause_event = True
            elif action == "resume":
                paused = False
                paused_all = False
            elif action == "pause_post":
                pause_post = True
                self.pp_pause_event = True
            elif action == "resume_post":
                pause_post = False
                self.pp_pause_event = True
            elif action == "speedlimit" and value is not None:
                speedlimit = value
            elif action == "pause_all_low":
                pause_low = True
            elif action == "pause_all_normal":
                pause_normal = True
            elif action == "pause_all_high":
                pause_high = True
            elif action == "resume_all_low":
                pause_low = False
            elif action == "resume_all_normal":
                pause_normal = False
            elif action == "resume_all_high":
                pause_high = False
            elif action == "enable_quota":
                quota = True
            elif action == "disable_quota":
                quota = False
            elif action == "enable_server":
                servers[value] = 1
            elif action == "disable_server":
                servers[value] = 0

        # Special case, a priority was passed, so evaluate only that and return state
        if priority == LOW_PRIORITY:
            return pause_low
        if priority == NORMAL_PRIORITY:
            return pause_normal
        if priority == HIGH_PRIORITY:
            return pause_high
        if priority is not None:
            return False

        # Normal analysis
        if not was_paused:
            if paused_all:
                sabnzbd.pause_all()
            else:
                sabnzbd.unpause_all()
            sabnzbd.Downloader.set_paused_state(paused or paused_all)

        sabnzbd.PostProcessor.paused = pause_post
        if speedlimit is not None:
            sabnzbd.Downloader.limit_speed(speedlimit)

        sabnzbd.BPSMeter.set_status(quota, action=False)

        for serv, value in servers.items():
            # Best effort: a problem with one server must not block the others
            try:
                item = config.get_config("servers", serv)
                if bool(item.enable()) != bool(value):
                    item.enable.set(value)
                    sabnzbd.Downloader.init_server(serv, serv)
            except Exception:
                logging.debug("Could not apply scheduled state to server %s", serv, exc_info=True)
        config.save_config()

    def scheduler_restart_guard(self):
        """Set flag for scheduler restart"""
        self.restart_scheduler = True

    def scheduled_resume(self):
        """Scheduled resume, only when no oneshot resume is active"""
        if self.pause_end is None:
            sabnzbd.unpause_all()

    def __oneshot_resume(self, when):
        """Called by delayed resume schedule
        Only resumes if call comes at the planned time
        """
        if self.pause_end is not None and (when > self.pause_end - 5) and (when < self.pause_end + 55):
            self.pause_end = None
            logging.debug("Resume after pause-interval")
            sabnzbd.unpause_all()
        else:
            logging.debug("Ignoring cancelled resume")

    def plan_resume(self, interval):
        """Set a scheduled resume after the interval (in minutes)
        An interval of 0 or less cancels the planned resume and unpauses directly
        """
        if interval > 0:
            self.pause_end = time.time() + (interval * 60)
            logging.debug("Schedule resume at %s", self.pause_end)
            # Pass the planned moment, so an outdated task can be recognized
            self.scheduler.add_single_task(self.__oneshot_resume, "", interval * 60, args=[self.pause_end])
            sabnzbd.Downloader.pause()
        else:
            self.pause_end = None
            sabnzbd.unpause_all()

    def __check_diskspace(self, full_dir: str, required_space: float):
        """Resume if there is sufficient available space"""
        if not cfg.fulldisk_autoresume():
            self.cancel_resume_task()
            return

        disk_free = diskspace(force=True)[full_dir][1]
        if disk_free > required_space:
            logging.info("Resuming, %s has %d GB free, needed %d GB", full_dir, disk_free, required_space)
            sabnzbd.Downloader.resume()
        else:
            logging.info("%s has %d GB free, need %d GB to resume", full_dir, disk_free, required_space)

        # Remove scheduled task if user manually resumed or we auto-resumed
        if not sabnzbd.Downloader.paused:
            self.cancel_resume_task()

    def plan_diskspace_resume(self, full_dir: str, required_space: float):
        """Create regular check for free disk space"""
        self.cancel_resume_task()
        logging.info("Will resume when %s has more than %d GB free space", full_dir, required_space)
        self.resume_task = self.scheduler.add_interval_task(
            self.__check_diskspace, "check_diskspace", 5 * 60, 9 * 60, "threaded", args=[full_dir, required_space]
        )

    def cancel_resume_task(self):
        """Cancel the current auto resume task"""
        if self.resume_task:
            logging.debug("Cancelling existing resume_task '%s'", self.resume_task.name)
            self.scheduler.cancel(self.resume_task)
            self.resume_task = None

    def pause_int(self) -> str:
        """Return minutes:seconds until pause ends, "0" when no pause is planned"""
        if self.pause_end is None:
            return "0"
        else:
            val = self.pause_end - time.time()
            if val < 0:
                sign = "-"
                val = abs(val)
            else:
                sign = ""
            mins = int(val / 60)
            sec = int(val - mins * 60)
            return "%s%d:%02d" % (sign, mins, sec)

    def pause_check(self):
        """Unpause when time left is negative, compensate for missed schedule"""
        if self.pause_end is not None and (self.pause_end - time.time()) < 0:
            self.pause_end = None
            logging.debug("Force resume, negative timer")
            sabnzbd.unpause_all()

    def plan_server(self, action, parms, interval):
        """Plan to re-activate server after 'interval' minutes"""
        self.scheduler.add_single_task(action, "", interval * 60, args=parms)

    def force_rss(self):
        """Add a one-time RSS scan, one second from now"""
        self.scheduler.add_single_task(sabnzbd.RSSReader.run, "RSS", 1)


def pp_pause():
    """Pause the post-processor"""
    sabnzbd.PostProcessor.paused = True


def pp_resume():
    """Resume the post-processor"""
    sabnzbd.PostProcessor.paused = False


def sort_schedules(all_events, now=None):
    """Sort the schedules, based on order of happening from now
    `all_events=True`: Return an event for each active day
    `all_events=False`: Return only first occurring event of the week
    `now` : for testing: simulated localtime()

    Returns a list of tuples (dif, action, parms, schedule, enabled),
    where `dif` is the number of minutes between now and the event.
    Schedules that cannot be parsed are left out.
    """

    day_min = 24 * 60
    week_min = 7 * day_min
    events = []

    # Express the current moment as minutes since the start of the week
    now = now or time.localtime()
    now_hm = now[3] * 60 + now[4]
    now = now[6] * day_min + now_hm

    for schedule in cfg.schedules():
        parms = None
        # Unpacking raises ValueError when the number of fields does not match
        try:
            # Note: the last parameter can have spaces (category name)!
            enabled, m, h, dd, action, parms = schedule.split(None, 5)
        except ValueError:
            try:
                enabled, m, h, dd, action = schedule.split(None, 4)
            except ValueError:
                continue  # Bad schedule, ignore
        action = action.strip()
        if dd == "*":
            dd = "1234567"
        if not dd.isdigit():
            continue  # Bad schedule, ignore
        try:
            minute_of_day = int(h) * 60 + int(m)
        except ValueError:
            continue  # Bad schedule, ignore
        for d in dd:
            then = (int(d) - 1) * day_min + minute_of_day
            dif = then - now
            if all_events and dif < 0:
                # Expired event will occur again after a week
                dif = dif + week_min

            events.append((dif, action, parms, schedule, enabled))
            if not all_events:
                break

    events.sort(key=lambda x: x[0])
    return events