1 /*
2     SPDX-FileCopyrightText: 2014 Daniel Vrátil <dvratil@redhat.com>
3 
4     SPDX-License-Identifier: LGPL-2.0-or-later
5 */
6 
7 #include "collectionscheduler.h"
8 #include "akonadiserver_debug.h"
9 #include "storage/datastore.h"
10 #include "storage/selectquerybuilder.h"
11 
12 #include <chrono>
13 #include <private/tristate_p.h>
14 
15 #include <QDateTime>
16 #include <QTimer>
17 
18 using namespace std::literals::chrono_literals;
19 
20 namespace Akonadi
21 {
22 namespace Server
23 {
24 /**
25  * @warning: QTimer's methods are not virtual, so it's necessary to always call
26  * methods on pointer to PauseableTimer!
27  */
28 class PauseableTimer : public QTimer
29 {
30     Q_OBJECT
31 
32 public:
PauseableTimer(QObject * parent=nullptr)33     explicit PauseableTimer(QObject *parent = nullptr)
34         : QTimer(parent)
35     {
36     }
37 
start(std::chrono::milliseconds interval)38     void start(std::chrono::milliseconds interval)
39     {
40         mStarted = QDateTime::currentDateTimeUtc();
41         mPaused = QDateTime();
42         setInterval(interval);
43         QTimer::start(interval);
44     }
45 
start()46     void start()
47     {
48         start(std::chrono::milliseconds{interval()});
49     }
50 
stop()51     void stop()
52     {
53         mStarted = QDateTime();
54         mPaused = QDateTime();
55         QTimer::stop();
56     }
57 
pause()58     Q_INVOKABLE void pause()
59     {
60         if (!isActive() || isPaused()) {
61             return;
62         }
63 
64         mPaused = QDateTime::currentDateTimeUtc();
65         QTimer::stop();
66     }
67 
resume()68     Q_INVOKABLE void resume()
69     {
70         if (!isPaused()) {
71             return;
72         }
73 
74         const auto remainder = std::chrono::milliseconds{interval()} - std::chrono::seconds{mStarted.secsTo(mPaused)};
75         start(qMax(std::chrono::milliseconds{0}, remainder));
76         mPaused = QDateTime();
77         // Update mStarted so that pause() can be called repeatedly
78         mStarted = QDateTime::currentDateTimeUtc();
79     }
80 
isPaused() const81     bool isPaused() const
82     {
83         return mPaused.isValid();
84     }
85 
86 private:
87     QDateTime mStarted;
88     QDateTime mPaused;
89 };
90 
91 } // namespace Server
92 } // namespace Akonadi
93 
94 using namespace Akonadi::Server;
95 
CollectionScheduler(const QString & threadName,QThread::Priority priority,QObject * parent)96 CollectionScheduler::CollectionScheduler(const QString &threadName, QThread::Priority priority, QObject *parent)
97     : AkThread(threadName, priority, parent)
98 {
99 }
100 
~CollectionScheduler()101 CollectionScheduler::~CollectionScheduler()
102 {
103 }
104 
105 // Called in secondary thread
quit()106 void CollectionScheduler::quit()
107 {
108     delete mScheduler;
109     mScheduler = nullptr;
110 
111     AkThread::quit();
112 }
113 
inhibit(bool inhibit)114 void CollectionScheduler::inhibit(bool inhibit)
115 {
116     if (inhibit) {
117         const bool success = QMetaObject::invokeMethod(mScheduler, &PauseableTimer::pause, Qt::QueuedConnection);
118         Q_ASSERT(success);
119         Q_UNUSED(success)
120     } else {
121         const bool success = QMetaObject::invokeMethod(mScheduler, &PauseableTimer::resume, Qt::QueuedConnection);
122         Q_ASSERT(success);
123         Q_UNUSED(success)
124     }
125 }
126 
minimumInterval() const127 int CollectionScheduler::minimumInterval() const
128 {
129     return mMinInterval;
130 }
131 
nextScheduledTime(qint64 collectionId) const132 CollectionScheduler::TimePoint CollectionScheduler::nextScheduledTime(qint64 collectionId) const
133 {
134     QMutexLocker locker(&mScheduleLock);
135     const auto i = constFind(collectionId);
136     if (i != mSchedule.cend()) {
137         return i.key();
138     }
139     return {};
140 }
141 
currentTimerInterval() const142 std::chrono::milliseconds CollectionScheduler::currentTimerInterval() const
143 {
144     return std::chrono::milliseconds(mScheduler->isActive() ? mScheduler->interval() : 0);
145 }
146 
setMinimumInterval(int intervalMinutes)147 void CollectionScheduler::setMinimumInterval(int intervalMinutes)
148 {
149     // No mutex -- you can only call this before starting the thread
150     mMinInterval = intervalMinutes;
151 }
152 
collectionAdded(qint64 collectionId)153 void CollectionScheduler::collectionAdded(qint64 collectionId)
154 {
155     Collection collection = Collection::retrieveById(collectionId);
156     DataStore::self()->activeCachePolicy(collection);
157     if (shouldScheduleCollection(collection)) {
158         QMetaObject::invokeMethod(
159             this,
160             [this, collection]() {
161                 scheduleCollection(collection);
162             },
163             Qt::QueuedConnection);
164     }
165 }
166 
collectionChanged(qint64 collectionId)167 void CollectionScheduler::collectionChanged(qint64 collectionId)
168 {
169     QMutexLocker locker(&mScheduleLock);
170     const auto it = constFind(collectionId);
171     if (it != mSchedule.cend()) {
172         const Collection &oldCollection = it.value();
173         Collection changed = Collection::retrieveById(collectionId);
174         DataStore::self()->activeCachePolicy(changed);
175         if (hasChanged(oldCollection, changed)) {
176             if (shouldScheduleCollection(changed)) {
177                 locker.unlock();
178                 // Scheduling the changed collection will automatically remove the old one
179                 QMetaObject::invokeMethod(
180                     this,
181                     [this, changed]() {
182                         scheduleCollection(changed);
183                     },
184                     Qt::QueuedConnection);
185             } else {
186                 locker.unlock();
187                 // If the collection should no longer be scheduled then remove it
188                 collectionRemoved(collectionId);
189             }
190         }
191     } else {
192         // We don't know the collection yet, but maybe now it can be scheduled
193         collectionAdded(collectionId);
194     }
195 }
196 
collectionRemoved(qint64 collectionId)197 void CollectionScheduler::collectionRemoved(qint64 collectionId)
198 {
199     QMutexLocker locker(&mScheduleLock);
200     auto it = find(collectionId);
201     if (it != mSchedule.end()) {
202         const bool reschedule = it == mSchedule.begin();
203         mSchedule.erase(it);
204 
205         // If we just remove currently scheduled collection, schedule the next one
206         if (reschedule) {
207             QMetaObject::invokeMethod(this, &CollectionScheduler::startScheduler, Qt::QueuedConnection);
208         }
209     }
210 }
211 
212 // Called in secondary thread
startScheduler()213 void CollectionScheduler::startScheduler()
214 {
215     QMutexLocker locker(&mScheduleLock);
216     // Don't restart timer if we are paused.
217     if (mScheduler->isPaused()) {
218         return;
219     }
220 
221     if (mSchedule.isEmpty()) {
222         // Stop the timer. It will be started again once some collection is scheduled
223         mScheduler->stop();
224         return;
225     }
226 
227     // Get next collection to expire and start the timer
228     const auto next = mSchedule.constBegin().key();
229     // TimePoint uses a signed representation internally (int64_t), so we get negative result when next is in the past
230     const auto delayUntilNext = std::chrono::duration_cast<std::chrono::milliseconds>(next - std::chrono::steady_clock::now());
231     mScheduler->start(qMax(std::chrono::milliseconds{0}, delayUntilNext));
232 }
233 
234 // Called in secondary thread
scheduleCollection(Collection collection,bool shouldStartScheduler)235 void CollectionScheduler::scheduleCollection(Collection collection, bool shouldStartScheduler)
236 {
237     DataStore::self()->activeCachePolicy(collection);
238 
239     QMutexLocker locker(&mScheduleLock);
240     auto i = find(collection.id());
241     if (i != mSchedule.end()) {
242         mSchedule.erase(i);
243     }
244 
245     if (!shouldScheduleCollection(collection)) {
246         return;
247     }
248 
249     const int expireMinutes = qMax(mMinInterval, collectionScheduleInterval(collection));
250     TimePoint nextCheck(std::chrono::steady_clock::now() + std::chrono::minutes(expireMinutes));
251 
252     // Check whether there's another check scheduled within a minute after this one.
253     // If yes, then delay this check so that it's scheduled together with the others
254     // This is a minor optimization to reduce wakeups and SQL queries
255     auto it = constLowerBound(nextCheck);
256     if (it != mSchedule.cend() && it.key() - nextCheck < 1min) {
257         nextCheck = it.key();
258 
259         // Also check whether there's another checked scheduled within a minute before
260         // this one.
261     } else if (it != mSchedule.cbegin()) {
262         --it;
263         if (nextCheck - it.key() < 1min) {
264             nextCheck = it.key();
265         }
266     }
267 
268     mSchedule.insert(nextCheck, collection);
269     if (shouldStartScheduler && !mScheduler->isActive()) {
270         locker.unlock();
271         startScheduler();
272     }
273 }
274 
constFind(qint64 collectionId) const275 CollectionScheduler::ScheduleMap::const_iterator CollectionScheduler::constFind(qint64 collectionId) const
276 {
277     return std::find_if(mSchedule.cbegin(), mSchedule.cend(), [collectionId](const Collection &c) {
278         return c.id() == collectionId;
279     });
280 }
281 
find(qint64 collectionId)282 CollectionScheduler::ScheduleMap::iterator CollectionScheduler::find(qint64 collectionId)
283 {
284     return std::find_if(mSchedule.begin(), mSchedule.end(), [collectionId](const Collection &c) {
285         return c.id() == collectionId;
286     });
287 }
288 
289 // separate method so we call the const version of QMap::lowerBound
constLowerBound(TimePoint timestamp) const290 CollectionScheduler::ScheduleMap::const_iterator CollectionScheduler::constLowerBound(TimePoint timestamp) const
291 {
292     return mSchedule.lowerBound(timestamp);
293 }
294 
295 // Called in secondary thread
init()296 void CollectionScheduler::init()
297 {
298     AkThread::init();
299 
300     mScheduler = new PauseableTimer();
301     mScheduler->setSingleShot(true);
302     connect(mScheduler, &QTimer::timeout, this, &CollectionScheduler::schedulerTimeout);
303 
304     // Only retrieve enabled collections and referenced collections, we don't care
305     // about anything else
306     SelectQueryBuilder<Collection> qb;
307     if (!qb.exec()) {
308         qCWarning(AKONADISERVER_LOG) << "Failed to query initial collections for scheduler!";
309         qCWarning(AKONADISERVER_LOG) << "Not a fatal error, no collections will be scheduled for sync or cache expiration!";
310     }
311 
312     const Collection::List collections = qb.result();
313     for (const Collection &collection : collections) {
314         scheduleCollection(collection);
315     }
316 
317     startScheduler();
318 }
319 
320 // Called in secondary thread
schedulerTimeout()321 void CollectionScheduler::schedulerTimeout()
322 {
323     QMutexLocker locker(&mScheduleLock);
324 
325     // Call stop() explicitly to reset the timer
326     mScheduler->stop();
327 
328     const auto timestamp = mSchedule.constBegin().key();
329     const QList<Collection> collections = mSchedule.values(timestamp);
330     mSchedule.remove(timestamp);
331     locker.unlock();
332 
333     for (const Collection &collection : collections) {
334         collectionExpired(collection);
335         scheduleCollection(collection, false);
336     }
337 
338     startScheduler();
339 }
340 
341 #include "collectionscheduler.moc"
342