1 /*
2 * SPDX-FileCopyrightText: 2014 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * SPDX-License-Identifier: LGPL-2.1-or-later
5 *
6 */
7
8 #include "batchfetcher.h"
9
10 #include "imapresource_debug.h"
11 #include <KIMAP/Session>
BatchFetcher(MessageHelper::Ptr messageHelper,const KIMAP::ImapSet & set,const KIMAP::FetchJob::FetchScope & scope,int batchSize,KIMAP::Session * session)12 BatchFetcher::BatchFetcher(MessageHelper::Ptr messageHelper,
13 const KIMAP::ImapSet &set,
14 const KIMAP::FetchJob::FetchScope &scope,
15 int batchSize,
16 KIMAP::Session *session)
17 : KJob(session)
18 , m_currentSet(set)
19 , m_scope(scope)
20 , m_session(session)
21 , m_batchSize(batchSize)
22 , m_messageHelper(messageHelper)
23 {
24 }
25
~BatchFetcher()26 BatchFetcher::~BatchFetcher()
27 {
28 }
29
setUidBased(bool uidBased)30 void BatchFetcher::setUidBased(bool uidBased)
31 {
32 m_uidBased = uidBased;
33 }
34
setSearchUids(const KIMAP::ImapInterval & interval)35 void BatchFetcher::setSearchUids(const KIMAP::ImapInterval &interval)
36 {
37 m_searchUidInterval = interval;
38
39 // We look up the UIDs ourselves
40 m_currentSet = KIMAP::ImapSet();
41
42 // MS Exchange can't handle big results so we have to split the search into small chunks
43 m_searchInChunks = m_session->serverGreeting().contains("Microsoft Exchange");
44 }
45
setGmailExtensionsEnabled(bool enable)46 void BatchFetcher::setGmailExtensionsEnabled(bool enable)
47 {
48 m_gmailEnabled = enable;
49 }
50
// Maximum number of UIDs covered by a single search when searching in chunks.
static constexpr int maxAmountOfUidToSearchInOneTime = 2000;
52
start()53 void BatchFetcher::start()
54 {
55 if (m_searchUidInterval.size()) {
56 // Search in chunks also Exchange can handle
57 const KIMAP::ImapInterval::Id firstUidToSearch = m_searchUidInterval.begin();
58 const KIMAP::ImapInterval::Id lastUidToSearch =
59 m_searchInChunks ? qMin(firstUidToSearch + maxAmountOfUidToSearchInOneTime - 1, m_searchUidInterval.end()) : m_searchUidInterval.end();
60
61 // Prepare next chunk
62 const KIMAP::ImapInterval::Id intervalBegin = lastUidToSearch + 1;
63 // Or are we already done?
64 if (intervalBegin > m_searchUidInterval.end()) {
65 m_searchUidInterval = KIMAP::ImapInterval();
66 } else {
67 m_searchUidInterval.setBegin(intervalBegin);
68 }
69
70 // Resolve the uid to sequence numbers
71 auto search = new KIMAP::SearchJob(m_session);
72 search->setUidBased(true);
73 search->setTerm(KIMAP::Term(KIMAP::Term::Uid, KIMAP::ImapSet(firstUidToSearch, lastUidToSearch)));
74 connect(search, &KIMAP::SearchJob::result, this, &BatchFetcher::onUidSearchDone);
75 search->start();
76 } else {
77 fetchNextBatch();
78 }
79 }
80
onUidSearchDone(KJob * job)81 void BatchFetcher::onUidSearchDone(KJob *job)
82 {
83 if (job->error()) {
84 qCWarning(IMAPRESOURCE_LOG) << "Search job failed: " << job->errorString();
85 setError(KJob::UserDefinedError);
86 emitResult();
87 return;
88 }
89
90 auto search = static_cast<KIMAP::SearchJob *>(job);
91 m_uidBased = search->isUidBased();
92 m_currentSet.add(search->results());
93
94 // More to search?
95 start();
96 }
97
fetchNextBatch()98 void BatchFetcher::fetchNextBatch()
99 {
100 if (m_fetchInProgress) {
101 m_continuationRequested = true;
102 return;
103 }
104 m_continuationRequested = false;
105 Q_ASSERT(m_batchSize > 0);
106 if (m_currentSet.isEmpty()) {
107 qCDebug(IMAPRESOURCE_LOG) << "fetch complete";
108 emitResult();
109 return;
110 }
111
112 auto fetch = new KIMAP::FetchJob(m_session);
113 if (m_scope.changedSince != 0) {
114 qCDebug(IMAPRESOURCE_LOG) << "Fetching all messages in one batch.";
115 fetch->setSequenceSet(m_currentSet);
116 m_currentSet = KIMAP::ImapSet();
117 } else {
118 KIMAP::ImapSet toFetch;
119 qint64 counter = 0;
120 KIMAP::ImapSet newSet;
121
122 // Take a chunk from the set
123 const auto intervals{m_currentSet.intervals()};
124 for (const KIMAP::ImapInterval &interval : intervals) {
125 if (!interval.hasDefinedEnd()) {
126 // If we get an interval without a defined end we simply fetch everything
127 qCDebug(IMAPRESOURCE_LOG) << "Received interval without defined end, fetching everything in one batch";
128 toFetch.add(interval);
129 newSet = KIMAP::ImapSet();
130 break;
131 }
132 const qint64 wantedItems = m_batchSize - counter;
133 if (counter < m_batchSize) {
134 if (interval.size() <= wantedItems) {
135 counter += interval.size();
136 toFetch.add(interval);
137 } else {
138 counter += wantedItems;
139 toFetch.add(KIMAP::ImapInterval(interval.begin(), interval.begin() + wantedItems - 1));
140 newSet.add(KIMAP::ImapInterval(interval.begin() + wantedItems, interval.end()));
141 }
142 } else {
143 newSet.add(interval);
144 }
145 }
146 qCDebug(IMAPRESOURCE_LOG) << "Fetching " << toFetch.intervals().size() << " intervals";
147 fetch->setSequenceSet(toFetch);
148 m_currentSet = newSet;
149 }
150
151 fetch->setUidBased(m_uidBased);
152 fetch->setScope(m_scope);
153 fetch->setGmailExtensionsEnabled(m_gmailEnabled);
154 connect(fetch, &KIMAP::FetchJob::messagesAvailable, this, &BatchFetcher::onMessagesAvailable);
155 connect(fetch, &KJob::result, this, &BatchFetcher::onHeadersFetchDone);
156 m_fetchInProgress = true;
157 fetch->start();
158 }
159
onMessagesAvailable(const QMap<qint64,KIMAP::Message> & messages)160 void BatchFetcher::onMessagesAvailable(const QMap<qint64, KIMAP::Message> &messages)
161 {
162 auto fetch = static_cast<KIMAP::FetchJob *>(sender());
163
164 Akonadi::Item::List addedItems;
165 for (auto msg = messages.cbegin(), end = messages.cend(); msg != end; ++msg) {
166 // qDebug( 5327 ) << "Flags: " << i.flags();
167 bool ok;
168 const auto item = m_messageHelper->createItemFromMessage(msg->message, msg->uid, msg->size, msg->attributes, msg->flags, fetch->scope(), ok);
169 if (ok) {
170 m_fetchedItemsInCurrentBatch++;
171 addedItems << item;
172 }
173 }
174 // qCDebug(IMAPRESOURCE_LOG) << addedItems.size();
175 if (!addedItems.isEmpty()) {
176 Q_EMIT itemsRetrieved(addedItems);
177 }
178 }
179
onHeadersFetchDone(KJob * job)180 void BatchFetcher::onHeadersFetchDone(KJob *job)
181 {
182 m_fetchInProgress = false;
183 if (job->error()) {
184 qCWarning(IMAPRESOURCE_LOG) << "Fetch job failed " << job->errorString();
185 setError(KJob::UserDefinedError);
186 emitResult();
187 return;
188 }
189 if (m_currentSet.isEmpty()) {
190 emitResult();
191 return;
192 }
193 // Fetch more if we didn't deliver enough yet.
194 // This can happen because no message is in the fetched uid range, or if the translation failed
195 if (m_fetchedItemsInCurrentBatch < m_batchSize) {
196 fetchNextBatch();
197 } else {
198 m_fetchedItemsInCurrentBatch = 0;
199 // Also fetch more if we already got a continuation request during the fetch.
200 // This can happen if we deliver too many items during a previous batch (after using )
201 // Note that m_fetchedItemsInCurrentBatch will be off by the items that we delivered already.
202 if (m_continuationRequested) {
203 fetchNextBatch();
204 }
205 }
206 }
207