1 /* -*- C++ -*-
2 This file contains a benchmark for job processing in ThreadWeaver.
3
4 SPDX-FileCopyrightText: 2005-2013 Mirko Boehm <mirko@kde.org>
5
6 SPDX-License-Identifier: LGPL-2.0-or-later
7 */
8
9 #include <numeric>
10
11 #include <QCoreApplication>
12 #include <QList>
13 #include <QString>
14 #include <QTest>
15 #include <QtDebug>
16
17 #include <ThreadWeaver/Collection>
18 #include <ThreadWeaver/Job>
19 #include <ThreadWeaver/Queueing>
20 #include <ThreadWeaver/Sequence>
21 #include <ThreadWeaver/ThreadWeaver>
22
23 class AccumulateJob : public ThreadWeaver::Job
24 {
25 public:
AccumulateJob()26 explicit AccumulateJob()
27 : m_count(0)
28 , m_result(0)
29 {
30 }
31
AccumulateJob(const AccumulateJob & a)32 AccumulateJob(const AccumulateJob &a)
33 : ThreadWeaver::Job()
34 , m_count(a.m_count)
35 , m_result(a.m_result)
36 {
37 }
38
setCount(quint64 count)39 void setCount(quint64 count)
40 {
41 m_count = count;
42 }
43
result() const44 quint64 result() const
45 {
46 return m_result;
47 }
48
payload()49 void payload()
50 {
51 std::vector<quint64> numbers(m_count);
52 std::generate(numbers.begin(), numbers.end(), []() -> quint64 {
53 static quint64 i = 0;
54 return i++;
55 });
56 m_result = std::accumulate(numbers.begin(), numbers.end(), 0);
57 }
58
59 protected:
run(ThreadWeaver::JobPointer,ThreadWeaver::Thread *)60 void run(ThreadWeaver::JobPointer, ThreadWeaver::Thread *) override
61 {
62 payload();
63 }
64
65 private:
66 quint64 m_count;
67 quint64 m_result;
68 };
69
70 class QueueBenchmarksTest : public QObject
71 {
72 Q_OBJECT
73
74 public:
75 QueueBenchmarksTest();
76
77 private Q_SLOTS:
78 void initTestCase();
79 void cleanupTestCase();
80 void BaselineBenchmark();
81 void BaselineBenchmark_data();
82 void BaselineAsJobsBenchmark();
83 void BaselineAsJobsBenchmark_data();
84 void IndividualJobsBenchmark();
85 void IndividualJobsBenchmark_data();
86 void CollectionsBenchmark();
87 void CollectionsBenchmark_data();
88 void SequencesBenchmark();
89 void SequencesBenchmark_data();
90
91 private:
92 void defaultBenchmarkData(bool singleThreaded);
93 };
94
QueueBenchmarksTest()95 QueueBenchmarksTest::QueueBenchmarksTest()
96 {
97 }
98
initTestCase()99 void QueueBenchmarksTest::initTestCase()
100 {
101 }
102
cleanupTestCase()103 void QueueBenchmarksTest::cleanupTestCase()
104 {
105 }
106
107 /** @brief BaselineBenchmark simply performs the same operations in a loop.
108 *
109 * The result amounts to what time the jobs used in the benchmark need to execute without queueing or thread
110 * synchronization overhead. */
BaselineBenchmark()111 void QueueBenchmarksTest::BaselineBenchmark()
112 {
113 QFETCH(int, m);
114 QFETCH(int, c);
115 QFETCH(int, b);
116 QFETCH(int, t);
117 const int n = c * b;
118 Q_UNUSED(t); // in this case
119
120 QVector<AccumulateJob> jobs(n);
121 for (int i = 0; i < n; ++i) {
122 jobs[i].setCount(m);
123 }
124
125 // executeLocal needs to emit similar signals as execute(), to be comparable to the threaded variants.
126 // BaselineAsJobsBenchmark does that. Compare BaselineAsJobsBenchmark and BaselineBenchmark to evaluate the overhead of executing
127 // an operation in a job.
128 QBENCHMARK {
129 for (int i = 0; i < n; ++i) {
130 jobs[i].payload();
131 }
132 }
133 }
134
BaselineBenchmark_data()135 void QueueBenchmarksTest::BaselineBenchmark_data()
136 {
137 defaultBenchmarkData(true);
138 }
139
BaselineAsJobsBenchmark()140 void QueueBenchmarksTest::BaselineAsJobsBenchmark()
141 {
142 QFETCH(int, m);
143 QFETCH(int, c);
144 QFETCH(int, b);
145 QFETCH(int, t);
146 const int n = c * b;
147 Q_UNUSED(t); // in this case
148
149 QVector<AccumulateJob> jobs(n);
150 for (int i = 0; i < n; ++i) {
151 jobs[i].setCount(m);
152 }
153
154 QBENCHMARK {
155 for (int i = 0; i < n; ++i) {
156 jobs[i].blockingExecute();
157 }
158 }
159 }
160
BaselineAsJobsBenchmark_data()161 void QueueBenchmarksTest::BaselineAsJobsBenchmark_data()
162 {
163 defaultBenchmarkData(true);
164 }
165
IndividualJobsBenchmark()166 void QueueBenchmarksTest::IndividualJobsBenchmark()
167 {
168 QFETCH(int, m);
169 QFETCH(int, c);
170 QFETCH(int, b);
171 QFETCH(int, t);
172 const int n = c * b;
173
174 ThreadWeaver::Queue weaver;
175 weaver.setMaximumNumberOfThreads(t);
176 weaver.suspend();
177 QVector<AccumulateJob> jobs(n);
178 {
179 ThreadWeaver::QueueStream stream(&weaver);
180 for (int i = 0; i < n; ++i) {
181 jobs[i].setCount(m);
182 stream << jobs[i];
183 }
184 }
185 QBENCHMARK_ONCE {
186 weaver.resume();
187 weaver.finish();
188 }
189 }
190
IndividualJobsBenchmark_data()191 void QueueBenchmarksTest::IndividualJobsBenchmark_data()
192 {
193 defaultBenchmarkData(false);
194 }
195
CollectionsBenchmark()196 void QueueBenchmarksTest::CollectionsBenchmark()
197 {
198 QFETCH(int, m);
199 QFETCH(int, c);
200 QFETCH(int, b);
201 QFETCH(int, t);
202 const int n = c * b;
203
204 ThreadWeaver::Queue weaver;
205 weaver.setMaximumNumberOfThreads(t);
206 weaver.suspend();
207 QVector<AccumulateJob> jobs(n);
208
209 // FIXME currently, memory management of the job sequences (they are deleted when they go out of scope)
210 // is measured as part of the benchmark
211 qDebug() << b << "blocks" << c << "operations, queueing...";
212 // queue the jobs blockwise as collections
213 for (int block = 0; block < b; ++block) {
214 ThreadWeaver::Collection *collection = new ThreadWeaver::Collection();
215 for (int operation = 0; operation < c; ++operation) {
216 const int index = block * b + operation;
217 jobs[index].setCount(m);
218 *collection << jobs[index];
219 }
220 weaver.stream() << collection;
221 }
222
223 qDebug() << b << "blocks" << c << "operations, executing...";
224 QBENCHMARK_ONCE {
225 weaver.resume();
226 weaver.finish();
227 }
228 }
229
CollectionsBenchmark_data()230 void QueueBenchmarksTest::CollectionsBenchmark_data()
231 {
232 defaultBenchmarkData(false);
233 }
234
SequencesBenchmark()235 void QueueBenchmarksTest::SequencesBenchmark()
236 {
237 QFETCH(int, m);
238 QFETCH(int, c);
239 QFETCH(int, b);
240 QFETCH(int, t);
241 const int n = c * b;
242
243 ThreadWeaver::Queue weaver;
244 weaver.setMaximumNumberOfThreads(t);
245 weaver.suspend();
246 QVector<AccumulateJob> jobs(n);
247
248 qDebug() << b << "blocks" << c << "operations, queueing...";
249 // queue the jobs blockwise as collections
250 for (int block = 0; block < b; ++block) {
251 ThreadWeaver::Sequence *sequence = new ThreadWeaver::Sequence();
252 for (int operation = 0; operation < c; ++operation) {
253 const int index = block * b + operation;
254 jobs[index].setCount(m);
255 *sequence << jobs[index];
256 }
257 weaver.stream() << sequence;
258 }
259
260 qDebug() << b << "blocks" << c << "operations, executing...";
261 QBENCHMARK_ONCE {
262 weaver.resume();
263 weaver.finish();
264 }
265 }
266
SequencesBenchmark_data()267 void QueueBenchmarksTest::SequencesBenchmark_data()
268 {
269 defaultBenchmarkData(false);
270 }
271
defaultBenchmarkData(bool singleThreaded)272 void QueueBenchmarksTest::defaultBenchmarkData(bool singleThreaded)
273 {
274 QTest::addColumn<int>("m"); // number of quint64's to accumulate
275 QTest::addColumn<int>("c"); // operations per block
276 QTest::addColumn<int>("b"); // number of blocks, number of jobs is b*c
277 QTest::addColumn<int>("t"); // number of worker threads
278
279 const QList<int> threads = singleThreaded ? QList<int>() << 1 : QList<int>() << 1 << 2 << 4 << 8 << 16 << 32 << 64 << 128;
280 const QList<int> ms = QList<int>() << 1 << 10 << 100 << 1000 << 10000 << 100000;
281 for (int m : ms) {
282 for (int t : threads) {
283 const QString name = QString::fromLatin1("%1 threads, %2 values").arg(t).arg(m);
284 // newRow expects const char*, but then qstrdup's it in the QTestData constructor. Eeeew.
285 QTest::newRow(qPrintable(name)) << m << 256 << 256 << t;
286 }
287 }
288 }
289
290 QTEST_MAIN(QueueBenchmarksTest)
291
292 #include "QueueBenchmarks.moc"
293