1 /*
2  * SPDX-FileCopyrightText: 2014 Christian Mollekopf <mollekopf@kolabsys.com>
3  *
4  * SPDX-License-Identifier: LGPL-2.1-or-later
5  *
6  */
7 
8 #include "batchfetcher.h"
9 
10 #include "imapresource_debug.h"
11 #include <KIMAP/Session>
BatchFetcher(MessageHelper::Ptr messageHelper,const KIMAP::ImapSet & set,const KIMAP::FetchJob::FetchScope & scope,int batchSize,KIMAP::Session * session)12 BatchFetcher::BatchFetcher(MessageHelper::Ptr messageHelper,
13                            const KIMAP::ImapSet &set,
14                            const KIMAP::FetchJob::FetchScope &scope,
15                            int batchSize,
16                            KIMAP::Session *session)
17     : KJob(session)
18     , m_currentSet(set)
19     , m_scope(scope)
20     , m_session(session)
21     , m_batchSize(batchSize)
22     , m_messageHelper(messageHelper)
23 {
24 }
25 
~BatchFetcher()26 BatchFetcher::~BatchFetcher()
27 {
28 }
29 
setUidBased(bool uidBased)30 void BatchFetcher::setUidBased(bool uidBased)
31 {
32     m_uidBased = uidBased;
33 }
34 
setSearchUids(const KIMAP::ImapInterval & interval)35 void BatchFetcher::setSearchUids(const KIMAP::ImapInterval &interval)
36 {
37     m_searchUidInterval = interval;
38 
39     // We look up the UIDs ourselves
40     m_currentSet = KIMAP::ImapSet();
41 
42     // MS Exchange can't handle big results so we have to split the search into small chunks
43     m_searchInChunks = m_session->serverGreeting().contains("Microsoft Exchange");
44 }
45 
setGmailExtensionsEnabled(bool enable)46 void BatchFetcher::setGmailExtensionsEnabled(bool enable)
47 {
48     m_gmailEnabled = enable;
49 }
50 
// Upper bound on the number of UIDs covered by a single search when the
// search has to be split into chunks (see BatchFetcher::start()).
static constexpr int maxAmountOfUidToSearchInOneTime = 2000;
52 
start()53 void BatchFetcher::start()
54 {
55     if (m_searchUidInterval.size()) {
56         // Search in chunks also Exchange can handle
57         const KIMAP::ImapInterval::Id firstUidToSearch = m_searchUidInterval.begin();
58         const KIMAP::ImapInterval::Id lastUidToSearch =
59             m_searchInChunks ? qMin(firstUidToSearch + maxAmountOfUidToSearchInOneTime - 1, m_searchUidInterval.end()) : m_searchUidInterval.end();
60 
61         // Prepare next chunk
62         const KIMAP::ImapInterval::Id intervalBegin = lastUidToSearch + 1;
63         // Or are we already done?
64         if (intervalBegin > m_searchUidInterval.end()) {
65             m_searchUidInterval = KIMAP::ImapInterval();
66         } else {
67             m_searchUidInterval.setBegin(intervalBegin);
68         }
69 
70         // Resolve the uid to sequence numbers
71         auto search = new KIMAP::SearchJob(m_session);
72         search->setUidBased(true);
73         search->setTerm(KIMAP::Term(KIMAP::Term::Uid, KIMAP::ImapSet(firstUidToSearch, lastUidToSearch)));
74         connect(search, &KIMAP::SearchJob::result, this, &BatchFetcher::onUidSearchDone);
75         search->start();
76     } else {
77         fetchNextBatch();
78     }
79 }
80 
onUidSearchDone(KJob * job)81 void BatchFetcher::onUidSearchDone(KJob *job)
82 {
83     if (job->error()) {
84         qCWarning(IMAPRESOURCE_LOG) << "Search job failed: " << job->errorString();
85         setError(KJob::UserDefinedError);
86         emitResult();
87         return;
88     }
89 
90     auto search = static_cast<KIMAP::SearchJob *>(job);
91     m_uidBased = search->isUidBased();
92     m_currentSet.add(search->results());
93 
94     // More to search?
95     start();
96 }
97 
fetchNextBatch()98 void BatchFetcher::fetchNextBatch()
99 {
100     if (m_fetchInProgress) {
101         m_continuationRequested = true;
102         return;
103     }
104     m_continuationRequested = false;
105     Q_ASSERT(m_batchSize > 0);
106     if (m_currentSet.isEmpty()) {
107         qCDebug(IMAPRESOURCE_LOG) << "fetch complete";
108         emitResult();
109         return;
110     }
111 
112     auto fetch = new KIMAP::FetchJob(m_session);
113     if (m_scope.changedSince != 0) {
114         qCDebug(IMAPRESOURCE_LOG) << "Fetching all messages in one batch.";
115         fetch->setSequenceSet(m_currentSet);
116         m_currentSet = KIMAP::ImapSet();
117     } else {
118         KIMAP::ImapSet toFetch;
119         qint64 counter = 0;
120         KIMAP::ImapSet newSet;
121 
122         // Take a chunk from the set
123         const auto intervals{m_currentSet.intervals()};
124         for (const KIMAP::ImapInterval &interval : intervals) {
125             if (!interval.hasDefinedEnd()) {
126                 // If we get an interval without a defined end we simply fetch everything
127                 qCDebug(IMAPRESOURCE_LOG) << "Received interval without defined end, fetching everything in one batch";
128                 toFetch.add(interval);
129                 newSet = KIMAP::ImapSet();
130                 break;
131             }
132             const qint64 wantedItems = m_batchSize - counter;
133             if (counter < m_batchSize) {
134                 if (interval.size() <= wantedItems) {
135                     counter += interval.size();
136                     toFetch.add(interval);
137                 } else {
138                     counter += wantedItems;
139                     toFetch.add(KIMAP::ImapInterval(interval.begin(), interval.begin() + wantedItems - 1));
140                     newSet.add(KIMAP::ImapInterval(interval.begin() + wantedItems, interval.end()));
141                 }
142             } else {
143                 newSet.add(interval);
144             }
145         }
146         qCDebug(IMAPRESOURCE_LOG) << "Fetching " << toFetch.intervals().size() << " intervals";
147         fetch->setSequenceSet(toFetch);
148         m_currentSet = newSet;
149     }
150 
151     fetch->setUidBased(m_uidBased);
152     fetch->setScope(m_scope);
153     fetch->setGmailExtensionsEnabled(m_gmailEnabled);
154     connect(fetch, &KIMAP::FetchJob::messagesAvailable, this, &BatchFetcher::onMessagesAvailable);
155     connect(fetch, &KJob::result, this, &BatchFetcher::onHeadersFetchDone);
156     m_fetchInProgress = true;
157     fetch->start();
158 }
159 
onMessagesAvailable(const QMap<qint64,KIMAP::Message> & messages)160 void BatchFetcher::onMessagesAvailable(const QMap<qint64, KIMAP::Message> &messages)
161 {
162     auto fetch = static_cast<KIMAP::FetchJob *>(sender());
163 
164     Akonadi::Item::List addedItems;
165     for (auto msg = messages.cbegin(), end = messages.cend(); msg != end; ++msg) {
166         // qDebug( 5327 ) << "Flags: " << i.flags();
167         bool ok;
168         const auto item = m_messageHelper->createItemFromMessage(msg->message, msg->uid, msg->size, msg->attributes, msg->flags, fetch->scope(), ok);
169         if (ok) {
170             m_fetchedItemsInCurrentBatch++;
171             addedItems << item;
172         }
173     }
174     //     qCDebug(IMAPRESOURCE_LOG) << addedItems.size();
175     if (!addedItems.isEmpty()) {
176         Q_EMIT itemsRetrieved(addedItems);
177     }
178 }
179 
onHeadersFetchDone(KJob * job)180 void BatchFetcher::onHeadersFetchDone(KJob *job)
181 {
182     m_fetchInProgress = false;
183     if (job->error()) {
184         qCWarning(IMAPRESOURCE_LOG) << "Fetch job failed " << job->errorString();
185         setError(KJob::UserDefinedError);
186         emitResult();
187         return;
188     }
189     if (m_currentSet.isEmpty()) {
190         emitResult();
191         return;
192     }
193     // Fetch more if we didn't deliver enough yet.
194     // This can happen because no message is in the fetched uid range, or if the translation failed
195     if (m_fetchedItemsInCurrentBatch < m_batchSize) {
196         fetchNextBatch();
197     } else {
198         m_fetchedItemsInCurrentBatch = 0;
199         // Also fetch more if we already got a continuation request during the fetch.
200         // This can happen if we deliver too many items during a previous batch (after using )
201         // Note that m_fetchedItemsInCurrentBatch will be off by the items that we delivered already.
202         if (m_continuationRequested) {
203             fetchNextBatch();
204         }
205     }
206 }
207