1#!/usr/bin/python3 -OO
2# Copyright 2007-2021 The SABnzbd-Team <team@sabnzbd.org>
3#
4# This program is free software; you can redistribute it and/or
5# modify it under the terms of the GNU General Public License
6# as published by the Free Software Foundation; either version 2
7# of the License, or (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program; if not, write to the Free Software
16# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17
18"""
19sabnzbd.scheduler - Event Scheduler
20"""
21
22import random
23import logging
24import time
25from typing import Optional
26
27import sabnzbd.utils.kronos as kronos
28import sabnzbd.rss
29import sabnzbd.downloader
30import sabnzbd.dirscanner
31import sabnzbd.misc
32import sabnzbd.config as config
33import sabnzbd.cfg as cfg
34from sabnzbd.filesystem import diskspace
35from sabnzbd.constants import LOW_PRIORITY, NORMAL_PRIORITY, HIGH_PRIORITY
36
37DAILY_RANGE = list(range(1, 8))
38
39
40class Scheduler:
41    def __init__(self):
42        self.scheduler = kronos.ThreadedScheduler()
43        self.pause_end: Optional[float] = None  # Moment when pause will end
44        self.resume_task: Optional[kronos.Task] = None
45        self.restart_scheduler = False
46        self.pp_pause_event = False
47        self.load_schedules()
48
49    def start(self):
50        """Start the scheduler"""
51        self.scheduler.start()
52
53    def stop(self):
54        """Stop the scheduler, destroy instance"""
55        logging.debug("Stopping scheduler")
56        self.scheduler.stop()
57
58    def restart(self, plan_restart=True):
59        """Stop and start scheduler"""
60        if plan_restart:
61            self.restart_scheduler = True
62        elif self.restart_scheduler:
63            logging.debug("Restarting scheduler")
64            self.restart_scheduler = False
65            self.scheduler.stop()
66            self.scheduler.start()
67            self.analyse(sabnzbd.Downloader.paused)
68            self.load_schedules()
69
70    def abort(self):
71        """Emergency stop, just set the running attribute false so we don't
72        have to wait the full scheduler-check cycle before it really stops"""
73        self.scheduler.running = False
74
75    def is_alive(self):
76        """Thread-like check if we are doing fine"""
77        if self.scheduler.thread:
78            return self.scheduler.thread.is_alive()
79        return False
80
81    def load_schedules(self):
82        rss_planned = False
83
84        for schedule in cfg.schedules():
85            arguments = []
86            argument_list = None
87
88            try:
89                enabled, m, h, d, action_name = schedule.split()
90            except:
91                try:
92                    enabled, m, h, d, action_name, argument_list = schedule.split(None, 5)
93                except:
94                    continue  # Bad schedule, ignore
95
96            if argument_list:
97                arguments = argument_list.split()
98
99            action_name = action_name.lower()
100            try:
101                m = int(m)
102                h = int(h)
103            except:
104                logging.warning(T("Bad schedule %s at %s:%s"), action_name, m, h)
105                continue
106
107            if d.isdigit():
108                d = [int(i) for i in d]
109            else:
110                d = DAILY_RANGE
111
112            if action_name == "resume":
113                action = self.scheduled_resume
114                arguments = []
115            elif action_name == "pause":
116                action = sabnzbd.Downloader.pause
117                arguments = []
118            elif action_name == "pause_all":
119                action = sabnzbd.pause_all
120                arguments = []
121            elif action_name == "shutdown":
122                action = sabnzbd.shutdown_program
123                arguments = []
124            elif action_name == "restart":
125                action = sabnzbd.restart_program
126                arguments = []
127            elif action_name == "pause_post":
128                action = pp_pause
129            elif action_name == "resume_post":
130                action = pp_resume
131            elif action_name == "speedlimit" and arguments != []:
132                action = sabnzbd.Downloader.limit_speed
133            elif action_name == "enable_server" and arguments != []:
134                action = sabnzbd.enable_server
135            elif action_name == "disable_server" and arguments != []:
136                action = sabnzbd.disable_server
137            elif action_name == "scan_folder":
138                action = sabnzbd.DirScanner.scan
139            elif action_name == "rss_scan":
140                action = sabnzbd.RSSReader.run
141                rss_planned = True
142            elif action_name == "remove_failed":
143                action = sabnzbd.api.history_remove_failed
144            elif action_name == "remove_completed":
145                action = sabnzbd.api.history_remove_completed
146            elif action_name == "enable_quota":
147                action = sabnzbd.BPSMeter.set_status
148                arguments = [True]
149            elif action_name == "disable_quota":
150                action = sabnzbd.BPSMeter.set_status
151                arguments = [False]
152            elif action_name == "pause_all_low":
153                action = sabnzbd.NzbQueue.pause_on_prio
154                arguments = [LOW_PRIORITY]
155            elif action_name == "pause_all_normal":
156                action = sabnzbd.NzbQueue.pause_on_prio
157                arguments = [NORMAL_PRIORITY]
158            elif action_name == "pause_all_high":
159                action = sabnzbd.NzbQueue.pause_on_prio
160                arguments = [HIGH_PRIORITY]
161            elif action_name == "resume_all_low":
162                action = sabnzbd.NzbQueue.resume_on_prio
163                arguments = [LOW_PRIORITY]
164            elif action_name == "resume_all_normal":
165                action = sabnzbd.NzbQueue.resume_on_prio
166                arguments = [NORMAL_PRIORITY]
167            elif action_name == "resume_all_high":
168                action = sabnzbd.NzbQueue.resume_on_prio
169                arguments = [HIGH_PRIORITY]
170            elif action_name == "pause_cat":
171                action = sabnzbd.NzbQueue.pause_on_cat
172                arguments = [argument_list]
173            elif action_name == "resume_cat":
174                action = sabnzbd.NzbQueue.resume_on_cat
175                arguments = [argument_list]
176            else:
177                logging.warning(T("Unknown action: %s"), action_name)
178                continue
179
180            if enabled == "1":
181                logging.info("Scheduling %s(%s) on days %s at %02d:%02d", action_name, arguments, d, h, m)
182                self.scheduler.add_daytime_task(action, action_name, d, None, (h, m), args=arguments)
183            else:
184                logging.debug("Skipping %s(%s) on days %s at %02d:%02d", action_name, arguments, d, h, m)
185
186        # Set RSS check interval
187        if not rss_planned:
188            interval = cfg.rss_rate()
189            delay = random.randint(0, interval - 1)
190            logging.info("Scheduling RSS interval task every %s min (delay=%s)", interval, delay)
191            sabnzbd.RSSReader.next_run = time.time() + delay * 60
192            self.scheduler.add_interval_task(sabnzbd.RSSReader.run, "RSS", delay * 60, interval * 60)
193            self.scheduler.add_single_task(sabnzbd.RSSReader.run, "RSS", 15)
194
195        if cfg.version_check():
196            # Check for new release, once per week on random time
197            m = random.randint(0, 59)
198            h = random.randint(0, 23)
199            d = (random.randint(1, 7),)
200
201            logging.info("Scheduling VersionCheck on day %s at %s:%s", d[0], h, m)
202            self.scheduler.add_daytime_task(sabnzbd.misc.check_latest_version, "VerCheck", d, None, (h, m))
203
204        action, hour, minute = sabnzbd.BPSMeter.get_quota()
205        if action:
206            logging.info("Setting schedule for quota check daily at %s:%s", hour, minute)
207            self.scheduler.add_daytime_task(action, "quota_reset", DAILY_RANGE, None, (hour, minute))
208
209        if sabnzbd.misc.int_conv(cfg.history_retention()) > 0:
210            logging.info("Setting schedule for midnight auto history-purge")
211            self.scheduler.add_daytime_task(
212                sabnzbd.database.midnight_history_purge, "midnight_history_purge", DAILY_RANGE, None, (0, 0)
213            )
214
215        logging.info("Setting schedule for midnight BPS reset")
216        self.scheduler.add_daytime_task(sabnzbd.BPSMeter.update, "midnight_bps", DAILY_RANGE, None, (0, 0))
217
218        logging.info("Setting schedule for server expiration check")
219        self.scheduler.add_daytime_task(
220            sabnzbd.downloader.check_server_expiration, "check_server_expiration", DAILY_RANGE, None, (0, 0)
221        )
222
223        logging.info("Setting scheduler for server quota check")
224        self.scheduler.add_interval_task(
225            sabnzbd.downloader.check_server_quota,
226            "check_server_quota",
227            0,
228            10 * 60,
229        )
230
231        # Subscribe to special schedule changes
232        cfg.rss_rate.callback(self.scheduler_restart_guard)
233
234    def analyse(self, was_paused=False, priority=None):
235        """Determine what pause/resume state we would have now.
236        'priority': evaluate only effect for given priority, return True for paused
237        """
238        self.pp_pause_event = False
239        paused = None
240        paused_all = False
241        pause_post = False
242        pause_low = pause_normal = pause_high = False
243        speedlimit = None
244        quota = True
245        servers = {}
246
247        for ev in sort_schedules(all_events=True):
248            if priority is None:
249                logging.debug("Schedule check result = %s", ev)
250
251            # Skip if disabled
252            if ev[4] == "0":
253                continue
254
255            action = ev[1]
256            try:
257                value = ev[2]
258            except:
259                value = None
260            if action == "pause":
261                paused = True
262            elif action == "pause_all":
263                paused_all = True
264                self.pp_pause_event = True
265            elif action == "resume":
266                paused = False
267                paused_all = False
268            elif action == "pause_post":
269                pause_post = True
270                self.pp_pause_event = True
271            elif action == "resume_post":
272                pause_post = False
273                self.pp_pause_event = True
274            elif action == "speedlimit" and value is not None:
275                speedlimit = ev[2]
276            elif action == "pause_all_low":
277                pause_low = True
278            elif action == "pause_all_normal":
279                pause_normal = True
280            elif action == "pause_all_high":
281                pause_high = True
282            elif action == "resume_all_low":
283                pause_low = False
284            elif action == "resume_all_normal":
285                pause_normal = False
286            elif action == "resume_all_high":
287                pause_high = False
288            elif action == "enable_quota":
289                quota = True
290            elif action == "disable_quota":
291                quota = False
292            elif action == "enable_server":
293                try:
294                    servers[value] = 1
295                except:
296                    logging.warning(T("Schedule for non-existing server %s"), value)
297            elif action == "disable_server":
298                try:
299                    servers[value] = 0
300                except:
301                    logging.warning(T("Schedule for non-existing server %s"), value)
302
303        # Special case, a priority was passed, so evaluate only that and return state
304        if priority == LOW_PRIORITY:
305            return pause_low
306        if priority == NORMAL_PRIORITY:
307            return pause_normal
308        if priority == HIGH_PRIORITY:
309            return pause_high
310        if priority is not None:
311            return False
312
313        # Normal analysis
314        if not was_paused:
315            if paused_all:
316                sabnzbd.pause_all()
317            else:
318                sabnzbd.unpause_all()
319            sabnzbd.Downloader.set_paused_state(paused or paused_all)
320
321        sabnzbd.PostProcessor.paused = pause_post
322        if speedlimit is not None:
323            sabnzbd.Downloader.limit_speed(speedlimit)
324
325        sabnzbd.BPSMeter.set_status(quota, action=False)
326
327        for serv in servers:
328            try:
329                item = config.get_config("servers", serv)
330                value = servers[serv]
331                if bool(item.enable()) != bool(value):
332                    item.enable.set(value)
333                    sabnzbd.Downloader.init_server(serv, serv)
334            except:
335                pass
336        config.save_config()
337
338    def scheduler_restart_guard(self):
339        """Set flag for scheduler restart"""
340        self.restart_scheduler = True
341
342    def scheduled_resume(self):
343        """Scheduled resume, only when no oneshot resume is active"""
344        if self.pause_end is None:
345            sabnzbd.unpause_all()
346
347    def __oneshot_resume(self, when):
348        """Called by delayed resume schedule
349        Only resumes if call comes at the planned time
350        """
351        if self.pause_end is not None and (when > self.pause_end - 5) and (when < self.pause_end + 55):
352            self.pause_end = None
353            logging.debug("Resume after pause-interval")
354            sabnzbd.unpause_all()
355        else:
356            logging.debug("Ignoring cancelled resume")
357
358    def plan_resume(self, interval):
359        """Set a scheduled resume after the interval"""
360        if interval > 0:
361            self.pause_end = time.time() + (interval * 60)
362            logging.debug("Schedule resume at %s", self.pause_end)
363            self.scheduler.add_single_task(self.__oneshot_resume, "", interval * 60, args=[self.pause_end])
364            sabnzbd.Downloader.pause()
365        else:
366            self.pause_end = None
367            sabnzbd.unpause_all()
368
369    def __check_diskspace(self, full_dir: str, required_space: float):
370        """Resume if there is sufficient available space"""
371        if not cfg.fulldisk_autoresume():
372            self.cancel_resume_task()
373            return
374
375        disk_free = diskspace(force=True)[full_dir][1]
376        if disk_free > required_space:
377            logging.info("Resuming, %s has %d GB free, needed %d GB", full_dir, disk_free, required_space)
378            sabnzbd.Downloader.resume()
379        else:
380            logging.info("%s has %d GB free, need %d GB to resume", full_dir, disk_free, required_space)
381
382        # Remove scheduled task if user manually resumed or we auto-resumed
383        if not sabnzbd.Downloader.paused:
384            self.cancel_resume_task()
385
386    def plan_diskspace_resume(self, full_dir: str, required_space: float):
387        """Create regular check for free disk space"""
388        self.cancel_resume_task()
389        logging.info("Will resume when %s has more than %d GB free space", full_dir, required_space)
390        self.resume_task = self.scheduler.add_interval_task(
391            self.__check_diskspace, "check_diskspace", 5 * 60, 9 * 60, "threaded", args=[full_dir, required_space]
392        )
393
394    def cancel_resume_task(self):
395        """Cancel the current auto resume task"""
396        if self.resume_task:
397            logging.debug("Cancelling existing resume_task '%s'", self.resume_task.name)
398            self.scheduler.cancel(self.resume_task)
399            self.resume_task = None
400
401    def pause_int(self) -> str:
402        """Return minutes:seconds until pause ends"""
403        if self.pause_end is None:
404            return "0"
405        else:
406            val = self.pause_end - time.time()
407            if val < 0:
408                sign = "-"
409                val = abs(val)
410            else:
411                sign = ""
412            mins = int(val / 60)
413            sec = int(val - mins * 60)
414            return "%s%d:%02d" % (sign, mins, sec)
415
416    def pause_check(self):
417        """Unpause when time left is negative, compensate for missed schedule"""
418        if self.pause_end is not None and (self.pause_end - time.time()) < 0:
419            self.pause_end = None
420            logging.debug("Force resume, negative timer")
421            sabnzbd.unpause_all()
422
423    def plan_server(self, action, parms, interval):
424        """Plan to re-activate server after 'interval' minutes"""
425        self.scheduler.add_single_task(action, "", interval * 60, args=parms)
426
427    def force_rss(self):
428        """Add a one-time RSS scan, one second from now"""
429        self.scheduler.add_single_task(sabnzbd.RSSReader.run, "RSS", 1)
430
431
432def pp_pause():
433    sabnzbd.PostProcessor.paused = True
434
435
436def pp_resume():
437    sabnzbd.PostProcessor.paused = False
438
439
440def sort_schedules(all_events, now=None):
441    """Sort the schedules, based on order of happening from now
442    `all_events=True`: Return an event for each active day
443    `all_events=False`: Return only first occurring event of the week
444    `now` : for testing: simulated localtime()
445    """
446
447    day_min = 24 * 60
448    week_min = 7 * day_min
449    events = []
450
451    now = now or time.localtime()
452    now_hm = now[3] * 60 + now[4]
453    now = now[6] * day_min + now_hm
454
455    for schedule in cfg.schedules():
456        parms = None
457        try:
458            # Note: the last parameter can have spaces (category name)!
459            enabled, m, h, dd, action, parms = schedule.split(None, 5)
460        except:
461            try:
462                enabled, m, h, dd, action = schedule.split(None, 4)
463            except:
464                continue  # Bad schedule, ignore
465        action = action.strip()
466        if dd == "*":
467            dd = "1234567"
468        if not dd.isdigit():
469            continue  # Bad schedule, ignore
470        for d in dd:
471            then = (int(d) - 1) * day_min + int(h) * 60 + int(m)
472            dif = then - now
473            if all_events and dif < 0:
474                # Expired event will occur again after a week
475                dif = dif + week_min
476
477            events.append((dif, action, parms, schedule, enabled))
478            if not all_events:
479                break
480
481    events.sort(key=lambda x: x[0])
482    return events
483