1 /****************************************************************************
2 **
3 ** Copyright (C) 2015 The Qt Company Ltd.
4 ** Contact: http://www.qt.io/licensing/
5 **
6 ** This file is part of the QtCore module of the Qt Toolkit.
7 **
8 ** $QT_BEGIN_LICENSE:LGPL$
9 ** Commercial License Usage
10 ** Licensees holding valid commercial Qt licenses may use this file in
11 ** accordance with the commercial license agreement provided with the
12 ** Software or, alternatively, in accordance with the terms contained in
13 ** a written agreement between you and The Qt Company. For licensing terms
14 ** and conditions see http://www.qt.io/terms-conditions. For further
15 ** information use the contact form at http://www.qt.io/contact-us.
16 **
17 ** GNU Lesser General Public License Usage
18 ** Alternatively, this file may be used under the terms of the GNU Lesser
19 ** General Public License version 2.1 or version 3 as published by the Free
20 ** Software Foundation and appearing in the file LICENSE.LGPLv21 and
21 ** LICENSE.LGPLv3 included in the packaging of this file. Please review the
22 ** following information to ensure the GNU Lesser General Public License
23 ** requirements will be met: https://www.gnu.org/licenses/lgpl.html and
24 ** http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
25 **
26 ** As a special exception, The Qt Company gives you certain additional
27 ** rights. These rights are described in The Qt Company LGPL Exception
28 ** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
29 **
30 ** GNU General Public License Usage
31 ** Alternatively, this file may be used under the terms of the GNU
32 ** General Public License version 3.0 as published by the Free Software
33 ** Foundation and appearing in the file LICENSE.GPL included in the
34 ** packaging of this file. Please review the following information to
35 ** ensure the GNU General Public License version 3.0 requirements will be
36 ** met: http://www.gnu.org/copyleft/gpl.html.
37 **
38 ** $QT_END_LICENSE$
39 **
40 ****************************************************************************/
41
42 // qfutureinterface.h included from qfuture.h
43 #include "qfuture.h"
44
45 #ifndef QT_NO_QFUTURE
46
47 #include <QtCore/qatomic.h>
48 #include <QtCore/qthread.h>
49 #include <QtCore/qthreadpool.h>
50 #include <private/qthreadpool_p.h>
51
52 #include "qfutureinterface_p.h"
53
54 QT_BEGIN_NAMESPACE
55
56 enum {
57 MaxProgressEmitsPerSecond = 25
58 };
59
QFutureInterfaceBase(State initialState)60 QFutureInterfaceBase::QFutureInterfaceBase(State initialState)
61 : d(new QFutureInterfaceBasePrivate(initialState))
62 { }
63
QFutureInterfaceBase(const QFutureInterfaceBase & other)64 QFutureInterfaceBase::QFutureInterfaceBase(const QFutureInterfaceBase &other)
65 : d(other.d)
66 {
67 d->refCount.ref();
68 }
69
~QFutureInterfaceBase()70 QFutureInterfaceBase::~QFutureInterfaceBase()
71 {
72 if (!d->refCount.deref())
73 delete d;
74 }
75
cancel()76 void QFutureInterfaceBase::cancel()
77 {
78 QMutexLocker locker(&d->m_mutex);
79 if (d->state & Canceled)
80 return;
81
82 d->state = State((d->state & ~Paused) | Canceled);
83 d->waitCondition.wakeAll();
84 d->pausedWaitCondition.wakeAll();
85 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
86 }
87
setPaused(bool paused)88 void QFutureInterfaceBase::setPaused(bool paused)
89 {
90 QMutexLocker locker(&d->m_mutex);
91 if (paused) {
92 d->state = State(d->state | Paused);
93 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Paused));
94 } else {
95 d->state = State(d->state & ~Paused);
96 d->pausedWaitCondition.wakeAll();
97 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
98 }
99 }
100
togglePaused()101 void QFutureInterfaceBase::togglePaused()
102 {
103 QMutexLocker locker(&d->m_mutex);
104 if (d->state & Paused) {
105 d->state = State(d->state & ~Paused);
106 d->pausedWaitCondition.wakeAll();
107 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
108 } else {
109 d->state = State(d->state | Paused);
110 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Paused));
111 }
112 }
113
setThrottled(bool enable)114 void QFutureInterfaceBase::setThrottled(bool enable)
115 {
116 // bail out if we are not changing the state
117 if ((enable && (d->state & Throttled)) || (!enable && !(d->state & Throttled)))
118 return;
119
120 // lock and change the state
121 QMutexLocker lock(&d->m_mutex);
122 if (enable) {
123 d->state = State(d->state | Throttled);
124 } else {
125 d->state = State(d->state & ~Throttled);
126 if (!(d->state & Paused))
127 d->pausedWaitCondition.wakeAll();
128 }
129 }
130
131
isRunning() const132 bool QFutureInterfaceBase::isRunning() const
133 {
134 return queryState(Running);
135 }
136
isStarted() const137 bool QFutureInterfaceBase::isStarted() const
138 {
139 return queryState(Started);
140 }
141
isCanceled() const142 bool QFutureInterfaceBase::isCanceled() const
143 {
144 return queryState(Canceled);
145 }
146
isFinished() const147 bool QFutureInterfaceBase::isFinished() const
148 {
149 return queryState(Finished);
150 }
151
isPaused() const152 bool QFutureInterfaceBase::isPaused() const
153 {
154 return queryState(Paused);
155 }
156
isThrottled() const157 bool QFutureInterfaceBase::isThrottled() const
158 {
159 return queryState(Throttled);
160 }
161
isResultReadyAt(int index) const162 bool QFutureInterfaceBase::isResultReadyAt(int index) const
163 {
164 QMutexLocker lock(&d->m_mutex);
165 return d->internal_isResultReadyAt(index);
166 }
167
waitForNextResult()168 bool QFutureInterfaceBase::waitForNextResult()
169 {
170 QMutexLocker lock(&d->m_mutex);
171 return d->internal_waitForNextResult();
172 }
173
waitForResume()174 void QFutureInterfaceBase::waitForResume()
175 {
176 // return early if possible to avoid taking the mutex lock.
177 if ((d->state & Paused) == false || (d->state & Canceled))
178 return;
179
180 QMutexLocker lock(&d->m_mutex);
181 if ((d->state & Paused) == false || (d->state & Canceled))
182 return;
183
184 // decrease active thread count since this thread will wait.
185 QThreadPool::globalInstance()->releaseThread();
186
187 d->pausedWaitCondition.wait(&d->m_mutex);
188
189 QThreadPool::globalInstance()->reserveThread();
190 }
191
progressValue() const192 int QFutureInterfaceBase::progressValue() const
193 {
194 return d->m_progressValue;
195 }
196
progressMinimum() const197 int QFutureInterfaceBase::progressMinimum() const
198 {
199 return d->m_progressMinimum;
200 }
201
progressMaximum() const202 int QFutureInterfaceBase::progressMaximum() const
203 {
204 return d->m_progressMaximum;
205 }
206
resultCount() const207 int QFutureInterfaceBase::resultCount() const
208 {
209 QMutexLocker lock(&d->m_mutex);
210 return d->internal_resultCount();
211 }
212
progressText() const213 QString QFutureInterfaceBase::progressText() const
214 {
215 QMutexLocker locker(&d->m_mutex);
216 return d->m_progressText;
217 }
218
isProgressUpdateNeeded() const219 bool QFutureInterfaceBase::isProgressUpdateNeeded() const
220 {
221 QMutexLocker locker(&d->m_mutex);
222 return !d->progressTime.isValid() || (d->progressTime.elapsed() > (1000 / MaxProgressEmitsPerSecond));
223 }
224
reportStarted()225 void QFutureInterfaceBase::reportStarted()
226 {
227 QMutexLocker locker(&d->m_mutex);
228 if ((d->state & Started) || (d->state & Canceled) || (d->state & Finished))
229 return;
230
231 d->setState(State(Started | Running));
232 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Started));
233 }
234
reportCanceled()235 void QFutureInterfaceBase::reportCanceled()
236 {
237 cancel();
238 }
239
240 #ifndef QT_NO_EXCEPTIONS
reportException(const QtConcurrent::Exception & exception)241 void QFutureInterfaceBase::reportException(const QtConcurrent::Exception &exception)
242 {
243 QMutexLocker locker(&d->m_mutex);
244 if ((d->state & Canceled) || (d->state & Finished))
245 return;
246
247 d->m_exceptionStore.setException(exception);
248 d->state = State(d->state | Canceled);
249 d->waitCondition.wakeAll();
250 d->pausedWaitCondition.wakeAll();
251 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
252 }
253 #endif
254
reportFinished()255 void QFutureInterfaceBase::reportFinished()
256 {
257 QMutexLocker locker(&d->m_mutex);
258 if (!(d->state & Finished)) {
259 d->state = State((d->state & ~Running) | Finished);
260 d->waitCondition.wakeAll();
261 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
262 }
263 }
264
setExpectedResultCount(int resultCount)265 void QFutureInterfaceBase::setExpectedResultCount(int resultCount)
266 {
267 if (d->manualProgress == false)
268 setProgressRange(0, resultCount);
269 d->m_expectedResultCount = resultCount;
270 }
271
expectedResultCount()272 int QFutureInterfaceBase::expectedResultCount()
273 {
274 return d->m_expectedResultCount;
275 }
276
queryState(State state) const277 bool QFutureInterfaceBase::queryState(State state) const
278 {
279 return (d->state & state);
280 }
281
waitForResult(int resultIndex)282 void QFutureInterfaceBase::waitForResult(int resultIndex)
283 {
284 d->m_exceptionStore.throwPossibleException();
285
286 if (!(d->state & Running))
287 return;
288
289 // To avoid deadlocks and reduce the number of threads used, try to
290 // run the runnable in the current thread.
291 QThreadPool::globalInstance()->d_func()->stealRunnable(d->runnable);
292
293 QMutexLocker lock(&d->m_mutex);
294
295 if (!(d->state & Running))
296 return;
297
298 const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex;
299 while ((d->state & Running) && d->internal_isResultReadyAt(waitIndex) == false)
300 d->waitCondition.wait(&d->m_mutex);
301
302 d->m_exceptionStore.throwPossibleException();
303 }
304
waitForFinished()305 void QFutureInterfaceBase::waitForFinished()
306 {
307 if (d->state & Running) {
308 QThreadPool::globalInstance()->d_func()->stealRunnable(d->runnable);
309
310 QMutexLocker lock(&d->m_mutex);
311
312 while (d->state & Running)
313 d->waitCondition.wait(&d->m_mutex);
314 }
315
316 d->m_exceptionStore.throwPossibleException();
317 }
318
reportResultsReady(int beginIndex,int endIndex)319 void QFutureInterfaceBase::reportResultsReady(int beginIndex, int endIndex)
320 {
321 if ((d->state & Canceled) || (d->state & Finished) || beginIndex == endIndex)
322 return;
323
324 d->waitCondition.wakeAll();
325
326 if (d->manualProgress == false) {
327 if (d->internal_updateProgress(d->m_progressValue + endIndex - beginIndex) == false) {
328 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
329 beginIndex,
330 endIndex));
331 return;
332 }
333
334 d->sendCallOuts(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
335 d->m_progressValue,
336 d->m_progressText),
337 QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
338 beginIndex,
339 endIndex));
340 return;
341 }
342 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex));
343 }
344
setRunnable(QRunnable * runnable)345 void QFutureInterfaceBase::setRunnable(QRunnable *runnable)
346 {
347 d->runnable = runnable;
348 }
349
setFilterMode(bool enable)350 void QFutureInterfaceBase::setFilterMode(bool enable)
351 {
352 QMutexLocker locker(&d->m_mutex);
353 resultStoreBase().setFilterMode(enable);
354 }
355
setProgressRange(int minimum,int maximum)356 void QFutureInterfaceBase::setProgressRange(int minimum, int maximum)
357 {
358 QMutexLocker locker(&d->m_mutex);
359 d->m_progressMinimum = minimum;
360 d->m_progressMaximum = maximum;
361 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum));
362 }
363
setProgressValue(int progressValue)364 void QFutureInterfaceBase::setProgressValue(int progressValue)
365 {
366 setProgressValueAndText(progressValue, QString());
367 }
368
setProgressValueAndText(int progressValue,const QString & progressText)369 void QFutureInterfaceBase::setProgressValueAndText(int progressValue,
370 const QString &progressText)
371 {
372 QMutexLocker locker(&d->m_mutex);
373 if (d->manualProgress == false)
374 d->manualProgress = true;
375 if (d->m_progressValue >= progressValue)
376 return;
377
378 if ((d->state & Canceled) || (d->state & Finished))
379 return;
380
381 if (d->internal_updateProgress(progressValue, progressText)) {
382 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
383 d->m_progressValue,
384 d->m_progressText));
385 }
386 }
387
mutex() const388 QMutex *QFutureInterfaceBase::mutex() const
389 {
390 return &d->m_mutex;
391 }
392
exceptionStore()393 QtConcurrent::internal::ExceptionStore &QFutureInterfaceBase::exceptionStore()
394 {
395 return d->m_exceptionStore;
396 }
397
resultStoreBase()398 QtConcurrent::ResultStoreBase &QFutureInterfaceBase::resultStoreBase()
399 {
400 return d->m_results;
401 }
402
resultStoreBase() const403 const QtConcurrent::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() const
404 {
405 return d->m_results;
406 }
407
operator =(const QFutureInterfaceBase & other)408 QFutureInterfaceBase &QFutureInterfaceBase::operator=(const QFutureInterfaceBase &other)
409 {
410 other.d->refCount.ref();
411 if (!d->refCount.deref())
412 delete d;
413 d = other.d;
414 return *this;
415 }
416
referenceCountIsOne() const417 bool QFutureInterfaceBase::referenceCountIsOne() const
418 {
419 return d->refCount == 1;
420 }
421
QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)422 QFutureInterfaceBasePrivate::QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)
423 : refCount(1), m_progressValue(0), m_progressMinimum(0), m_progressMaximum(0),
424 state(initialState), pendingResults(0),
425 manualProgress(false), m_expectedResultCount(0), runnable(0)
426 {
427 progressTime.invalidate();
428 }
429
internal_resultCount() const430 int QFutureInterfaceBasePrivate::internal_resultCount() const
431 {
432 return m_results.count(); // ### subtract canceled results.
433 }
434
internal_isResultReadyAt(int index) const435 bool QFutureInterfaceBasePrivate::internal_isResultReadyAt(int index) const
436 {
437 return (m_results.contains(index));
438 }
439
internal_waitForNextResult()440 bool QFutureInterfaceBasePrivate::internal_waitForNextResult()
441 {
442 if (m_results.hasNextResult())
443 return true;
444
445 while ((state & QFutureInterfaceBase::Running) && m_results.hasNextResult() == false)
446 waitCondition.wait(&m_mutex);
447
448 return (!(state & QFutureInterfaceBase::Canceled) && m_results.hasNextResult());
449 }
450
internal_updateProgress(int progress,const QString & progressText)451 bool QFutureInterfaceBasePrivate::internal_updateProgress(int progress,
452 const QString &progressText)
453 {
454 if (m_progressValue >= progress)
455 return false;
456
457 m_progressValue = progress;
458 m_progressText = progressText;
459
460 if (progressTime.isValid() && m_progressValue != m_progressMaximum) // make sure the first and last steps are emitted.
461 if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
462 return false;
463
464 progressTime.start();
465 return true;
466 }
467
internal_setThrottled(bool enable)468 void QFutureInterfaceBasePrivate::internal_setThrottled(bool enable)
469 {
470 // bail out if we are not changing the state
471 if ((enable && (state & QFutureInterfaceBase::Throttled))
472 || (!enable && !(state & QFutureInterfaceBase::Throttled)))
473 return;
474
475 // change the state
476 if (enable) {
477 state = QFutureInterfaceBase::State(state | QFutureInterfaceBase::Throttled);
478 } else {
479 state = QFutureInterfaceBase::State(state & ~QFutureInterfaceBase::Throttled);
480 if (!(state & QFutureInterfaceBase::Paused))
481 pausedWaitCondition.wakeAll();
482 }
483 }
484
sendCallOut(const QFutureCallOutEvent & callOutEvent)485 void QFutureInterfaceBasePrivate::sendCallOut(const QFutureCallOutEvent &callOutEvent)
486 {
487 if (outputConnections.isEmpty())
488 return;
489
490 for (int i = 0; i < outputConnections.count(); ++i)
491 outputConnections.at(i)->postCallOutEvent(callOutEvent);
492 }
493
sendCallOuts(const QFutureCallOutEvent & callOutEvent1,const QFutureCallOutEvent & callOutEvent2)494 void QFutureInterfaceBasePrivate::sendCallOuts(const QFutureCallOutEvent &callOutEvent1,
495 const QFutureCallOutEvent &callOutEvent2)
496 {
497 if (outputConnections.isEmpty())
498 return;
499
500 for (int i = 0; i < outputConnections.count(); ++i) {
501 QFutureCallOutInterface *interface = outputConnections.at(i);
502 interface->postCallOutEvent(callOutEvent1);
503 interface->postCallOutEvent(callOutEvent2);
504 }
505 }
506
507 // This function connects an output interface (for example a QFutureWatcher)
508 // to this future. While holding the lock we check the state and ready results
509 // and add the appropriate callouts to the queue. In order to avoid deadlocks,
510 // the actual callouts are made at the end while not holding the lock.
connectOutputInterface(QFutureCallOutInterface * interface)511 void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *interface)
512 {
513 QMutexLocker locker(&m_mutex);
514
515 if (state & QFutureInterfaceBase::Started) {
516 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started));
517 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
518 m_progressMinimum,
519 m_progressMaximum));
520 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
521 m_progressValue,
522 m_progressText));
523 }
524
525 QtConcurrent::ResultIteratorBase it = m_results.begin();
526 while (it != m_results.end()) {
527 const int begin = it.resultIndex();
528 const int end = begin + it.batchSize();
529 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
530 begin,
531 end));
532 it.batchedAdvance();
533 }
534
535 if (state & QFutureInterfaceBase::Paused)
536 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Paused));
537
538 if (state & QFutureInterfaceBase::Canceled)
539 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
540
541 if (state & QFutureInterfaceBase::Finished)
542 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
543
544 outputConnections.append(interface);
545 }
546
disconnectOutputInterface(QFutureCallOutInterface * interface)547 void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *interface)
548 {
549 QMutexLocker lock(&m_mutex);
550 const int index = outputConnections.indexOf(interface);
551 if (index == -1)
552 return;
553 outputConnections.removeAt(index);
554
555 interface->callOutInterfaceDisconnected();
556 }
557
setState(QFutureInterfaceBase::State newState)558 void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState)
559 {
560 state = newState;
561 }
562
563 QT_END_NAMESPACE
564
565 #endif // QT_NO_CONCURRENT
566