1 /****************************************************************************
2 **
3 ** Copyright (C) 2016 The Qt Company Ltd.
4 ** Contact: https://www.qt.io/licensing/
5 **
6 ** This file is part of the QtConcurrent module of the Qt Toolkit.
7 **
8 ** $QT_BEGIN_LICENSE:LGPL$
9 ** Commercial License Usage
10 ** Licensees holding valid commercial Qt licenses may use this file in
11 ** accordance with the commercial license agreement provided with the
12 ** Software or, alternatively, in accordance with the terms contained in
13 ** a written agreement between you and The Qt Company. For licensing terms
14 ** and conditions see https://www.qt.io/terms-conditions. For further
15 ** information use the contact form at https://www.qt.io/contact-us.
16 **
17 ** GNU Lesser General Public License Usage
18 ** Alternatively, this file may be used under the terms of the GNU Lesser
19 ** General Public License version 3 as published by the Free Software
20 ** Foundation and appearing in the file LICENSE.LGPL3 included in the
21 ** packaging of this file. Please review the following information to
22 ** ensure the GNU Lesser General Public License version 3 requirements
23 ** will be met: https://www.gnu.org/licenses/lgpl-3.0.html.
24 **
25 ** GNU General Public License Usage
26 ** Alternatively, this file may be used under the terms of the GNU
27 ** General Public License version 2.0 or (at your option) the GNU General
28 ** Public license version 3 or any later version approved by the KDE Free
29 ** Qt Foundation. The licenses are as published by the Free Software
30 ** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3
31 ** included in the packaging of this file. Please review the following
32 ** information to ensure the GNU General Public License requirements will
33 ** be met: https://www.gnu.org/licenses/gpl-2.0.html and
34 ** https://www.gnu.org/licenses/gpl-3.0.html.
35 **
36 ** $QT_END_LICENSE$
37 **
38 ****************************************************************************/
39 
40 #ifndef QTCONCURRENT_ITERATEKERNEL_H
41 #define QTCONCURRENT_ITERATEKERNEL_H
42 
43 #include <QtConcurrent/qtconcurrent_global.h>
44 
45 #if !defined(QT_NO_CONCURRENT) || defined(Q_CLANG_QDOC)
46 
47 #include <QtCore/qatomic.h>
48 #include <QtConcurrent/qtconcurrentmedian.h>
49 #include <QtConcurrent/qtconcurrentthreadengine.h>
50 
51 #include <iterator>
52 
53 QT_BEGIN_NAMESPACE
54 
55 
56 
57 namespace QtConcurrent {
58 
59 /*
60     The BlockSizeManager class manages how many iterations a thread should
61     reserve and process at a time. This is done by measuring the time spent
62     in the user code versus the control part code, and then increasing
63     the block size if the ratio between them is to small. The block size
64     management is done on the basis of the median of several timing measuremens,
65     and it is done induvidualy for each thread.
66 */
67 class Q_CONCURRENT_EXPORT BlockSizeManager
68 {
69 public:
70     BlockSizeManager(int iterationCount);
71     void timeBeforeUser();
72     void timeAfterUser();
73     int blockSize();
74 private:
blockSizeMaxed()75     inline bool blockSizeMaxed()
76     {
77         return (m_blockSize >= maxBlockSize);
78     }
79 
80     const int maxBlockSize;
81     qint64 beforeUser;
82     qint64 afterUser;
83     Median<double> controlPartElapsed;
84     Median<double> userPartElapsed;
85     int m_blockSize;
86 
87     Q_DISABLE_COPY(BlockSizeManager)
88 };
89 
90 // ### Qt6: Replace BlockSizeManager with V2 implementation
91 class Q_CONCURRENT_EXPORT BlockSizeManagerV2
92 {
93 public:
94     explicit BlockSizeManagerV2(int iterationCount);
95 
96     void timeBeforeUser();
97     void timeAfterUser();
98     int blockSize();
99 
100 private:
blockSizeMaxed()101     inline bool blockSizeMaxed()
102     {
103         return (m_blockSize >= maxBlockSize);
104     }
105 
106     const int maxBlockSize;
107     qint64 beforeUser;
108     qint64 afterUser;
109     MedianDouble controlPartElapsed;
110     MedianDouble userPartElapsed;
111     int m_blockSize;
112 
113     Q_DISABLE_COPY(BlockSizeManagerV2)
114 };
115 
116 template <typename T>
117 class ResultReporter
118 {
119 public:
ResultReporter(ThreadEngine<T> * _threadEngine)120     ResultReporter(ThreadEngine<T> *_threadEngine)
121     :threadEngine(_threadEngine)
122     {
123 
124     }
125 
reserveSpace(int resultCount)126     void reserveSpace(int resultCount)
127     {
128         currentResultCount = resultCount;
129         vector.resize(qMax(resultCount, vector.count()));
130     }
131 
reportResults(int begin)132     void reportResults(int begin)
133     {
134         const int useVectorThreshold = 4; // Tunable parameter.
135         if (currentResultCount > useVectorThreshold) {
136             vector.resize(currentResultCount);
137             threadEngine->reportResults(vector, begin);
138         } else {
139             for (int i = 0; i < currentResultCount; ++i)
140                 threadEngine->reportResult(&vector.at(i), begin + i);
141         }
142     }
143 
getPointer()144     inline T * getPointer()
145     {
146         return vector.data();
147     }
148 
149     int currentResultCount;
150     ThreadEngine<T> *threadEngine;
151     QVector<T> vector;
152 };
153 
154 template <>
155 class ResultReporter<void>
156 {
157 public:
ResultReporter(ThreadEngine<void> *)158     inline ResultReporter(ThreadEngine<void> *) { }
reserveSpace(int)159     inline void reserveSpace(int) { }
reportResults(int)160     inline void reportResults(int) { }
getPointer()161     inline void * getPointer() { return nullptr; }
162 };
163 
selectIteration(std::bidirectional_iterator_tag)164 inline bool selectIteration(std::bidirectional_iterator_tag)
165 {
166     return false; // while
167 }
168 
selectIteration(std::forward_iterator_tag)169 inline bool selectIteration(std::forward_iterator_tag)
170 {
171     return false; // while
172 }
173 
selectIteration(std::random_access_iterator_tag)174 inline bool selectIteration(std::random_access_iterator_tag)
175 {
176     return true; // for
177 }
178 
179 template <typename Iterator, typename T>
180 class IterateKernel : public ThreadEngine<T>
181 {
182 public:
183     typedef T ResultType;
184 
IterateKernel(Iterator _begin,Iterator _end)185     IterateKernel(Iterator _begin, Iterator _end)
186         : begin(_begin), end(_end), current(_begin), currentIndex(0),
187            forIteration(selectIteration(typename std::iterator_traits<Iterator>::iterator_category())), progressReportingEnabled(true)
188     {
189         iterationCount =  forIteration ? std::distance(_begin, _end) : 0;
190     }
191 
~IterateKernel()192     virtual ~IterateKernel() { }
193 
runIteration(Iterator it,int index,T * result)194     virtual bool runIteration(Iterator it, int index , T *result)
195         { Q_UNUSED(it); Q_UNUSED(index); Q_UNUSED(result); return false; }
runIterations(Iterator _begin,int beginIndex,int endIndex,T * results)196     virtual bool runIterations(Iterator _begin, int beginIndex, int endIndex, T *results)
197         { Q_UNUSED(_begin); Q_UNUSED(beginIndex); Q_UNUSED(endIndex); Q_UNUSED(results); return false; }
198 
start()199     void start() override
200     {
201         progressReportingEnabled = this->isProgressReportingEnabled();
202         if (progressReportingEnabled && iterationCount > 0)
203             this->setProgressRange(0, iterationCount);
204     }
205 
shouldStartThread()206     bool shouldStartThread() override
207     {
208         if (forIteration)
209             return (currentIndex.loadRelaxed() < iterationCount) && !this->shouldThrottleThread();
210         else // whileIteration
211             return (iteratorThreads.loadRelaxed() == 0);
212     }
213 
threadFunction()214     ThreadFunctionResult threadFunction() override
215     {
216         if (forIteration)
217             return this->forThreadFunction();
218         else // whileIteration
219             return this->whileThreadFunction();
220     }
221 
forThreadFunction()222     ThreadFunctionResult forThreadFunction()
223     {
224         BlockSizeManagerV2 blockSizeManager(iterationCount);
225         ResultReporter<T> resultReporter(this);
226 
227         for(;;) {
228             if (this->isCanceled())
229                 break;
230 
231             const int currentBlockSize = blockSizeManager.blockSize();
232 
233             if (currentIndex.loadRelaxed() >= iterationCount)
234                 break;
235 
236             // Atomically reserve a block of iterationCount for this thread.
237             const int beginIndex = currentIndex.fetchAndAddRelease(currentBlockSize);
238             const int endIndex = qMin(beginIndex + currentBlockSize, iterationCount);
239 
240             if (beginIndex >= endIndex) {
241                 // No more work
242                 break;
243             }
244 
245             this->waitForResume(); // (only waits if the qfuture is paused.)
246 
247             if (shouldStartThread())
248                 this->startThread();
249 
250             const int finalBlockSize = endIndex - beginIndex; // block size adjusted for possible end-of-range
251             resultReporter.reserveSpace(finalBlockSize);
252 
253             // Call user code with the current iteration range.
254             blockSizeManager.timeBeforeUser();
255             const bool resultsAvailable = this->runIterations(begin, beginIndex, endIndex, resultReporter.getPointer());
256             blockSizeManager.timeAfterUser();
257 
258             if (resultsAvailable)
259                 resultReporter.reportResults(beginIndex);
260 
261             // Report progress if progress reporting enabled.
262             if (progressReportingEnabled) {
263                 completed.fetchAndAddAcquire(finalBlockSize);
264                 this->setProgressValue(this->completed.loadRelaxed());
265             }
266 
267             if (this->shouldThrottleThread())
268                 return ThrottleThread;
269         }
270         return ThreadFinished;
271     }
272 
whileThreadFunction()273     ThreadFunctionResult whileThreadFunction()
274     {
275         if (iteratorThreads.testAndSetAcquire(0, 1) == false)
276             return ThreadFinished;
277 
278         ResultReporter<T> resultReporter(this);
279         resultReporter.reserveSpace(1);
280 
281         while (current != end) {
282             // The following two lines breaks support for input iterators according to
283             // the sgi docs: dereferencing prev after calling ++current is not allowed
284             // on input iterators. (prev is dereferenced inside user.runIteration())
285             Iterator prev = current;
286             ++current;
287             int index = currentIndex.fetchAndAddRelaxed(1);
288             iteratorThreads.testAndSetRelease(1, 0);
289 
290             this->waitForResume(); // (only waits if the qfuture is paused.)
291 
292             if (shouldStartThread())
293                 this->startThread();
294 
295             const bool resultAavailable = this->runIteration(prev, index, resultReporter.getPointer());
296             if (resultAavailable)
297                 resultReporter.reportResults(index);
298 
299             if (this->shouldThrottleThread())
300                 return ThrottleThread;
301 
302             if (iteratorThreads.testAndSetAcquire(0, 1) == false)
303                 return ThreadFinished;
304         }
305 
306         return ThreadFinished;
307     }
308 
309 
310 public:
311     const Iterator begin;
312     const Iterator end;
313     Iterator current;
314     QAtomicInt currentIndex;
315     bool forIteration;
316     QAtomicInt iteratorThreads;
317     int iterationCount;
318 
319     bool progressReportingEnabled;
320     QAtomicInt completed;
321 };
322 
323 } // namespace QtConcurrent
324 
325 
326 QT_END_NAMESPACE
327 
328 #endif // QT_NO_CONCURRENT
329 
330 #endif
331