1 /*
2    BAREOS® - Backup Archiving REcovery Open Sourced
3 
4    Copyright (C) 2000-2011 Free Software Foundation Europe e.V.
5    Copyright (C) 2011-2012 Planets Communications B.V.
6    Copyright (C) 2013-2019 Bareos GmbH & Co. KG
7 
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12 
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    Affero General Public License for more details.
17 
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22 */
23 
24 #include "include/bareos.h"
25 #include "dird/dird.h"
26 #include "dird/dird_conf.h"
27 #include "dird/dird_globals.h"
28 #include "dird/jcr_private.h"
29 #include "dird/job.h"
30 #include "dird/run_hour_validator.h"
31 #include "dird/scheduler.h"
32 #include "dird/scheduler_job_item_queue.h"
33 #include "dird/scheduler_private.h"
34 #include "dird/scheduler_system_time_source.h"
35 #include "dird/scheduler_time_adapter.h"
36 #include "dird/storage.h"
37 #include "include/make_unique.h"
38 #include "lib/parse_conf.h"
39 
40 #include <chrono>
41 #include <utility>
42 
43 namespace directordaemon {
44 
45 using std::chrono::seconds;
46 
47 static constexpr int local_debuglevel = 200;
48 static constexpr auto seconds_per_hour = seconds(3600);
49 static constexpr auto seconds_per_minute = seconds(60);
50 
IsAutomaticSchedulerJob(JobResource * job)51 static bool IsAutomaticSchedulerJob(JobResource* job)
52 {
53   Dmsg1(local_debuglevel + 100,
54         "Scheduler: Check if job IsAutomaticSchedulerJob %s.",
55         job->resource_name_);
56   if (job->schedule == nullptr) { return false; }
57   if (!job->schedule->enabled) { return false; }
58   if (!job->enabled) { return false; }
59   if ((job->client != nullptr) && !job->client->enabled) { return false; }
60   Dmsg1(local_debuglevel + 100,
61         "Scheduler: Check if job IsAutomaticSchedulerJob %s: Yes.",
62         job->resource_name_);
63   return true;
64 }
65 
SetJcrFromRunResource(JobControlRecord * jcr,RunResource * run)66 static void SetJcrFromRunResource(JobControlRecord* jcr, RunResource* run)
67 {
68   if (run->level != 0U) {
69     jcr->setJobLevel(run->level); /* override run level */
70   }
71 
72   if (run->pool != nullptr) {
73     jcr->impl->res.pool = run->pool; /* override pool */
74     jcr->impl->res.run_pool_override = true;
75   }
76 
77   if (run->full_pool != nullptr) {
78     jcr->impl->res.full_pool = run->full_pool; /* override full pool */
79     jcr->impl->res.run_full_pool_override = true;
80   }
81 
82   if (run->vfull_pool != nullptr) {
83     jcr->impl->res.vfull_pool =
84         run->vfull_pool; /* override virtual full pool */
85     jcr->impl->res.run_vfull_pool_override = true;
86   }
87 
88   if (run->inc_pool != nullptr) {
89     jcr->impl->res.inc_pool = run->inc_pool; /* override inc pool */
90     jcr->impl->res.run_inc_pool_override = true;
91   }
92 
93   if (run->diff_pool != nullptr) {
94     jcr->impl->res.diff_pool = run->diff_pool; /* override diff pool */
95     jcr->impl->res.run_diff_pool_override = true;
96   }
97 
98   if (run->next_pool != nullptr) {
99     jcr->impl->res.next_pool = run->next_pool; /* override next pool */
100     jcr->impl->res.run_next_pool_override = true;
101   }
102 
103   if (run->storage != nullptr) {
104     UnifiedStorageResource store;
105     store.store = run->storage;
106     PmStrcpy(store.store_source, _("run override"));
107     SetRwstorage(jcr, &store); /* override storage */
108   }
109 
110   if (run->msgs != nullptr) {
111     jcr->impl->res.messages = run->msgs; /* override messages */
112   }
113 
114   if (run->Priority != 0) { jcr->JobPriority = run->Priority; }
115 
116   if (run->spool_data_set) { jcr->impl->spool_data = run->spool_data; }
117 
118   if (run->accurate_set) {
119     jcr->accurate = run->accurate; /* overwrite accurate mode */
120   }
121 
122   if (run->MaxRunSchedTime_set) {
123     jcr->impl->MaxRunSchedTime = run->MaxRunSchedTime;
124   }
125 }
126 
TryCreateJobControlRecord(const SchedulerJobItem & next_job)127 JobControlRecord* SchedulerPrivate::TryCreateJobControlRecord(
128     const SchedulerJobItem& next_job)
129 {
130   JobControlRecord* jcr = NewDirectorJcr();
131   SetJcrDefaults(jcr, next_job.job);
132   if (next_job.run != nullptr) {
133     next_job.run->scheduled_last = time_adapter->time_source_->SystemTime();
134     SetJcrFromRunResource(jcr, next_job.run);
135   }
136   return jcr;
137 }
138 
WaitForJobsToRun()139 void SchedulerPrivate::WaitForJobsToRun()
140 {
141   while (active && !prioritised_job_item_queue.Empty()) {
142     auto next_job = prioritised_job_item_queue.TopItem();
143     if (!next_job.is_valid) { break; }
144 
145     time_t now = time_adapter->time_source_->SystemTime();
146 
147     if (now >= next_job.runtime) {
148       auto run_job = prioritised_job_item_queue.TakeOutTopItem();
149       if (!run_job.is_valid) {
150         continue;  // check queue again
151       }
152       JobControlRecord* jcr = TryCreateJobControlRecord(run_job);
153       if (jcr != nullptr) {
154         Dmsg1(local_debuglevel, "Scheduler: Running job %s.",
155               run_job.job->resource_name_);
156         ExecuteJobCallback_(jcr);
157       }
158 
159     } else {
160       time_t wait_interval{std::min(time_adapter->default_wait_interval_,
161                                     next_job.runtime - now)};
162       Dmsg2(local_debuglevel,
163             "Scheduler: WaitForJobsToRun is sleeping for %d seconds. Next "
164             "job: %s.",
165             wait_interval, next_job.job->resource_name_);
166 
167       time_adapter->time_source_->SleepFor(seconds(wait_interval));
168     }
169   }
170 }
171 
FillSchedulerJobQueueOrSleep()172 void SchedulerPrivate::FillSchedulerJobQueueOrSleep()
173 {
174   while (active && prioritised_job_item_queue.Empty()) {
175     AddJobsForThisAndNextHourToQueue();
176     if (prioritised_job_item_queue.Empty()) {
177       time_adapter->time_source_->SleepFor(
178           seconds(time_adapter->default_wait_interval_));
179     }
180   }
181 }
182 
CalculateRuntime(time_t time,uint32_t minute)183 static time_t CalculateRuntime(time_t time, uint32_t minute)
184 {
185   struct tm tm {
186   };
187   Blocaltime(&time, &tm);
188   tm.tm_min = minute;
189   tm.tm_sec = 0;
190   return mktime(&tm);
191 }
192 
AddJobsForThisAndNextHourToQueue()193 void SchedulerPrivate::AddJobsForThisAndNextHourToQueue()
194 {
195   Dmsg0(local_debuglevel, "Begin AddJobsForThisAndNextHourToQueue\n");
196 
197   RunHourValidator this_hour(time_adapter->time_source_->SystemTime());
198   this_hour.PrintDebugMessage(local_debuglevel);
199 
200   RunHourValidator next_hour(this_hour.Time() + seconds_per_hour.count());
201   next_hour.PrintDebugMessage(local_debuglevel);
202 
203   JobResource* job = nullptr;
204 
205   LockRes(my_config);
206   foreach_res (job, R_JOB) {
207     if (!IsAutomaticSchedulerJob(job)) { continue; }
208 
209     Dmsg1(local_debuglevel, "Got job: %s\n", job->resource_name_);
210 
211     for (RunResource* run = job->schedule->run; run != nullptr;
212          run = run->next) {
213       bool run_this_hour = this_hour.TriggersOn(run->date_time_bitfield);
214       bool run_next_hour = next_hour.TriggersOn(run->date_time_bitfield);
215 
216       Dmsg3(local_debuglevel, "run@%p: run_now=%d run_next_hour=%d\n", run,
217             run_this_hour, run_next_hour);
218 
219       if (run_this_hour || run_next_hour) {
220         time_t runtime = CalculateRuntime(this_hour.Time(), run->minute);
221         if (run_this_hour) {
222           AddJobToQueue(job, run, this_hour.Time(), runtime);
223         }
224         if (run_next_hour) {
225           AddJobToQueue(job, run, this_hour.Time(),
226                         runtime + seconds_per_hour.count());
227         }
228       }
229     }
230   }
231   UnlockRes(my_config);
232   Dmsg0(local_debuglevel, "Finished AddJobsForThisAndNextHourToQueue\n");
233 }
234 
AddJobToQueue(JobResource * job,RunResource * run,time_t now,time_t runtime)235 void SchedulerPrivate::AddJobToQueue(JobResource* job,
236                                      RunResource* run,
237                                      time_t now,
238                                      time_t runtime)
239 {
240   Dmsg1(local_debuglevel + 100, "Scheduler: Try AddJobToQueue %s.",
241         job->resource_name_);
242 
243   if (run != nullptr) {
244     if ((runtime - run->scheduled_last) < 61) { return; }
245   }
246 
247   if ((runtime + 59) < now) { return; }
248 
249   try {
250     Dmsg1(local_debuglevel + 100, "Scheduler: Put job %s into queue.",
251           job->resource_name_);
252 
253     prioritised_job_item_queue.EmplaceItem(job, run, runtime);
254 
255   } catch (const std::invalid_argument& e) {
256     Dmsg1(local_debuglevel + 100, "Could not emplace job: %s\n", e.what());
257   }
258 }
259 
AddJobWithNoRunResourceToQueue(JobResource * job)260 void SchedulerPrivate::AddJobWithNoRunResourceToQueue(JobResource* job)
261 {
262   time_t now = time_adapter->time_source_->SystemTime();
263   AddJobToQueue(job, nullptr, now, now);
264 }
265 
266 class DefaultSchedulerTimeAdapter : public SchedulerTimeAdapter {
267  public:
DefaultSchedulerTimeAdapter()268   DefaultSchedulerTimeAdapter()
269       : SchedulerTimeAdapter(std::make_unique<SystemTimeSource>())
270   {
271     default_wait_interval_ = seconds_per_minute.count();
272   }
273 };
274 
SchedulerPrivate()275 SchedulerPrivate::SchedulerPrivate()
276     : time_adapter{std::make_unique<DefaultSchedulerTimeAdapter>()}
277     , ExecuteJobCallback_{ExecuteJob}
278 {
279   // this is the default director scheduler
280 }
281 
SchedulerPrivate(std::unique_ptr<SchedulerTimeAdapter> time_adapter,std::function<void (JobControlRecord *)> ExecuteJobCallback)282 SchedulerPrivate::SchedulerPrivate(
283     std::unique_ptr<SchedulerTimeAdapter> time_adapter,
284     std::function<void(JobControlRecord*)> ExecuteJobCallback)
285     : time_adapter{std::move(time_adapter)}
286     , ExecuteJobCallback_{std::move(std::move(ExecuteJobCallback))}
287 {
288   // constructor used for tests to inject mocked time adapter and callbacks
289 }
290 
// Destructor, defaulted out of line in this source file.
// NOTE(review): presumably defined here so that members such as the
// time_adapter unique_ptr are destroyed where their types are complete;
// confirm against scheduler_private.h.
SchedulerPrivate::~SchedulerPrivate() = default;
292 
293 
294 }  // namespace directordaemon
295