1 /*
2  * SPDX-FileCopyrightText: 2014 Christian Mollekopf <mollekopf@kolabsys.com>
3  *
4  * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
5  *
6  */
7 
8 #include "scheduler.h"
9 
10 #include "akonadi_indexer_agent_debug.h"
11 #include "collectionindexingjob.h"
12 
13 #include <Akonadi/AgentBase>
14 #include <Akonadi/CollectionFetchJob>
15 #include <Akonadi/CollectionFetchScope>
16 #include <Akonadi/IndexPolicyAttribute>
17 #include <Akonadi/ServerManager>
18 
19 #include <KConfigGroup>
20 #include <KLocalizedString>
21 
22 #include <QTimer>
23 #include <chrono>
24 
25 using namespace std::chrono_literals;
26 
~JobFactory()27 JobFactory::~JobFactory()
28 {
29 }
30 
31 CollectionIndexingJob *
createCollectionIndexingJob(Index & index,const Akonadi::Collection & col,const QList<Akonadi::Item::Id> & pending,bool fullSync,QObject * parent)32 JobFactory::createCollectionIndexingJob(Index &index, const Akonadi::Collection &col, const QList<Akonadi::Item::Id> &pending, bool fullSync, QObject *parent)
33 {
34     auto job = new CollectionIndexingJob(index, col, pending, parent);
35     job->setFullSync(fullSync);
36     return job;
37 }
38 
Scheduler(Index & index,const KSharedConfigPtr & config,const QSharedPointer<JobFactory> & jobFactory,QObject * parent)39 Scheduler::Scheduler(Index &index, const KSharedConfigPtr &config, const QSharedPointer<JobFactory> &jobFactory, QObject *parent)
40     : QObject(parent)
41     , m_config(config)
42     , m_index(index)
43     , m_jobFactory(jobFactory)
44     , m_busyTimeout(5000)
45 {
46     if (!m_jobFactory) {
47         m_jobFactory = QSharedPointer<JobFactory>(new JobFactory);
48     }
49     m_processTimer.setSingleShot(true);
50     m_processTimer.setInterval(100ms);
51     connect(&m_processTimer, &QTimer::timeout, this, &Scheduler::processNext);
52 
53     KConfigGroup cfg = m_config->group("General");
54     const auto dirtyCollectionsResult2 = cfg.readEntry("dirtyCollections", QList<Akonadi::Collection::Id>());
55     m_dirtyCollections = QSet<Akonadi::Collection::Id>(dirtyCollectionsResult2.begin(), dirtyCollectionsResult2.end());
56     if (m_dirtyCollections.isEmpty()) {
57         KConfig baloorc(Akonadi::ServerManager::addNamespace(QStringLiteral("baloorc")));
58         KConfigGroup baloorcGroup = baloorc.group("Akonadi");
59 
60         // Schedule collections we know have missing items from last time
61         const auto dirtyCollectionsResult = baloorcGroup.readEntry("dirtyCollections", QList<Akonadi::Collection::Id>());
62         m_dirtyCollections = QSet<Akonadi::Collection::Id>(dirtyCollectionsResult.begin(), dirtyCollectionsResult.end());
63     }
64 
65     qCDebug(AKONADI_INDEXER_AGENT_LOG) << "Dirty collections " << m_dirtyCollections;
66     for (Akonadi::Collection::Id col : std::as_const(m_dirtyCollections)) {
67         scheduleCollection(Akonadi::Collection(col), true);
68     }
69 
70     bool initialIndexingDone = cfg.readEntry("initialIndexingDone", false);
71     if (!initialIndexingDone) {
72         KConfig baloorc(Akonadi::ServerManager::addNamespace(QStringLiteral("baloorc")));
73         KConfigGroup baloorcGroup = baloorc.group("Akonadi");
74         initialIndexingDone = baloorcGroup.readEntry("initialIndexingDone", false);
75         cfg.writeEntry("initialIndexingDone", initialIndexingDone);
76         baloorcGroup.deleteEntry("initialIndexingDone"); // make sure that editing akonadi_indexing_agentrc by hand works in the future
77     }
78     // Trigger a full sync initially
79     if (!initialIndexingDone) {
80         qCDebug(AKONADI_INDEXER_AGENT_LOG) << "initial indexing";
81         QMetaObject::invokeMethod(this, &Scheduler::scheduleCompleteSync, Qt::QueuedConnection);
82     }
83     cfg.writeEntry("initialIndexingDone", true);
84     cfg.sync();
85 }
86 
~Scheduler()87 Scheduler::~Scheduler()
88 {
89     collectDirtyCollections();
90 }
91 
setBusyTimeout(int timeout)92 void Scheduler::setBusyTimeout(int timeout)
93 {
94     m_busyTimeout = timeout;
95 }
96 
numberOfCollectionQueued() const97 int Scheduler::numberOfCollectionQueued() const
98 {
99     return m_collectionQueue.count();
100 }
101 
collectDirtyCollections()102 void Scheduler::collectDirtyCollections()
103 {
104     KConfigGroup cfg = m_config->group("General");
105     // Store collections where we did not manage to index all, we'll need to do a full sync for them the next time
106     QHash<Akonadi::Collection::Id, QQueue<Akonadi::Item::Id>>::ConstIterator it = m_queues.constBegin();
107     QHash<Akonadi::Collection::Id, QQueue<Akonadi::Item::Id>>::ConstIterator end = m_queues.constEnd();
108     for (; it != end; ++it) {
109         if (!it.value().isEmpty()) {
110             m_dirtyCollections.insert(it.key());
111         }
112     }
113     qCDebug(AKONADI_INDEXER_AGENT_LOG) << m_dirtyCollections;
114     cfg.writeEntry("dirtyCollections", m_dirtyCollections.values());
115     cfg.sync();
116 }
117 
scheduleCollection(const Akonadi::Collection & col,bool fullSync)118 void Scheduler::scheduleCollection(const Akonadi::Collection &col, bool fullSync)
119 {
120     if (!m_collectionQueue.contains(col.id())) {
121         m_collectionQueue.enqueue(col.id());
122     }
123     if (fullSync) {
124         m_dirtyCollections.insert(col.id());
125     }
126     processNext();
127 }
128 
addItem(const Akonadi::Item & item)129 void Scheduler::addItem(const Akonadi::Item &item)
130 {
131     Q_ASSERT(item.parentCollection().isValid());
132     m_lastModifiedTimestamps.insert(item.parentCollection().id(), QDateTime::currentMSecsSinceEpoch());
133     m_queues[item.parentCollection().id()].append(item.id());
134     // Move to the back
135     m_collectionQueue.removeOne(item.parentCollection().id());
136     m_collectionQueue.enqueue(item.parentCollection().id());
137     if (!m_processTimer.isActive()) {
138         m_processTimer.start();
139     }
140 }
141 
scheduleCompleteSync()142 void Scheduler::scheduleCompleteSync()
143 {
144     qCDebug(AKONADI_INDEXER_AGENT_LOG);
145     {
146         auto job = new Akonadi::CollectionFetchJob(Akonadi::Collection::root(), Akonadi::CollectionFetchJob::Recursive);
147         job->fetchScope().setAncestorRetrieval(Akonadi::CollectionFetchScope::All);
148         job->fetchScope().setListFilter(Akonadi::CollectionFetchScope::Index);
149         job->fetchScope().fetchAttribute<Akonadi::IndexPolicyAttribute>();
150         connect(job, &KJob::finished, this, &Scheduler::slotRootCollectionsFetched);
151         job->start();
152     }
153 
154     // We want to index all collections, even if we don't index their content
155     {
156         auto job = new Akonadi::CollectionFetchJob(Akonadi::Collection::root(), Akonadi::CollectionFetchJob::Recursive);
157         job->fetchScope().setAncestorRetrieval(Akonadi::CollectionFetchScope::All);
158         job->fetchScope().setListFilter(Akonadi::CollectionFetchScope::NoFilter);
159         job->fetchScope().setListFilter(Akonadi::CollectionFetchScope::Index);
160         connect(job, &KJob::finished, this, &Scheduler::slotCollectionsToIndexFetched);
161         job->start();
162     }
163 }
164 
slotRootCollectionsFetched(KJob * kjob)165 void Scheduler::slotRootCollectionsFetched(KJob *kjob)
166 {
167     auto cjob = static_cast<Akonadi::CollectionFetchJob *>(kjob);
168     const Akonadi::Collection::List lstCols = cjob->collections();
169     for (const Akonadi::Collection &c : lstCols) {
170         // For skipping search collections
171         if (c.isVirtual()) {
172             continue;
173         }
174         if (c == Akonadi::Collection::root()) {
175             continue;
176         }
177         if (c.hasAttribute<Akonadi::IndexPolicyAttribute>() && !c.attribute<Akonadi::IndexPolicyAttribute>()->indexingEnabled()) {
178             continue;
179         }
180         scheduleCollection(c, true);
181     }
182 
183     // If we did not schedule any collection
184     if (m_collectionQueue.isEmpty()) {
185         qCDebug(AKONADI_INDEXER_AGENT_LOG) << "No collections scheduled";
186         Q_EMIT status(Akonadi::AgentBase::Idle, i18n("Ready"));
187     }
188 }
189 
slotCollectionsToIndexFetched(KJob * kjob)190 void Scheduler::slotCollectionsToIndexFetched(KJob *kjob)
191 {
192     auto cjob = static_cast<Akonadi::CollectionFetchJob *>(kjob);
193     const Akonadi::Collection::List lstCols = cjob->collections();
194     for (const Akonadi::Collection &c : lstCols) {
195         // For skipping search collections
196         if (c.isVirtual()) {
197             continue;
198         }
199         if (c == Akonadi::Collection::root()) {
200             continue;
201         }
202         if (c.hasAttribute<Akonadi::IndexPolicyAttribute>() && !c.attribute<Akonadi::IndexPolicyAttribute>()->indexingEnabled()) {
203             continue;
204         }
205         m_index.index(c);
206     }
207 }
208 
abort()209 void Scheduler::abort()
210 {
211     if (m_currentJob) {
212         m_currentJob->kill(KJob::Quietly);
213     }
214     m_currentJob = nullptr;
215     collectDirtyCollections();
216     m_collectionQueue.clear();
217     Q_EMIT status(Akonadi::AgentBase::Idle, i18n("Ready"));
218 }
219 
processNext()220 void Scheduler::processNext()
221 {
222     m_processTimer.stop();
223     if (m_currentJob) {
224         return;
225     }
226     if (m_collectionQueue.isEmpty()) {
227         qCDebug(AKONADI_INDEXER_AGENT_LOG) << "Processing done";
228         Q_EMIT status(Akonadi::AgentBase::Idle, i18n("Ready"));
229         return;
230     }
231 
232     // An item was queued within the last 5 seconds, we're probably in the middle of a sync
233     const bool collectionIsChanging = (QDateTime::currentMSecsSinceEpoch() - m_lastModifiedTimestamps.value(m_collectionQueue.head())) < m_busyTimeout;
234     if (collectionIsChanging) {
235         // We're in the middle of something, wait with indexing
236         m_processTimer.start();
237         return;
238     }
239 
240     const Akonadi::Collection col(m_collectionQueue.takeFirst());
241     qCDebug(AKONADI_INDEXER_AGENT_LOG) << "Processing collection: " << col.id();
242     QQueue<Akonadi::Item::Id> &itemQueue = m_queues[col.id()];
243     const bool fullSync = m_dirtyCollections.contains(col.id());
244     CollectionIndexingJob *job = m_jobFactory->createCollectionIndexingJob(m_index, col, itemQueue, fullSync, this);
245     itemQueue.clear();
246     job->setProperty("collection", col.id());
247     connect(job, &KJob::result, this, &Scheduler::slotIndexingFinished);
248     connect(job, &CollectionIndexingJob::status, this, &Scheduler::status);
249     connect(job, SIGNAL(percent(int)), this, SIGNAL(percent(int)));
250     m_currentJob = job;
251     job->start();
252 }
253 
slotIndexingFinished(KJob * job)254 void Scheduler::slotIndexingFinished(KJob *job)
255 {
256     if (job->error()) {
257         qCWarning(AKONADI_INDEXER_AGENT_LOG) << "Indexing failed: " << job->errorString();
258     } else {
259         const auto collectionId = job->property("collection").value<Akonadi::Collection::Id>();
260         m_dirtyCollections.remove(collectionId);
261         Q_EMIT status(Akonadi::AgentBase::Idle, i18n("Collection \"%1\" indexed", collectionId));
262         Q_EMIT collectionIndexingFinished(collectionId);
263     }
264     m_currentJob = nullptr;
265     m_processTimer.start();
266 }
267