1 /*
2 * This file is part of nzbget. See <http://nzbget.net>.
3 *
4 * Copyright (C) 2008-2019 Andrey Prygunkov <hugbug@users.sourceforge.net>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20
21 #include "nzbget.h"
22 #include "Scheduler.h"
23 #include "Options.h"
24 #include "WorkState.h"
25 #include "Log.h"
26 #include "NewsServer.h"
27 #include "ServerPool.h"
28 #include "FeedInfo.h"
29 #include "FeedCoordinator.h"
30 #include "SchedulerScript.h"
31
AddTask(std::unique_ptr<Task> task)32 void Scheduler::AddTask(std::unique_ptr<Task> task)
33 {
34 Guard guard(m_taskListMutex);
35 m_taskList.push_back(std::move(task));
36 }
37
FirstCheck()38 void Scheduler::FirstCheck()
39 {
40 {
41 Guard guard(m_taskListMutex);
42
43 std::sort(m_taskList.begin(), m_taskList.end(),
44 [](const std::unique_ptr<Task>& task1, const std::unique_ptr<Task>& task2)
45 {
46 return (task1->m_hours < task2->m_hours) ||
47 ((task1->m_hours == task2->m_hours) && (task1->m_minutes < task2->m_minutes));
48 });
49 }
50
51 // check all tasks for the last week
52 CheckTasks();
53 }
54
ScheduleNextWork()55 void Scheduler::ScheduleNextWork()
56 {
57 // Ideally we should calculate wait time until next scheduler task or until resume time.
58 // The first isn't trivial and the second requires watching/reaction on changed scheduled resume time.
59 // We do it simpler instead: check once per minute, when seconds are changing from 59 to 00.
60
61 time_t curTime = Util::CurrentTime();
62 tm sched;
63 gmtime_r(&curTime, &sched);
64 sched.tm_min++;
65 sched.tm_sec = 0;
66 time_t nextMinute = Util::Timegm(&sched);
67
68 m_serviceInterval = nextMinute - curTime;
69 }
70
ServiceWork()71 void Scheduler::ServiceWork()
72 {
73 debug("Scheduler service work");
74
75 if (!DownloadQueue::IsLoaded())
76 {
77 return;
78 }
79
80 debug("Scheduler service work: doing work");
81
82 if (!m_firstChecked)
83 {
84 FirstCheck();
85 m_firstChecked = true;
86 return;
87 }
88
89 m_executeProcess = true;
90 CheckTasks();
91 CheckScheduledResume();
92 ScheduleNextWork();
93 }
94
CheckTasks()95 void Scheduler::CheckTasks()
96 {
97 PrepareLog();
98
99 {
100 Guard guard(m_taskListMutex);
101
102 time_t current = Util::CurrentTime();
103
104 if (!m_taskList.empty())
105 {
106 // Detect large step changes of system time
107 time_t diff = current - m_lastCheck;
108 if (diff > 60 * 90 || diff < 0)
109 {
110 debug("Reset scheduled tasks (detected clock change greater than 90 minutes or negative)");
111
112 // check all tasks for the last week
113 m_lastCheck = current - 60 * 60 * 24 * 7;
114 m_executeProcess = false;
115
116 for (Task* task : &m_taskList)
117 {
118 if (task->m_hours != Task::STARTUP_TASK)
119 {
120 task->m_lastExecuted = 0;
121 }
122 }
123 }
124
125 time_t localCurrent = current + g_WorkState->GetLocalTimeOffset();
126 time_t localLastCheck = m_lastCheck + g_WorkState->GetLocalTimeOffset();
127
128 tm tmCurrent;
129 gmtime_r(&localCurrent, &tmCurrent);
130 tm tmLastCheck;
131 gmtime_r(&localLastCheck, &tmLastCheck);
132
133 tm tmLoop;
134 memcpy(&tmLoop, &tmLastCheck, sizeof(tmLastCheck));
135 tmLoop.tm_hour = tmCurrent.tm_hour;
136 tmLoop.tm_min = tmCurrent.tm_min;
137 tmLoop.tm_sec = tmCurrent.tm_sec;
138 time_t loop = Util::Timegm(&tmLoop);
139
140 while (loop <= localCurrent)
141 {
142 for (Task* task : &m_taskList)
143 {
144 if (task->m_lastExecuted != loop)
145 {
146 tm tmAppoint;
147 memcpy(&tmAppoint, &tmLoop, sizeof(tmLoop));
148 tmAppoint.tm_hour = task->m_hours;
149 tmAppoint.tm_min = task->m_minutes;
150 tmAppoint.tm_sec = 0;
151
152 time_t appoint = Util::Timegm(&tmAppoint);
153
154 int weekDay = tmAppoint.tm_wday;
155 if (weekDay == 0)
156 {
157 weekDay = 7;
158 }
159
160 bool weekDayOK = task->m_weekDaysBits == 0 || (task->m_weekDaysBits & (1 << (weekDay - 1)));
161 bool doTask = (task->m_hours >= 0 && weekDayOK && localLastCheck < appoint && appoint <= localCurrent) ||
162 (task->m_hours == Task::STARTUP_TASK && task->m_lastExecuted == 0);
163
164 if (doTask)
165 {
166 ExecuteTask(task);
167 task->m_lastExecuted = loop;
168 }
169 }
170 }
171 loop += 60 * 60 * 24; // inc day
172 gmtime_r(&loop, &tmLoop);
173 }
174 }
175
176 m_lastCheck = current;
177 }
178
179 PrintLog();
180 }
181
ExecuteTask(Task * task)182 void Scheduler::ExecuteTask(Task* task)
183 {
184 #ifdef DEBUG
185 const char* commandName[] = { "Pause", "Unpause", "Pause Post-processing", "Unpause Post-processing",
186 "Set download rate", "Execute process", "Execute script",
187 "Pause Scan", "Unpause Scan", "Enable Server", "Disable Server", "Fetch Feed" };
188 debug("Executing scheduled command: %s", commandName[task->m_command]);
189 #endif
190
191 bool executeProcess = m_executeProcess || task->m_hours == Task::STARTUP_TASK;
192
193 switch (task->m_command)
194 {
195 case scDownloadRate:
196 if (!task->m_param.Empty())
197 {
198 g_WorkState->SetSpeedLimit(atoi(task->m_param) * 1024);
199 m_downloadRateChanged = true;
200 }
201 break;
202
203 case scPauseDownload:
204 case scUnpauseDownload:
205 g_WorkState->SetPauseDownload(task->m_command == scPauseDownload);
206 m_pauseDownloadChanged = true;
207 break;
208
209 case scPausePostProcess:
210 case scUnpausePostProcess:
211 g_WorkState->SetPausePostProcess(task->m_command == scPausePostProcess);
212 m_pausePostProcessChanged = true;
213 break;
214
215 case scPauseScan:
216 case scUnpauseScan:
217 g_WorkState->SetPauseScan(task->m_command == scPauseScan);
218 m_pauseScanChanged = true;
219 break;
220
221 case scExtensions:
222 case scProcess:
223 if (executeProcess)
224 {
225 SchedulerScriptController::StartScript(task->m_param, task->m_command == scProcess, task->m_id);
226 }
227 break;
228
229 case scActivateServer:
230 case scDeactivateServer:
231 EditServer(task->m_command == scActivateServer, task->m_param);
232 break;
233
234 case scFetchFeed:
235 if (executeProcess)
236 {
237 FetchFeed(task->m_param);
238 break;
239 }
240 }
241 }
242
PrepareLog()243 void Scheduler::PrepareLog()
244 {
245 m_downloadRateChanged = false;
246 m_pauseDownloadChanged = false;
247 m_pausePostProcessChanged = false;
248 m_pauseScanChanged = false;
249 m_serverChanged = false;
250 }
251
PrintLog()252 void Scheduler::PrintLog()
253 {
254 if (m_downloadRateChanged)
255 {
256 info("Scheduler: setting download rate to %i KB/s", g_WorkState->GetSpeedLimit() / 1024);
257 }
258 if (m_pauseDownloadChanged)
259 {
260 info("Scheduler: %s download", g_WorkState->GetPauseDownload() ? "pausing" : "unpausing");
261 }
262 if (m_pausePostProcessChanged)
263 {
264 info("Scheduler: %s post-processing", g_WorkState->GetPausePostProcess() ? "pausing" : "unpausing");
265 }
266 if (m_pauseScanChanged)
267 {
268 info("Scheduler: %s scan", g_WorkState->GetPauseScan() ? "pausing" : "unpausing");
269 }
270 if (m_serverChanged)
271 {
272 int index = 0;
273 for (NewsServer* server : g_ServerPool->GetServers())
274 {
275 if (server->GetActive() != m_serverStatusList[index])
276 {
277 info("Scheduler: %s %s", server->GetActive() ? "activating" : "deactivating", server->GetName());
278 }
279 index++;
280 }
281 g_ServerPool->Changed();
282 }
283 }
284
EditServer(bool active,const char * serverList)285 void Scheduler::EditServer(bool active, const char* serverList)
286 {
287 Tokenizer tok(serverList, ",;");
288 while (const char* serverRef = tok.Next())
289 {
290 int id = atoi(serverRef);
291 for (NewsServer* server : g_ServerPool->GetServers())
292 {
293 if ((id > 0 && server->GetId() == id) ||
294 !strcasecmp(server->GetName(), serverRef))
295 {
296 if (!m_serverChanged)
297 {
298 // store old server status for logging
299 m_serverStatusList.clear();
300 m_serverStatusList.reserve(g_ServerPool->GetServers()->size());
301 for (NewsServer* server2 : g_ServerPool->GetServers())
302 {
303 m_serverStatusList.push_back(server2->GetActive());
304 }
305 }
306 m_serverChanged = true;
307 server->SetActive(active);
308 break;
309 }
310 }
311 }
312 }
313
FetchFeed(const char * feedList)314 void Scheduler::FetchFeed(const char* feedList)
315 {
316 Tokenizer tok(feedList, ",;");
317 while (const char* feedRef = tok.Next())
318 {
319 int id = atoi(feedRef);
320 for (FeedInfo* feed : g_FeedCoordinator->GetFeeds())
321 {
322 if (feed->GetId() == id ||
323 !strcasecmp(feed->GetName(), feedRef) ||
324 !strcasecmp("0", feedRef))
325 {
326 g_FeedCoordinator->FetchFeed(!strcasecmp("0", feedRef) ? 0 : feed->GetId());
327 break;
328 }
329 }
330 }
331 }
332
CheckScheduledResume()333 void Scheduler::CheckScheduledResume()
334 {
335 time_t resumeTime = g_WorkState->GetResumeTime();
336 time_t currentTime = Util::CurrentTime();
337 if (resumeTime > 0 && currentTime >= resumeTime)
338 {
339 info("Autoresume");
340 g_WorkState->SetResumeTime(0);
341 g_WorkState->SetPauseDownload(false);
342 g_WorkState->SetPausePostProcess(false);
343 g_WorkState->SetPauseScan(false);
344 }
345 }
346