1 /* -*- C++ -*-
2     This file contains a benchmark for job processing in ThreadWeaver.
3 
4     SPDX-FileCopyrightText: 2005-2013 Mirko Boehm <mirko@kde.org>
5 
6     SPDX-License-Identifier: LGPL-2.0-or-later
7 */
8 
9 #include <numeric>
10 
11 #include <QCoreApplication>
12 #include <QList>
13 #include <QString>
14 #include <QTest>
15 #include <QtDebug>
16 
17 #include <ThreadWeaver/Collection>
18 #include <ThreadWeaver/Job>
19 #include <ThreadWeaver/Queueing>
20 #include <ThreadWeaver/Sequence>
21 #include <ThreadWeaver/ThreadWeaver>
22 
23 class AccumulateJob : public ThreadWeaver::Job
24 {
25 public:
AccumulateJob()26     explicit AccumulateJob()
27         : m_count(0)
28         , m_result(0)
29     {
30     }
31 
AccumulateJob(const AccumulateJob & a)32     AccumulateJob(const AccumulateJob &a)
33         : ThreadWeaver::Job()
34         , m_count(a.m_count)
35         , m_result(a.m_result)
36     {
37     }
38 
setCount(quint64 count)39     void setCount(quint64 count)
40     {
41         m_count = count;
42     }
43 
result() const44     quint64 result() const
45     {
46         return m_result;
47     }
48 
payload()49     void payload()
50     {
51         std::vector<quint64> numbers(m_count);
52         std::generate(numbers.begin(), numbers.end(), []() -> quint64 {
53             static quint64 i = 0;
54             return i++;
55         });
56         m_result = std::accumulate(numbers.begin(), numbers.end(), 0);
57     }
58 
59 protected:
run(ThreadWeaver::JobPointer,ThreadWeaver::Thread *)60     void run(ThreadWeaver::JobPointer, ThreadWeaver::Thread *) override
61     {
62         payload();
63     }
64 
65 private:
66     quint64 m_count;
67     quint64 m_result;
68 };
69 
/** @brief Test class holding the ThreadWeaver queueing benchmarks.
 *
 * Every benchmark slot is paired with a slot of the same name plus a "_data" suffix; in this file
 * each of those forwards to defaultBenchmarkData() to declare the data rows.
 */
class QueueBenchmarksTest : public QObject
{
    Q_OBJECT

public:
    QueueBenchmarksTest();

private Q_SLOTS:
    // Set-up and tear-down hooks; both are empty in this file.
    void initTestCase();
    void cleanupTestCase();
    // Calls payload() on every job directly, in a plain loop.
    void BaselineBenchmark();
    void BaselineBenchmark_data();
    // Calls blockingExecute() on every job, in a plain loop.
    void BaselineAsJobsBenchmark();
    void BaselineAsJobsBenchmark_data();
    // Queues every job individually and measures resume() + finish().
    void IndividualJobsBenchmark();
    void IndividualJobsBenchmark_data();
    // Queues the jobs grouped into ThreadWeaver::Collection objects, one per block.
    void CollectionsBenchmark();
    void CollectionsBenchmark_data();
    // Queues the jobs grouped into ThreadWeaver::Sequence objects, one per block.
    void SequencesBenchmark();
    void SequencesBenchmark_data();

private:
    // Declares the columns m, c, b and t and adds the data rows; singleThreaded limits t to 1.
    void defaultBenchmarkData(bool singleThreaded);
};
94 
QueueBenchmarksTest()95 QueueBenchmarksTest::QueueBenchmarksTest()
96 {
97 }
98 
initTestCase()99 void QueueBenchmarksTest::initTestCase()
100 {
101 }
102 
cleanupTestCase()103 void QueueBenchmarksTest::cleanupTestCase()
104 {
105 }
106 
107 /** @brief BaselineBenchmark simply performs the same operations in a loop.
108  *
109  * The result amounts to what time the jobs used in the benchmark need to execute without queueing or thread
110  * synchronization overhead. */
BaselineBenchmark()111 void QueueBenchmarksTest::BaselineBenchmark()
112 {
113     QFETCH(int, m);
114     QFETCH(int, c);
115     QFETCH(int, b);
116     QFETCH(int, t);
117     const int n = c * b;
118     Q_UNUSED(t); // in this case
119 
120     QVector<AccumulateJob> jobs(n);
121     for (int i = 0; i < n; ++i) {
122         jobs[i].setCount(m);
123     }
124 
125     // executeLocal needs to emit similar signals as execute(), to be comparable to the threaded variants.
126     // BaselineAsJobsBenchmark does that. Compare BaselineAsJobsBenchmark and BaselineBenchmark to evaluate the overhead of executing
127     // an operation in a job.
128     QBENCHMARK {
129         for (int i = 0; i < n; ++i) {
130             jobs[i].payload();
131         }
132     }
133 }
134 
BaselineBenchmark_data()135 void QueueBenchmarksTest::BaselineBenchmark_data()
136 {
137     defaultBenchmarkData(true);
138 }
139 
BaselineAsJobsBenchmark()140 void QueueBenchmarksTest::BaselineAsJobsBenchmark()
141 {
142     QFETCH(int, m);
143     QFETCH(int, c);
144     QFETCH(int, b);
145     QFETCH(int, t);
146     const int n = c * b;
147     Q_UNUSED(t); // in this case
148 
149     QVector<AccumulateJob> jobs(n);
150     for (int i = 0; i < n; ++i) {
151         jobs[i].setCount(m);
152     }
153 
154     QBENCHMARK {
155         for (int i = 0; i < n; ++i) {
156             jobs[i].blockingExecute();
157         }
158     }
159 }
160 
BaselineAsJobsBenchmark_data()161 void QueueBenchmarksTest::BaselineAsJobsBenchmark_data()
162 {
163     defaultBenchmarkData(true);
164 }
165 
IndividualJobsBenchmark()166 void QueueBenchmarksTest::IndividualJobsBenchmark()
167 {
168     QFETCH(int, m);
169     QFETCH(int, c);
170     QFETCH(int, b);
171     QFETCH(int, t);
172     const int n = c * b;
173 
174     ThreadWeaver::Queue weaver;
175     weaver.setMaximumNumberOfThreads(t);
176     weaver.suspend();
177     QVector<AccumulateJob> jobs(n);
178     {
179         ThreadWeaver::QueueStream stream(&weaver);
180         for (int i = 0; i < n; ++i) {
181             jobs[i].setCount(m);
182             stream << jobs[i];
183         }
184     }
185     QBENCHMARK_ONCE {
186         weaver.resume();
187         weaver.finish();
188     }
189 }
190 
IndividualJobsBenchmark_data()191 void QueueBenchmarksTest::IndividualJobsBenchmark_data()
192 {
193     defaultBenchmarkData(false);
194 }
195 
CollectionsBenchmark()196 void QueueBenchmarksTest::CollectionsBenchmark()
197 {
198     QFETCH(int, m);
199     QFETCH(int, c);
200     QFETCH(int, b);
201     QFETCH(int, t);
202     const int n = c * b;
203 
204     ThreadWeaver::Queue weaver;
205     weaver.setMaximumNumberOfThreads(t);
206     weaver.suspend();
207     QVector<AccumulateJob> jobs(n);
208 
209     // FIXME currently, memory management of the job sequences (they are deleted when they go out of scope)
210     // is measured as part of the benchmark
211     qDebug() << b << "blocks" << c << "operations, queueing...";
212     // queue the jobs blockwise as collections
213     for (int block = 0; block < b; ++block) {
214         ThreadWeaver::Collection *collection = new ThreadWeaver::Collection();
215         for (int operation = 0; operation < c; ++operation) {
216             const int index = block * b + operation;
217             jobs[index].setCount(m);
218             *collection << jobs[index];
219         }
220         weaver.stream() << collection;
221     }
222 
223     qDebug() << b << "blocks" << c << "operations, executing...";
224     QBENCHMARK_ONCE {
225         weaver.resume();
226         weaver.finish();
227     }
228 }
229 
CollectionsBenchmark_data()230 void QueueBenchmarksTest::CollectionsBenchmark_data()
231 {
232     defaultBenchmarkData(false);
233 }
234 
SequencesBenchmark()235 void QueueBenchmarksTest::SequencesBenchmark()
236 {
237     QFETCH(int, m);
238     QFETCH(int, c);
239     QFETCH(int, b);
240     QFETCH(int, t);
241     const int n = c * b;
242 
243     ThreadWeaver::Queue weaver;
244     weaver.setMaximumNumberOfThreads(t);
245     weaver.suspend();
246     QVector<AccumulateJob> jobs(n);
247 
248     qDebug() << b << "blocks" << c << "operations, queueing...";
249     // queue the jobs blockwise as collections
250     for (int block = 0; block < b; ++block) {
251         ThreadWeaver::Sequence *sequence = new ThreadWeaver::Sequence();
252         for (int operation = 0; operation < c; ++operation) {
253             const int index = block * b + operation;
254             jobs[index].setCount(m);
255             *sequence << jobs[index];
256         }
257         weaver.stream() << sequence;
258     }
259 
260     qDebug() << b << "blocks" << c << "operations, executing...";
261     QBENCHMARK_ONCE {
262         weaver.resume();
263         weaver.finish();
264     }
265 }
266 
SequencesBenchmark_data()267 void QueueBenchmarksTest::SequencesBenchmark_data()
268 {
269     defaultBenchmarkData(false);
270 }
271 
defaultBenchmarkData(bool singleThreaded)272 void QueueBenchmarksTest::defaultBenchmarkData(bool singleThreaded)
273 {
274     QTest::addColumn<int>("m"); // number of quint64's to accumulate
275     QTest::addColumn<int>("c"); // operations per block
276     QTest::addColumn<int>("b"); // number of blocks, number of jobs is b*c
277     QTest::addColumn<int>("t"); // number of worker threads
278 
279     const QList<int> threads = singleThreaded ? QList<int>() << 1 : QList<int>() << 1 << 2 << 4 << 8 << 16 << 32 << 64 << 128;
280     const QList<int> ms = QList<int>() << 1 << 10 << 100 << 1000 << 10000 << 100000;
281     for (int m : ms) {
282         for (int t : threads) {
283             const QString name = QString::fromLatin1("%1 threads, %2 values").arg(t).arg(m);
284             // newRow expects const char*, but then qstrdup's it in the QTestData constructor. Eeeew.
285             QTest::newRow(qPrintable(name)) << m << 256 << 256 << t;
286         }
287     }
288 }
289 
// Provides the program entry point that runs QueueBenchmarksTest (see the Qt Test documentation for QTEST_MAIN).
QTEST_MAIN(QueueBenchmarksTest)

// QueueBenchmarksTest is declared with Q_OBJECT in this source file, so its moc output is included here.
#include "QueueBenchmarks.moc"
293