1 /****************************************************************************
2 **
3 ** Copyright (C) 2015 The Qt Company Ltd.
4 ** Contact: http://www.qt.io/licensing/
5 **
6 ** This file is part of the QtCore module of the Qt Toolkit.
7 **
8 ** $QT_BEGIN_LICENSE:LGPL$
9 ** Commercial License Usage
10 ** Licensees holding valid commercial Qt licenses may use this file in
11 ** accordance with the commercial license agreement provided with the
12 ** Software or, alternatively, in accordance with the terms contained in
13 ** a written agreement between you and The Qt Company. For licensing terms
14 ** and conditions see http://www.qt.io/terms-conditions. For further
15 ** information use the contact form at http://www.qt.io/contact-us.
16 **
17 ** GNU Lesser General Public License Usage
18 ** Alternatively, this file may be used under the terms of the GNU Lesser
19 ** General Public License version 2.1 or version 3 as published by the Free
20 ** Software Foundation and appearing in the file LICENSE.LGPLv21 and
21 ** LICENSE.LGPLv3 included in the packaging of this file. Please review the
22 ** following information to ensure the GNU Lesser General Public License
23 ** requirements will be met: https://www.gnu.org/licenses/lgpl.html and
24 ** http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
25 **
26 ** As a special exception, The Qt Company gives you certain additional
27 ** rights. These rights are described in The Qt Company LGPL Exception
28 ** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
29 **
30 ** GNU General Public License Usage
31 ** Alternatively, this file may be used under the terms of the GNU
32 ** General Public License version 3.0 as published by the Free Software
33 ** Foundation and appearing in the file LICENSE.GPL included in the
34 ** packaging of this file.  Please review the following information to
35 ** ensure the GNU General Public License version 3.0 requirements will be
36 ** met: http://www.gnu.org/copyleft/gpl.html.
37 **
38 ** $QT_END_LICENSE$
39 **
40 ****************************************************************************/
41 
42 // qfutureinterface.h included from qfuture.h
43 #include "qfuture.h"
44 
45 #ifndef QT_NO_QFUTURE
46 
47 #include <QtCore/qatomic.h>
48 #include <QtCore/qthread.h>
49 #include <QtCore/qthreadpool.h>
50 #include <private/qthreadpool_p.h>
51 
52 #include "qfutureinterface_p.h"
53 
54 QT_BEGIN_NAMESPACE
55 
// Rate limit for progress notifications. The progress code in this file
// compares progressTime.elapsed() against 1000 / MaxProgressEmitsPerSecond.
enum { MaxProgressEmitsPerSecond = 25 };
59 
QFutureInterfaceBase(State initialState)60 QFutureInterfaceBase::QFutureInterfaceBase(State initialState)
61     : d(new QFutureInterfaceBasePrivate(initialState))
62 { }
63 
// Copy constructor: shares the private object of other and adds one
// reference to its reference count.
QFutureInterfaceBase::QFutureInterfaceBase(const QFutureInterfaceBase &other)
    : d(other.d)
{
    // d and other.d point at the same private object at this point.
    other.d->refCount.ref();
}
69 
~QFutureInterfaceBase()70 QFutureInterfaceBase::~QFutureInterfaceBase()
71 {
72     if (!d->refCount.deref())
73         delete d;
74 }
75 
cancel()76 void QFutureInterfaceBase::cancel()
77 {
78     QMutexLocker locker(&d->m_mutex);
79     if (d->state & Canceled)
80         return;
81 
82     d->state = State((d->state & ~Paused) | Canceled);
83     d->waitCondition.wakeAll();
84     d->pausedWaitCondition.wakeAll();
85     d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
86 }
87 
setPaused(bool paused)88 void QFutureInterfaceBase::setPaused(bool paused)
89 {
90     QMutexLocker locker(&d->m_mutex);
91     if (paused) {
92         d->state = State(d->state | Paused);
93         d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Paused));
94     } else {
95         d->state = State(d->state & ~Paused);
96         d->pausedWaitCondition.wakeAll();
97         d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
98     }
99 }
100 
togglePaused()101 void QFutureInterfaceBase::togglePaused()
102 {
103     QMutexLocker locker(&d->m_mutex);
104     if (d->state & Paused) {
105         d->state = State(d->state & ~Paused);
106         d->pausedWaitCondition.wakeAll();
107         d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
108     } else {
109         d->state = State(d->state | Paused);
110         d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Paused));
111     }
112 }
113 
setThrottled(bool enable)114 void QFutureInterfaceBase::setThrottled(bool enable)
115 {
116     // bail out if we are not changing the state
117     if ((enable && (d->state & Throttled)) || (!enable && !(d->state & Throttled)))
118         return;
119 
120     // lock and change the state
121     QMutexLocker lock(&d->m_mutex);
122     if (enable) {
123         d->state  = State(d->state | Throttled);
124     } else {
125         d->state  = State(d->state & ~Throttled);
126         if (!(d->state & Paused))
127             d->pausedWaitCondition.wakeAll();
128     }
129 }
130 
131 
isRunning() const132 bool QFutureInterfaceBase::isRunning() const
133 {
134     return queryState(Running);
135 }
136 
isStarted() const137 bool QFutureInterfaceBase::isStarted() const
138 {
139     return queryState(Started);
140 }
141 
isCanceled() const142 bool QFutureInterfaceBase::isCanceled() const
143 {
144     return queryState(Canceled);
145 }
146 
isFinished() const147 bool QFutureInterfaceBase::isFinished() const
148 {
149     return queryState(Finished);
150 }
151 
isPaused() const152 bool QFutureInterfaceBase::isPaused() const
153 {
154     return queryState(Paused);
155 }
156 
isThrottled() const157 bool QFutureInterfaceBase::isThrottled() const
158 {
159     return queryState(Throttled);
160 }
161 
isResultReadyAt(int index) const162 bool QFutureInterfaceBase::isResultReadyAt(int index) const
163 {
164     QMutexLocker lock(&d->m_mutex);
165     return d->internal_isResultReadyAt(index);
166 }
167 
waitForNextResult()168 bool QFutureInterfaceBase::waitForNextResult()
169 {
170     QMutexLocker lock(&d->m_mutex);
171     return d->internal_waitForNextResult();
172 }
173 
waitForResume()174 void QFutureInterfaceBase::waitForResume()
175 {
176     // return early if possible to avoid taking the mutex lock.
177     if ((d->state & Paused) == false || (d->state & Canceled))
178         return;
179 
180     QMutexLocker lock(&d->m_mutex);
181     if ((d->state & Paused) == false || (d->state & Canceled))
182         return;
183 
184     // decrease active thread count since this thread will wait.
185     QThreadPool::globalInstance()->releaseThread();
186 
187     d->pausedWaitCondition.wait(&d->m_mutex);
188 
189     QThreadPool::globalInstance()->reserveThread();
190 }
191 
progressValue() const192 int QFutureInterfaceBase::progressValue() const
193 {
194     return d->m_progressValue;
195 }
196 
progressMinimum() const197 int QFutureInterfaceBase::progressMinimum() const
198 {
199     return d->m_progressMinimum;
200 }
201 
progressMaximum() const202 int QFutureInterfaceBase::progressMaximum() const
203 {
204     return d->m_progressMaximum;
205 }
206 
resultCount() const207 int QFutureInterfaceBase::resultCount() const
208 {
209     QMutexLocker lock(&d->m_mutex);
210     return d->internal_resultCount();
211 }
212 
progressText() const213 QString QFutureInterfaceBase::progressText() const
214 {
215     QMutexLocker locker(&d->m_mutex);
216     return d->m_progressText;
217 }
218 
isProgressUpdateNeeded() const219 bool QFutureInterfaceBase::isProgressUpdateNeeded() const
220 {
221     QMutexLocker locker(&d->m_mutex);
222     return !d->progressTime.isValid() || (d->progressTime.elapsed() > (1000 / MaxProgressEmitsPerSecond));
223 }
224 
reportStarted()225 void QFutureInterfaceBase::reportStarted()
226 {
227     QMutexLocker locker(&d->m_mutex);
228     if ((d->state & Started) || (d->state & Canceled) || (d->state & Finished))
229         return;
230 
231     d->setState(State(Started | Running));
232     d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Started));
233 }
234 
reportCanceled()235 void QFutureInterfaceBase::reportCanceled()
236 {
237     cancel();
238 }
239 
240 #ifndef QT_NO_EXCEPTIONS
reportException(const QtConcurrent::Exception & exception)241 void QFutureInterfaceBase::reportException(const QtConcurrent::Exception &exception)
242 {
243     QMutexLocker locker(&d->m_mutex);
244     if ((d->state & Canceled) || (d->state & Finished))
245         return;
246 
247     d->m_exceptionStore.setException(exception);
248     d->state = State(d->state | Canceled);
249     d->waitCondition.wakeAll();
250     d->pausedWaitCondition.wakeAll();
251     d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
252 }
253 #endif
254 
reportFinished()255 void QFutureInterfaceBase::reportFinished()
256 {
257     QMutexLocker locker(&d->m_mutex);
258     if (!(d->state & Finished)) {
259         d->state = State((d->state & ~Running) | Finished);
260         d->waitCondition.wakeAll();
261         d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
262     }
263 }
264 
setExpectedResultCount(int resultCount)265 void QFutureInterfaceBase::setExpectedResultCount(int resultCount)
266 {
267     if (d->manualProgress == false)
268         setProgressRange(0, resultCount);
269     d->m_expectedResultCount = resultCount;
270 }
271 
expectedResultCount()272 int QFutureInterfaceBase::expectedResultCount()
273 {
274     return d->m_expectedResultCount;
275 }
276 
queryState(State state) const277 bool QFutureInterfaceBase::queryState(State state) const
278 {
279     return (d->state & state);
280 }
281 
waitForResult(int resultIndex)282 void QFutureInterfaceBase::waitForResult(int resultIndex)
283 {
284     d->m_exceptionStore.throwPossibleException();
285 
286     if (!(d->state & Running))
287         return;
288 
289     // To avoid deadlocks and reduce the number of threads used, try to
290     // run the runnable in the current thread.
291     QThreadPool::globalInstance()->d_func()->stealRunnable(d->runnable);
292 
293     QMutexLocker lock(&d->m_mutex);
294 
295     if (!(d->state & Running))
296         return;
297 
298     const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex;
299     while ((d->state & Running) && d->internal_isResultReadyAt(waitIndex) == false)
300         d->waitCondition.wait(&d->m_mutex);
301 
302     d->m_exceptionStore.throwPossibleException();
303 }
304 
waitForFinished()305 void QFutureInterfaceBase::waitForFinished()
306 {
307     if (d->state & Running) {
308         QThreadPool::globalInstance()->d_func()->stealRunnable(d->runnable);
309 
310         QMutexLocker lock(&d->m_mutex);
311 
312         while (d->state & Running)
313             d->waitCondition.wait(&d->m_mutex);
314     }
315 
316     d->m_exceptionStore.throwPossibleException();
317 }
318 
reportResultsReady(int beginIndex,int endIndex)319 void QFutureInterfaceBase::reportResultsReady(int beginIndex, int endIndex)
320 {
321     if ((d->state & Canceled) || (d->state & Finished) || beginIndex == endIndex)
322         return;
323 
324     d->waitCondition.wakeAll();
325 
326     if (d->manualProgress == false) {
327         if (d->internal_updateProgress(d->m_progressValue + endIndex - beginIndex) == false) {
328             d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
329                                                beginIndex,
330                                                endIndex));
331             return;
332         }
333 
334         d->sendCallOuts(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
335                                             d->m_progressValue,
336                                             d->m_progressText),
337                         QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
338                                             beginIndex,
339                                             endIndex));
340         return;
341     }
342     d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex));
343 }
344 
setRunnable(QRunnable * runnable)345 void QFutureInterfaceBase::setRunnable(QRunnable *runnable)
346 {
347     d->runnable = runnable;
348 }
349 
setFilterMode(bool enable)350 void QFutureInterfaceBase::setFilterMode(bool enable)
351 {
352     QMutexLocker locker(&d->m_mutex);
353     resultStoreBase().setFilterMode(enable);
354 }
355 
setProgressRange(int minimum,int maximum)356 void QFutureInterfaceBase::setProgressRange(int minimum, int maximum)
357 {
358     QMutexLocker locker(&d->m_mutex);
359     d->m_progressMinimum = minimum;
360     d->m_progressMaximum = maximum;
361     d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum));
362 }
363 
setProgressValue(int progressValue)364 void QFutureInterfaceBase::setProgressValue(int progressValue)
365 {
366     setProgressValueAndText(progressValue, QString());
367 }
368 
setProgressValueAndText(int progressValue,const QString & progressText)369 void QFutureInterfaceBase::setProgressValueAndText(int progressValue,
370                                                    const QString &progressText)
371 {
372     QMutexLocker locker(&d->m_mutex);
373     if (d->manualProgress == false)
374         d->manualProgress = true;
375     if (d->m_progressValue >= progressValue)
376         return;
377 
378     if ((d->state & Canceled) || (d->state & Finished))
379         return;
380 
381     if (d->internal_updateProgress(progressValue, progressText)) {
382         d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
383                                            d->m_progressValue,
384                                            d->m_progressText));
385     }
386 }
387 
mutex() const388 QMutex *QFutureInterfaceBase::mutex() const
389 {
390     return &d->m_mutex;
391 }
392 
exceptionStore()393 QtConcurrent::internal::ExceptionStore &QFutureInterfaceBase::exceptionStore()
394 {
395     return d->m_exceptionStore;
396 }
397 
resultStoreBase()398 QtConcurrent::ResultStoreBase &QFutureInterfaceBase::resultStoreBase()
399 {
400     return d->m_results;
401 }
402 
resultStoreBase() const403 const QtConcurrent::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() const
404 {
405     return d->m_results;
406 }
407 
// Makes this object share the private object of other. The new private
// object is referenced before the old one is released, so self-assignment
// never drops the count to zero.
QFutureInterfaceBase &QFutureInterfaceBase::operator=(const QFutureInterfaceBase &other)
{
    QFutureInterfaceBasePrivate *const incoming = other.d;
    incoming->refCount.ref();

    const bool oldStillReferenced = d->refCount.deref();
    if (!oldStillReferenced)
        delete d;

    d = incoming;
    return *this;
}
416 
referenceCountIsOne() const417 bool QFutureInterfaceBase::referenceCountIsOne() const
418 {
419     return d->refCount == 1;
420 }
421 
QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)422 QFutureInterfaceBasePrivate::QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)
423     : refCount(1), m_progressValue(0), m_progressMinimum(0), m_progressMaximum(0),
424       state(initialState), pendingResults(0),
425       manualProgress(false), m_expectedResultCount(0), runnable(0)
426 {
427     progressTime.invalidate();
428 }
429 
internal_resultCount() const430 int QFutureInterfaceBasePrivate::internal_resultCount() const
431 {
432     return m_results.count(); // ### subtract canceled results.
433 }
434 
internal_isResultReadyAt(int index) const435 bool QFutureInterfaceBasePrivate::internal_isResultReadyAt(int index) const
436 {
437     return (m_results.contains(index));
438 }
439 
internal_waitForNextResult()440 bool QFutureInterfaceBasePrivate::internal_waitForNextResult()
441 {
442     if (m_results.hasNextResult())
443         return true;
444 
445     while ((state & QFutureInterfaceBase::Running) && m_results.hasNextResult() == false)
446         waitCondition.wait(&m_mutex);
447 
448     return (!(state & QFutureInterfaceBase::Canceled) && m_results.hasNextResult());
449 }
450 
internal_updateProgress(int progress,const QString & progressText)451 bool QFutureInterfaceBasePrivate::internal_updateProgress(int progress,
452                                                           const QString &progressText)
453 {
454     if (m_progressValue >= progress)
455         return false;
456 
457     m_progressValue = progress;
458     m_progressText = progressText;
459 
460     if (progressTime.isValid() && m_progressValue != m_progressMaximum) // make sure the first and last steps are emitted.
461         if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
462             return false;
463 
464     progressTime.start();
465     return true;
466 }
467 
internal_setThrottled(bool enable)468 void QFutureInterfaceBasePrivate::internal_setThrottled(bool enable)
469 {
470     // bail out if we are not changing the state
471     if ((enable && (state & QFutureInterfaceBase::Throttled))
472         || (!enable && !(state & QFutureInterfaceBase::Throttled)))
473         return;
474 
475     // change the state
476     if (enable) {
477         state  = QFutureInterfaceBase::State(state | QFutureInterfaceBase::Throttled);
478     } else {
479         state  = QFutureInterfaceBase::State(state & ~QFutureInterfaceBase::Throttled);
480         if (!(state & QFutureInterfaceBase::Paused))
481             pausedWaitCondition.wakeAll();
482     }
483 }
484 
sendCallOut(const QFutureCallOutEvent & callOutEvent)485 void QFutureInterfaceBasePrivate::sendCallOut(const QFutureCallOutEvent &callOutEvent)
486 {
487     if (outputConnections.isEmpty())
488         return;
489 
490     for (int i = 0; i < outputConnections.count(); ++i)
491         outputConnections.at(i)->postCallOutEvent(callOutEvent);
492 }
493 
sendCallOuts(const QFutureCallOutEvent & callOutEvent1,const QFutureCallOutEvent & callOutEvent2)494 void QFutureInterfaceBasePrivate::sendCallOuts(const QFutureCallOutEvent &callOutEvent1,
495                                      const QFutureCallOutEvent &callOutEvent2)
496 {
497     if (outputConnections.isEmpty())
498         return;
499 
500     for (int i = 0; i < outputConnections.count(); ++i) {
501         QFutureCallOutInterface *interface = outputConnections.at(i);
502         interface->postCallOutEvent(callOutEvent1);
503         interface->postCallOutEvent(callOutEvent2);
504     }
505 }
506 
507 // This function connects an output interface (for example a QFutureWatcher)
508 // to this future. While holding the lock we check the state and ready results
509 // and add the appropriate callouts to the queue. In order to avoid deadlocks,
510 // the actual callouts are made at the end while not holding the lock.
connectOutputInterface(QFutureCallOutInterface * interface)511 void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *interface)
512 {
513     QMutexLocker locker(&m_mutex);
514 
515     if (state & QFutureInterfaceBase::Started) {
516         interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started));
517         interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
518                                                         m_progressMinimum,
519                                                         m_progressMaximum));
520         interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
521                                                         m_progressValue,
522                                                         m_progressText));
523     }
524 
525     QtConcurrent::ResultIteratorBase it = m_results.begin();
526     while (it != m_results.end()) {
527         const int begin = it.resultIndex();
528         const int end = begin + it.batchSize();
529         interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
530                                                         begin,
531                                                         end));
532         it.batchedAdvance();
533     }
534 
535     if (state & QFutureInterfaceBase::Paused)
536         interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Paused));
537 
538     if (state & QFutureInterfaceBase::Canceled)
539         interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
540 
541     if (state & QFutureInterfaceBase::Finished)
542         interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
543 
544     outputConnections.append(interface);
545 }
546 
disconnectOutputInterface(QFutureCallOutInterface * interface)547 void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *interface)
548 {
549     QMutexLocker lock(&m_mutex);
550     const int index = outputConnections.indexOf(interface);
551     if (index == -1)
552         return;
553     outputConnections.removeAt(index);
554 
555     interface->callOutInterfaceDisconnected();
556 }
557 
setState(QFutureInterfaceBase::State newState)558 void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState)
559 {
560     state = newState;
561 }
562 
563 QT_END_NAMESPACE
564 
#endif // QT_NO_QFUTURE
566