1 /*
2 * SPDX-FileCopyrightText: 2014 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
5 *
6 */
7
8 #include "scheduler.h"
9
10 #include "akonadi_indexer_agent_debug.h"
11 #include "collectionindexingjob.h"
12
13 #include <Akonadi/AgentBase>
14 #include <Akonadi/CollectionFetchJob>
15 #include <Akonadi/CollectionFetchScope>
16 #include <Akonadi/IndexPolicyAttribute>
17 #include <Akonadi/ServerManager>
18
19 #include <KConfigGroup>
20 #include <KLocalizedString>
21
22 #include <QTimer>
23 #include <chrono>
24
25 using namespace std::chrono_literals;
26
~JobFactory()27 JobFactory::~JobFactory()
28 {
29 }
30
31 CollectionIndexingJob *
createCollectionIndexingJob(Index & index,const Akonadi::Collection & col,const QList<Akonadi::Item::Id> & pending,bool fullSync,QObject * parent)32 JobFactory::createCollectionIndexingJob(Index &index, const Akonadi::Collection &col, const QList<Akonadi::Item::Id> &pending, bool fullSync, QObject *parent)
33 {
34 auto job = new CollectionIndexingJob(index, col, pending, parent);
35 job->setFullSync(fullSync);
36 return job;
37 }
38
Scheduler(Index & index,const KSharedConfigPtr & config,const QSharedPointer<JobFactory> & jobFactory,QObject * parent)39 Scheduler::Scheduler(Index &index, const KSharedConfigPtr &config, const QSharedPointer<JobFactory> &jobFactory, QObject *parent)
40 : QObject(parent)
41 , m_config(config)
42 , m_index(index)
43 , m_jobFactory(jobFactory)
44 , m_busyTimeout(5000)
45 {
46 if (!m_jobFactory) {
47 m_jobFactory = QSharedPointer<JobFactory>(new JobFactory);
48 }
49 m_processTimer.setSingleShot(true);
50 m_processTimer.setInterval(100ms);
51 connect(&m_processTimer, &QTimer::timeout, this, &Scheduler::processNext);
52
53 KConfigGroup cfg = m_config->group("General");
54 const auto dirtyCollectionsResult2 = cfg.readEntry("dirtyCollections", QList<Akonadi::Collection::Id>());
55 m_dirtyCollections = QSet<Akonadi::Collection::Id>(dirtyCollectionsResult2.begin(), dirtyCollectionsResult2.end());
56 if (m_dirtyCollections.isEmpty()) {
57 KConfig baloorc(Akonadi::ServerManager::addNamespace(QStringLiteral("baloorc")));
58 KConfigGroup baloorcGroup = baloorc.group("Akonadi");
59
60 // Schedule collections we know have missing items from last time
61 const auto dirtyCollectionsResult = baloorcGroup.readEntry("dirtyCollections", QList<Akonadi::Collection::Id>());
62 m_dirtyCollections = QSet<Akonadi::Collection::Id>(dirtyCollectionsResult.begin(), dirtyCollectionsResult.end());
63 }
64
65 qCDebug(AKONADI_INDEXER_AGENT_LOG) << "Dirty collections " << m_dirtyCollections;
66 for (Akonadi::Collection::Id col : std::as_const(m_dirtyCollections)) {
67 scheduleCollection(Akonadi::Collection(col), true);
68 }
69
70 bool initialIndexingDone = cfg.readEntry("initialIndexingDone", false);
71 if (!initialIndexingDone) {
72 KConfig baloorc(Akonadi::ServerManager::addNamespace(QStringLiteral("baloorc")));
73 KConfigGroup baloorcGroup = baloorc.group("Akonadi");
74 initialIndexingDone = baloorcGroup.readEntry("initialIndexingDone", false);
75 cfg.writeEntry("initialIndexingDone", initialIndexingDone);
76 baloorcGroup.deleteEntry("initialIndexingDone"); // make sure that editing akonadi_indexing_agentrc by hand works in the future
77 }
78 // Trigger a full sync initially
79 if (!initialIndexingDone) {
80 qCDebug(AKONADI_INDEXER_AGENT_LOG) << "initial indexing";
81 QMetaObject::invokeMethod(this, &Scheduler::scheduleCompleteSync, Qt::QueuedConnection);
82 }
83 cfg.writeEntry("initialIndexingDone", true);
84 cfg.sync();
85 }
86
~Scheduler()87 Scheduler::~Scheduler()
88 {
89 collectDirtyCollections();
90 }
91
setBusyTimeout(int timeout)92 void Scheduler::setBusyTimeout(int timeout)
93 {
94 m_busyTimeout = timeout;
95 }
96
numberOfCollectionQueued() const97 int Scheduler::numberOfCollectionQueued() const
98 {
99 return m_collectionQueue.count();
100 }
101
collectDirtyCollections()102 void Scheduler::collectDirtyCollections()
103 {
104 KConfigGroup cfg = m_config->group("General");
105 // Store collections where we did not manage to index all, we'll need to do a full sync for them the next time
106 QHash<Akonadi::Collection::Id, QQueue<Akonadi::Item::Id>>::ConstIterator it = m_queues.constBegin();
107 QHash<Akonadi::Collection::Id, QQueue<Akonadi::Item::Id>>::ConstIterator end = m_queues.constEnd();
108 for (; it != end; ++it) {
109 if (!it.value().isEmpty()) {
110 m_dirtyCollections.insert(it.key());
111 }
112 }
113 qCDebug(AKONADI_INDEXER_AGENT_LOG) << m_dirtyCollections;
114 cfg.writeEntry("dirtyCollections", m_dirtyCollections.values());
115 cfg.sync();
116 }
117
scheduleCollection(const Akonadi::Collection & col,bool fullSync)118 void Scheduler::scheduleCollection(const Akonadi::Collection &col, bool fullSync)
119 {
120 if (!m_collectionQueue.contains(col.id())) {
121 m_collectionQueue.enqueue(col.id());
122 }
123 if (fullSync) {
124 m_dirtyCollections.insert(col.id());
125 }
126 processNext();
127 }
128
addItem(const Akonadi::Item & item)129 void Scheduler::addItem(const Akonadi::Item &item)
130 {
131 Q_ASSERT(item.parentCollection().isValid());
132 m_lastModifiedTimestamps.insert(item.parentCollection().id(), QDateTime::currentMSecsSinceEpoch());
133 m_queues[item.parentCollection().id()].append(item.id());
134 // Move to the back
135 m_collectionQueue.removeOne(item.parentCollection().id());
136 m_collectionQueue.enqueue(item.parentCollection().id());
137 if (!m_processTimer.isActive()) {
138 m_processTimer.start();
139 }
140 }
141
scheduleCompleteSync()142 void Scheduler::scheduleCompleteSync()
143 {
144 qCDebug(AKONADI_INDEXER_AGENT_LOG);
145 {
146 auto job = new Akonadi::CollectionFetchJob(Akonadi::Collection::root(), Akonadi::CollectionFetchJob::Recursive);
147 job->fetchScope().setAncestorRetrieval(Akonadi::CollectionFetchScope::All);
148 job->fetchScope().setListFilter(Akonadi::CollectionFetchScope::Index);
149 job->fetchScope().fetchAttribute<Akonadi::IndexPolicyAttribute>();
150 connect(job, &KJob::finished, this, &Scheduler::slotRootCollectionsFetched);
151 job->start();
152 }
153
154 // We want to index all collections, even if we don't index their content
155 {
156 auto job = new Akonadi::CollectionFetchJob(Akonadi::Collection::root(), Akonadi::CollectionFetchJob::Recursive);
157 job->fetchScope().setAncestorRetrieval(Akonadi::CollectionFetchScope::All);
158 job->fetchScope().setListFilter(Akonadi::CollectionFetchScope::NoFilter);
159 job->fetchScope().setListFilter(Akonadi::CollectionFetchScope::Index);
160 connect(job, &KJob::finished, this, &Scheduler::slotCollectionsToIndexFetched);
161 job->start();
162 }
163 }
164
slotRootCollectionsFetched(KJob * kjob)165 void Scheduler::slotRootCollectionsFetched(KJob *kjob)
166 {
167 auto cjob = static_cast<Akonadi::CollectionFetchJob *>(kjob);
168 const Akonadi::Collection::List lstCols = cjob->collections();
169 for (const Akonadi::Collection &c : lstCols) {
170 // For skipping search collections
171 if (c.isVirtual()) {
172 continue;
173 }
174 if (c == Akonadi::Collection::root()) {
175 continue;
176 }
177 if (c.hasAttribute<Akonadi::IndexPolicyAttribute>() && !c.attribute<Akonadi::IndexPolicyAttribute>()->indexingEnabled()) {
178 continue;
179 }
180 scheduleCollection(c, true);
181 }
182
183 // If we did not schedule any collection
184 if (m_collectionQueue.isEmpty()) {
185 qCDebug(AKONADI_INDEXER_AGENT_LOG) << "No collections scheduled";
186 Q_EMIT status(Akonadi::AgentBase::Idle, i18n("Ready"));
187 }
188 }
189
slotCollectionsToIndexFetched(KJob * kjob)190 void Scheduler::slotCollectionsToIndexFetched(KJob *kjob)
191 {
192 auto cjob = static_cast<Akonadi::CollectionFetchJob *>(kjob);
193 const Akonadi::Collection::List lstCols = cjob->collections();
194 for (const Akonadi::Collection &c : lstCols) {
195 // For skipping search collections
196 if (c.isVirtual()) {
197 continue;
198 }
199 if (c == Akonadi::Collection::root()) {
200 continue;
201 }
202 if (c.hasAttribute<Akonadi::IndexPolicyAttribute>() && !c.attribute<Akonadi::IndexPolicyAttribute>()->indexingEnabled()) {
203 continue;
204 }
205 m_index.index(c);
206 }
207 }
208
abort()209 void Scheduler::abort()
210 {
211 if (m_currentJob) {
212 m_currentJob->kill(KJob::Quietly);
213 }
214 m_currentJob = nullptr;
215 collectDirtyCollections();
216 m_collectionQueue.clear();
217 Q_EMIT status(Akonadi::AgentBase::Idle, i18n("Ready"));
218 }
219
processNext()220 void Scheduler::processNext()
221 {
222 m_processTimer.stop();
223 if (m_currentJob) {
224 return;
225 }
226 if (m_collectionQueue.isEmpty()) {
227 qCDebug(AKONADI_INDEXER_AGENT_LOG) << "Processing done";
228 Q_EMIT status(Akonadi::AgentBase::Idle, i18n("Ready"));
229 return;
230 }
231
232 // An item was queued within the last 5 seconds, we're probably in the middle of a sync
233 const bool collectionIsChanging = (QDateTime::currentMSecsSinceEpoch() - m_lastModifiedTimestamps.value(m_collectionQueue.head())) < m_busyTimeout;
234 if (collectionIsChanging) {
235 // We're in the middle of something, wait with indexing
236 m_processTimer.start();
237 return;
238 }
239
240 const Akonadi::Collection col(m_collectionQueue.takeFirst());
241 qCDebug(AKONADI_INDEXER_AGENT_LOG) << "Processing collection: " << col.id();
242 QQueue<Akonadi::Item::Id> &itemQueue = m_queues[col.id()];
243 const bool fullSync = m_dirtyCollections.contains(col.id());
244 CollectionIndexingJob *job = m_jobFactory->createCollectionIndexingJob(m_index, col, itemQueue, fullSync, this);
245 itemQueue.clear();
246 job->setProperty("collection", col.id());
247 connect(job, &KJob::result, this, &Scheduler::slotIndexingFinished);
248 connect(job, &CollectionIndexingJob::status, this, &Scheduler::status);
249 connect(job, SIGNAL(percent(int)), this, SIGNAL(percent(int)));
250 m_currentJob = job;
251 job->start();
252 }
253
slotIndexingFinished(KJob * job)254 void Scheduler::slotIndexingFinished(KJob *job)
255 {
256 if (job->error()) {
257 qCWarning(AKONADI_INDEXER_AGENT_LOG) << "Indexing failed: " << job->errorString();
258 } else {
259 const auto collectionId = job->property("collection").value<Akonadi::Collection::Id>();
260 m_dirtyCollections.remove(collectionId);
261 Q_EMIT status(Akonadi::AgentBase::Idle, i18n("Collection \"%1\" indexed", collectionId));
262 Q_EMIT collectionIndexingFinished(collectionId);
263 }
264 m_currentJob = nullptr;
265 m_processTimer.start();
266 }
267