1 // ©2013-2014 Cameron Desrochers.
2 // Distributed under the simplified BSD license (see the LICENSE file that
3 // should have come with this file).
4
5 // Benchmarks for moodycamel::ConcurrentQueue.
6 // Provides comparative timings of various operations under
7 // highly artificial circumstances. You've been warned :-)
8
9 #include <cstdio>
10 #include <cstring>
11 #include <string>
12 #include <cstdint>
13 #include <cmath>
14 #include <cstdarg>
15 #include <fstream>
16 #include <ctime>
17 #include <random>
18 #include <vector>
19 #include <map>
20 #include <cassert>
21 #include <thread>
22 #include <algorithm>
23 #include <cctype>
24
25 #include "../blockingconcurrentqueue.h"
26 #include "lockbasedqueue.h"
27 #include "simplelockfree.h"
28 #include "boostqueue.h"
29 #include "tbbqueue.h"
30 #include "stdqueue.h"
31 #include "dlibqueue.h"
32 #include "../tests/common/simplethread.h"
33 #include "../tests/common/systemtime.h"
34 #include "cpuid.h"
35
36 using namespace moodycamel;
37
38
39 typedef std::minstd_rand RNG_t;
40
41 bool precise = false;
42
43
44 enum benchmark_type_t
45 {
46 bench_balanced,
47 bench_only_enqueue,
48 bench_only_enqueue_prealloc,
49 bench_only_enqueue_bulk,
50 bench_only_enqueue_bulk_prealloc,
51 bench_only_dequeue,
52 bench_only_dequeue_bulk,
53 bench_mostly_enqueue,
54 bench_mostly_enqueue_bulk,
55 bench_mostly_dequeue,
56 bench_mostly_dequeue_bulk,
57 bench_spmc,
58 bench_spmc_preproduced,
59 bench_mpsc,
60 bench_empty_dequeue,
61 bench_enqueue_dequeue_pairs,
62 bench_heavy_concurrent,
63
64 BENCHMARK_TYPE_COUNT
65 };
66
67 const char BENCHMARK_SHORT_NAMES[BENCHMARK_TYPE_COUNT][32] = {
68 "balanced",
69 "only_enqueue",
70 "only_enqueue_prealloc",
71 "only_enqueue_bulk",
72 "only_enqueue_bulk_prealloc",
73 "only_dequeue",
74 "only_dequeue_bulk",
75 "mostly_enqueue",
76 "mostly_enqueue_bulk",
77 "mostly_dequeue",
78 "mostly_dequeue_bulk",
79 "spmc",
80 "spmc_preproduced",
81 "mpsc",
82 "empty_dequeue",
83 "enqueue_dequeue_pairs",
84 "heavy_concurrent"
85 };
86
87 const char BENCHMARK_NAMES[BENCHMARK_TYPE_COUNT][64] = {
88 "balanced",
89 "only enqueue",
90 "only enqueue (pre-allocated)",
91 "only enqueue bulk",
92 "only enqueue bulk (pre-allocated)",
93 "only dequeue",
94 "only dequeue bulk",
95 "mostly enqueue",
96 "mostly enqueue bulk",
97 "mostly dequeue",
98 "mostly dequeue bulk",
99 "single-producer, multi-consumer",
100 "single-producer, multi-consumer (pre-produced)",
101 "multi-producer, single-consumer",
102 "dequeue from empty",
103 "enqueue-dequeue pairs",
104 "heavy concurrent"
105 };
106
107 const char BENCHMARK_DESCS[BENCHMARK_TYPE_COUNT][256] = {
108 "Measures the average operation speed with multiple symmetrical threads\n under reasonable load -- small random intervals between accesses",
109 "Measures the average operation speed when all threads are producers",
110 "Measures the average operation speed when all threads are producers,\n and the queue has been stretched out first",
111 "Measures the average speed of enqueueing an item in bulk when all threads are producers",
112 "Measures the average speed of enqueueing an item in bulk when all threads are producers,\n and the queue has been stretched out first",
113 "Measures the average operation speed when all threads are consumers",
114 "Measures the average speed of dequeueing an item in bulk when all threads are consumers",
115 "Measures the average operation speed when most threads are enqueueing",
116 "Measures the average speed of enqueueing an item in bulk under light contention",
117 "Measures the average operation speed when most threads are dequeueing",
118 "Measures the average speed of dequeueing an item in bulk under light contention",
119 "Measures the average speed of dequeueing with only one producer, but multiple consumers",
120 "Measures the average speed of dequeueing from a queue pre-filled by one thread",
121 "Measures the average speed of dequeueing with only one consumer, but multiple producers",
122 "Measures the average speed of attempting to dequeue from an empty queue\n (that eight separate threads had at one point enqueued to)",
123 "Measures the average operation speed with each thread doing an enqueue\n followed by a dequeue",
124 "Measures the average operation speed with many threads under heavy load"
125 };
126
127 const char BENCHMARK_SINGLE_THREAD_NOTES[BENCHMARK_TYPE_COUNT][256] = {
128 "",
129 "",
130 "",
131 "",
132 "",
133 "",
134 "",
135 "",
136 "",
137 "",
138 "",
139 "",
140 "",
141 "",
142 "No contention -- measures raw failed dequeue speed on empty queue",
143 "No contention -- measures speed of immediately dequeueing the item that was just enqueued",
144 ""
145 };
146
147 int BENCHMARK_THREADS_MEASURED[BENCHMARK_TYPE_COUNT] = {
148 0, // measures nthreads
149 0,
150 0,
151 0,
152 0,
153 0,
154 0,
155 0,
156 0,
157 0,
158 0,
159 -1, // nthreads - 1
160 0,
161 1, // 1
162 0,
163 0,
164 0,
165 };
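// Semantics of the values above: 0 means throughput is normalized over all
// nthreads; a negative value -k means all but k threads are measured (e.g.
// SPMC excludes the lone producer); a positive value measures exactly that
// many threads (e.g. MPSC measures only the single consumer).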
166
167 int BENCHMARK_THREADS[BENCHMARK_TYPE_COUNT][9] = {
168 { 2, 3, 4, 8, 12, 16, 32, 0, 0 },
169 { 1, 2, 4, 8, 12, 16, 32, 48, 0 },
170 { 1, 2, 4, 8, 32, 0, 0, 0, 0 },
171 { 1, 2, 4, 8, 12, 16, 32, 48, 0 },
172 { 1, 2, 4, 8, 32, 0, 0, 0, 0 },
173 { 1, 2, 4, 8, 12, 16, 32, 48, 0 },
174 { 1, 2, 4, 8, 12, 16, 32, 48, 0 },
175 { 2, 4, 8, 32, 0, 0, 0, 0, 0 },
176 { 2, 4, 8, 32, 0, 0, 0, 0, 0 },
177 { 2, 4, 8, 0, 0, 0, 0, 0, 0 },
178 { 2, 4, 8, 0, 0, 0, 0, 0, 0 },
179 { 2, 4, 8, 16, 0, 0, 0, 0, 0 },
180 { 1, 3, 7, 15, 0, 0, 0, 0, 0 },
181 { 2, 4, 8, 16, 0, 0, 0, 0, 0 },
182 { 1, 2, 8, 32, 0, 0, 0, 0, 0 },
183 { 1, 2, 4, 8, 32, 0, 0, 0, 0 },
184 { 2, 3, 4, 8, 12, 16, 32, 48, 0 },
185 };
186
187 enum queue_id_t
188 {
189 queue_moodycamel_ConcurrentQueue,
190 queue_moodycamel_BlockingConcurrentQueue,
191 queue_boost,
192 queue_tbb,
193 queue_simplelockfree,
194 queue_lockbased,
195 queue_std,
196 queue_dlib,
197 QUEUE_COUNT
198 };
199
200 const char QUEUE_NAMES[QUEUE_COUNT][64] = {
201 "moodycamel::ConcurrentQueue",
202 "moodycamel::BlockingConcurrentQueue",
203 "boost::lockfree::queue",
204 "tbb::concurrent_queue",
205 "SimpleLockFreeQueue",
206 "LockBasedQueue",
207 "std::queue",
208 "dlib::pipe"
209 };
210
211 const char QUEUE_SUMMARY_NOTES[QUEUE_COUNT][128] = {
212 "including bulk",
213 "including bulk",
214 "",
215 "",
216 "",
217 "",
218 "single thread only",
219 ""
220 };
221
222 const bool QUEUE_TOKEN_SUPPORT[QUEUE_COUNT] = {
223 true,
224 true,
225 false,
226 false,
227 false,
228 false,
229 false,
230 false
231 };
232
233 const int QUEUE_MAX_THREADS[QUEUE_COUNT] = {
234 -1, // no limit
235 -1,
236 -1,
237 -1,
238 -1,
239 -1,
240 1,
241 -1
242 };
243
244 const bool QUEUE_BENCH_SUPPORT[QUEUE_COUNT][BENCHMARK_TYPE_COUNT] = {
245 { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 },
246 { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 },
247 { 1, 1, 1, 0, 0, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1 },
248 { 1, 1, 1, 0, 0, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1 },
249 { 1, 1, 1, 0, 0, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1 },
250 { 1, 1, 1, 0, 0, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1 },
251 { 0, 1, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0 },
252 { 1, 1, 1, 0, 0, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1 }
253 };
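// Each row above corresponds to a queue (in queue_id_t order) and each column
// to a benchmark (in benchmark_type_t order); a 0 skips that combination.
// For example, std::queue participates only in the single-threaded benchmarks.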
254
255
256 struct Traits : public moodycamel::ConcurrentQueueDefaultTraits
257 {
258 // Use a slightly larger default block size; the default offers
259 // a good trade-off between speed and memory usage, but a bigger
260 // block size will improve throughput (which is mostly what
261 // we're after with these benchmarks).
262 static const size_t BLOCK_SIZE = 64;
263 };
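// For illustration, a queue specialized with these traits is declared by
// passing Traits as the second template parameter. A minimal sketch (not
// invoked by any benchmark; it only demonstrates the traits hookup):
static inline bool traitsUsageSketch()
{
	moodycamel::ConcurrentQueue<int, Traits> q;  // uses BLOCK_SIZE = 64
	q.enqueue(42);
	int x;
	return q.try_dequeue(x) && x == 42;  // succeeds: the queue held one item
}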
264
265
266 typedef std::uint64_t counter_t;
267
268 const counter_t BULK_BATCH_SIZE = 2300;
269
270 struct BenchmarkResult
271 {
272 double elapsedTime;
273 counter_t operations;
274
275 inline bool operator<(BenchmarkResult const& other) const
276 {
277 return elapsedTime < other.elapsedTime;
278 }
279 };
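// Sorting ascending by elapsedTime puts the fastest iterations first; the
// stats loop in main() then averages only that leading slice of the runs.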
280
281
282 template<typename TFunc>
283 counter_t rampUpToMeasurableNumberOfMaxOps(TFunc const& func, counter_t startOps = 256)
284 {
285 counter_t ops = startOps;
286 double time;
287 do {
288 time = func(ops);
289 ops *= 2;
290 } while (time < (precise ? 30 : 10));
291 #ifdef NDEBUG
292 return ops / 2;
293 #else
294 return ops / 4;
295 #endif
296 }
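// The ramp-up doubles the operation count until a single timed run of `func`
// takes at least 10ms (30ms in precise mode), then returns the count that
// crossed the threshold (half of it in debug builds). A typical invocation
// looks like this sketch, where TQueue/item_t stand in for the benchmarked
// queue and element type:
//
//     counter_t ops = rampUpToMeasurableNumberOfMaxOps([](counter_t n) {
//         TQueue q;
//         item_t item = 1;
//         auto start = getSystemTime();
//         for (counter_t i = 0; i != n; ++i)
//             q.enqueue(item);
//         return getTimeDelta(start);  // fractional milliseconds
//     });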
297
298 counter_t adjustForThreads(counter_t suggestedOps, int nthreads)
299 {
300 return std::max((counter_t)(suggestedOps / std::pow(2, std::sqrt((nthreads - 1) * 3))), suggestedOps / 16);
301 }
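// Worked example: with suggestedOps = 1024 and nthreads = 4, the divisor is
// 2^sqrt((4 - 1) * 3) = 2^3 = 8, giving 128 ops per thread; the std::max
// clamps the result so it never drops below suggestedOps / 16 at high
// thread counts.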
302
303
304 template<typename TQueue, typename item_t>
305 counter_t determineMaxOpsForBenchmark(benchmark_type_t benchmark, int nthreads, bool useTokens, unsigned int randSeed)
306 {
307 switch (benchmark) {
308 case bench_balanced: {
309 return adjustForThreads(rampUpToMeasurableNumberOfMaxOps([&](counter_t ops) {
310 TQueue q;
311 RNG_t rng(randSeed * 1);
312 std::uniform_int_distribution<int> rand(0, 20);
313 double total = 0;
314 SystemTime start;
315 item_t item = 1;
316 for (counter_t i = 0; i != ops; ++i) {
317 start = getSystemTime();
318 q.enqueue(item);
319 total += getTimeDelta(start);
320 }
321 return total;
322 }), nthreads);
323 }
324 case bench_only_enqueue:
325 case bench_only_enqueue_prealloc:
326 case bench_mostly_enqueue: {
327 return adjustForThreads(rampUpToMeasurableNumberOfMaxOps([](counter_t ops) {
328 TQueue q;
329 item_t item = 1;
330 auto start = getSystemTime();
331 for (counter_t i = 0; i != ops; ++i) {
332 q.enqueue(item);
333 }
334 return getTimeDelta(start);
335 }), nthreads);
336 }
337 case bench_only_dequeue:
338 case bench_mostly_dequeue:
339 case bench_spmc:
340 case bench_spmc_preproduced:
341 case bench_mpsc: {
342 return adjustForThreads(rampUpToMeasurableNumberOfMaxOps([](counter_t ops) {
343 TQueue q;
344 item_t item = 1;
345 for (counter_t i = 0; i != ops; ++i) {
346 q.enqueue(item);
347 }
348 item_t item_rec;
349 auto start = getSystemTime();
350 for (counter_t i = 0; i != ops; ++i) {
351 q.try_dequeue(item_rec);
352 }
353 return getTimeDelta(start);
354 }), nthreads);
355 }
356 case bench_only_enqueue_bulk:
357 case bench_only_enqueue_bulk_prealloc:
358 case bench_mostly_enqueue_bulk: {
359 std::vector<item_t> data;
360 for (counter_t i = 0; i != BULK_BATCH_SIZE; ++i) {
361 data.push_back(i);
362 }
363 return adjustForThreads(rampUpToMeasurableNumberOfMaxOps([&](counter_t ops) {
364 TQueue q;
365 auto start = getSystemTime();
366 for (counter_t i = 0; i != ops; ++i) {
367 q.enqueue_bulk(data.cbegin(), data.size());
368 }
369 return getTimeDelta(start);
370 }), nthreads);
371 }
372 case bench_only_dequeue_bulk:
373 case bench_mostly_dequeue_bulk: {
374 return adjustForThreads(rampUpToMeasurableNumberOfMaxOps([](counter_t ops) {
375 TQueue q;
376 std::vector<item_t> data(BULK_BATCH_SIZE);
377 for (counter_t i = 0; i != ops; ++i) {
378 q.enqueue_bulk(data.cbegin(), data.size());
379 }
380 auto start = getSystemTime();
381 for (counter_t i = 0; i != ops; ++i) {
382 q.try_dequeue_bulk(data.begin(), data.size());
383 }
384 return getTimeDelta(start);
385 }), nthreads);
387 }
388 case bench_empty_dequeue: {
389 return adjustForThreads(rampUpToMeasurableNumberOfMaxOps([](counter_t ops) {
390 TQueue q;
391 item_t item_rec;
392 auto start = getSystemTime();
393 for (counter_t i = 0; i != ops; ++i) {
394 q.try_dequeue(item_rec);
395 }
396 return getTimeDelta(start);
397 }), nthreads);
398 }
399 case bench_enqueue_dequeue_pairs: {
400 return adjustForThreads(rampUpToMeasurableNumberOfMaxOps([](counter_t ops) {
401 TQueue q;
402 item_t item = 1;
403 item_t item_rec;
404 auto start = getSystemTime();
405 for (counter_t i = 0; i != ops; ++i) {
406 q.enqueue(item);
407 q.try_dequeue(item_rec);
408 }
409 return getTimeDelta(start);
410 }), nthreads);
411 }
412
413 case bench_heavy_concurrent: {
414 return adjustForThreads(rampUpToMeasurableNumberOfMaxOps([](counter_t ops) {
415 TQueue q;
416 item_t item=1;
417 item_t item_rec;
418 auto start = getSystemTime();
419 for (counter_t i = 0; i != ops; ++i) {
420 q.enqueue(item);
421 q.try_dequeue(item_rec);
422 }
423 return getTimeDelta(start);
424 }), nthreads);
425 }
426
427 default:
428 assert(false && "Every benchmark type must be handled here!");
429 return 0;
430 }
431 }
432
433
434 // Returns time elapsed, in (fractional) milliseconds
435 template<typename TQueue, typename item_t>
436 double runBenchmark(benchmark_type_t benchmark, int nthreads, bool useTokens, unsigned int randSeed, counter_t maxOps, int maxThreads, counter_t& out_opCount)
437 {
438 double result = 0;
439 volatile int forceNoOptimizeDummy;
440
441 switch (benchmark) {
442 case bench_balanced: {
443 // Measures the average operation speed with multiple symmetrical threads under reasonable load
444 TQueue q;
445 std::vector<SimpleThread> threads(nthreads);
446 std::vector<counter_t> ops(nthreads);
447 std::vector<double> times(nthreads);
448 std::atomic<int> ready(0);
449 item_t item_rec;
450 item_t item = 1;
451 for (int tid = 0; tid != nthreads; ++tid) {
452 threads[tid] = SimpleThread([&](int id) {
453 ready.fetch_add(1, std::memory_order_relaxed);
454 while (ready.load(std::memory_order_relaxed) != nthreads)
455 continue;
456
457 SystemTime start;
458 RNG_t rng(randSeed * (id + 1));
459 std::uniform_int_distribution<int> rand(0, 20);
460 ops[id] = 0;
461 times[id] = 0;
462 typename TQueue::consumer_token_t consTok(q);
463 typename TQueue::producer_token_t prodTok(q);
464
465
466 for (counter_t i = 0; i != maxOps; ++i) {
467 if (rand(rng) == 0) {
468 start = getSystemTime();
469 if ((i & 1) == 0) {
470 if (useTokens) {
471 q.try_dequeue(consTok, item_rec);
472 }
473 else {
474 q.try_dequeue(item_rec);
475 }
476 }
477 else {
478 if (useTokens) {
479 q.enqueue(prodTok, item);
480 }
481 else {
482 q.enqueue(item);
483 }
484 }
485 times[id] += getTimeDelta(start);
486 ++ops[id];
487 }
488 }
489 }, tid);
490 }
491 out_opCount = 0;
492 result = 0;
493 for (int tid = 0; tid != nthreads; ++tid) {
494 threads[tid].join();
495 out_opCount += ops[tid];
496 result += times[tid];
497 }
498 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
499 break;
500 }
501
502 case bench_only_enqueue_prealloc: {
503 out_opCount = maxOps * nthreads;
504
505 TQueue q;
506 item_t item = 1;
507 item_t item_rec;
508 {
509 // Enqueue opcount elements first, then dequeue them; this
510 // will "stretch out" the queue, letting implementations
511 // that re-use memory internally avoid having to allocate
512 // more later during the timed enqueue operations.
513 std::vector<SimpleThread> threads(nthreads);
514
515 for (int tid = 0; tid != nthreads; ++tid) {
516 threads[tid] = SimpleThread([&](int id) {
517 if (useTokens) {
518 typename TQueue::producer_token_t tok(q);
519 for (counter_t i = 0; i != maxOps; ++i) {
520 q.enqueue(tok, item);
521 }
522 }
523 else {
524 for (counter_t i = 0; i != maxOps; ++i) {
525 q.enqueue(item);
526 }
527 }
528 }, tid);
529 }
530 for (int tid = 0; tid != nthreads; ++tid) {
531 threads[tid].join();
532 }
533
534 // Now empty the queue
535
536 while (q.try_dequeue(item_rec))
537 continue;
538 }
539
540 if (nthreads == 1) {
541 // No contention -- measures raw single-item enqueue speed
542 auto start = getSystemTime();
543 if (useTokens) {
544 typename TQueue::producer_token_t tok(q);
545 for (counter_t i = 0; i != maxOps; ++i) {
546 q.enqueue(tok, item);
547 }
548 }
549 else {
550 for (counter_t i = 0; i != maxOps; ++i) {
551 q.enqueue(item);
552 }
553 }
554 result = getTimeDelta(start);
555 }
556 else {
557 std::vector<SimpleThread> threads(nthreads);
558 std::vector<double> timings(nthreads);
559 std::atomic<int> ready(0);
560 for (int tid = 0; tid != nthreads; ++tid) {
561 threads[tid] = SimpleThread([&](int id) {
562 ready.fetch_add(1, std::memory_order_relaxed);
563 while (ready.load(std::memory_order_relaxed) != nthreads)
564 continue;
565
566 auto start = getSystemTime();
567 if (useTokens) {
568 typename TQueue::producer_token_t tok(q);
569 for (counter_t i = 0; i != maxOps; ++i) {
570 q.enqueue(tok, item);
571 }
572 }
573 else {
574 for (counter_t i = 0; i != maxOps; ++i) {
575 q.enqueue(item);
576 }
577 }
578 timings[id] = getTimeDelta(start);
579 }, tid);
580 }
581 result = 0;
582 for (int tid = 0; tid != nthreads; ++tid) {
583 threads[tid].join();
584 result += timings[tid];
585 }
586 }
587 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
588 break;
589 }
590
591 case bench_only_enqueue: {
592 out_opCount = maxOps * nthreads;
593
594 TQueue q;
595 item_t item = 1;
596 item_t item_rec;
597 if (nthreads == 1) {
598 // No contention -- measures raw single-item enqueue speed
599 auto start = getSystemTime();
600 if (useTokens) {
601 typename TQueue::producer_token_t tok(q);
602 for (counter_t i = 0; i != maxOps; ++i) {
603 q.enqueue(tok, item);
604 }
605 }
606 else {
607 for (counter_t i = 0; i != maxOps; ++i) {
608 q.enqueue(item);
609 }
610 }
611 result = getTimeDelta(start);
612 }
613 else {
614 std::vector<SimpleThread> threads(nthreads);
615 std::vector<double> timings(nthreads);
616 std::atomic<int> ready(0);
617 for (int tid = 0; tid != nthreads; ++tid) {
618 threads[tid] = SimpleThread([&](int id) {
619 ready.fetch_add(1, std::memory_order_relaxed);
620 while (ready.load(std::memory_order_relaxed) != nthreads)
621 continue;
622
623 auto start = getSystemTime();
624 if (useTokens) {
625 typename TQueue::producer_token_t tok(q);
626 for (counter_t i = 0; i != maxOps; ++i) {
627 q.enqueue(tok, item);
628 }
629 }
630 else {
631 for (counter_t i = 0; i != maxOps; ++i) {
632 q.enqueue(item);
633 }
634 }
635 timings[id] = getTimeDelta(start);
636 }, tid);
637 }
638 result = 0;
639 for (int tid = 0; tid != nthreads; ++tid) {
640 threads[tid].join();
641 result += timings[tid];
642 }
643 }
644 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
645 break;
646 }
647
648 case bench_spmc_preproduced:
649 case bench_only_dequeue: {
650 out_opCount = maxOps * nthreads;
651
652 TQueue q;
653 item_t item = 1;
654 item_t item_rec;
655 {
656 // Fill up the queue first
657 std::vector<SimpleThread> threads(benchmark == bench_spmc_preproduced ? 1 : nthreads);
658 counter_t itemsPerThread = benchmark == bench_spmc_preproduced ? maxOps * nthreads : maxOps;
659 for (size_t tid = 0; tid != threads.size(); ++tid) {
660 threads[tid] = SimpleThread([&](size_t id) {
661 if (useTokens) {
662 typename TQueue::producer_token_t tok(q);
663 for (counter_t i = 0; i != itemsPerThread; ++i) {
664 q.enqueue(tok, item);
665 }
666 }
667 else {
668 for (counter_t i = 0; i != itemsPerThread; ++i) {
669 q.enqueue(item);
670 }
671 }
672 }, tid);
673 }
674 for (size_t tid = 0; tid != threads.size(); ++tid) {
675 threads[tid].join();
676 }
677 }
678
679 if (nthreads == 1) {
680 // No contention -- measures raw single-item dequeue speed
681 auto start = getSystemTime();
682 if (useTokens) {
683 typename TQueue::consumer_token_t tok(q);
684 for (counter_t i = 0; i != maxOps; ++i) {
685 q.try_dequeue(tok, item_rec);
686 }
687 }
688 else {
689 for (counter_t i = 0; i != maxOps; ++i) {
690 q.try_dequeue(item_rec);
691 }
692 }
693 result = getTimeDelta(start);
694 }
695 else {
696 std::vector<SimpleThread> threads(nthreads);
697 std::vector<double> timings(nthreads);
698 std::atomic<int> ready(0);
699 for (int tid = 0; tid != nthreads; ++tid) {
700 threads[tid] = SimpleThread([&](int id) {
701 ready.fetch_add(1, std::memory_order_relaxed);
702 while (ready.load(std::memory_order_relaxed) != nthreads)
703 continue;
704
705 auto start = getSystemTime();
706 if (useTokens) {
707 typename TQueue::consumer_token_t tok(q);
708 for (counter_t i = 0; i != maxOps; ++i) {
709 q.try_dequeue(tok, item_rec);
710 }
711 }
712 else {
713 for (counter_t i = 0; i != maxOps; ++i) {
714 q.try_dequeue(item_rec);
715 }
716 }
717 timings[id] = getTimeDelta(start);
718 }, tid);
719 }
720 result = 0;
721 for (int tid = 0; tid != nthreads; ++tid) {
722 threads[tid].join();
723 result += timings[tid];
724 }
725 }
726 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
727 break;
728 }
729
730 case bench_mostly_enqueue: {
731 // Measures the average operation speed when most threads are enqueueing
732 TQueue q;
733 item_t item = 1;
734 item_t item_rec;
735 out_opCount = maxOps * nthreads;
736 std::vector<SimpleThread> threads(nthreads);
737 std::vector<double> timings(nthreads);
738 auto dequeueThreads = std::max(1, nthreads / 4);
739 std::atomic<int> ready(0);
740 for (int tid = 0; tid != nthreads - dequeueThreads; ++tid) {
741 threads[tid] = SimpleThread([&](int id) {
742 ready.fetch_add(1, std::memory_order_relaxed);
743 while (ready.load(std::memory_order_relaxed) != nthreads)
744 continue;
745
746 auto start = getSystemTime();
747 if (useTokens) {
748 typename TQueue::producer_token_t tok(q);
749 for (counter_t i = 0; i != maxOps; ++i) {
750 q.enqueue(tok, item);
751 }
752 }
753 else {
754 for (counter_t i = 0; i != maxOps; ++i) {
755 q.enqueue(item);
756 }
757 }
758 timings[id] = getTimeDelta(start);
759 }, tid);
760 }
761 for (int tid = nthreads - dequeueThreads; tid != nthreads; ++tid) {
762 threads[tid] = SimpleThread([&](int id) {
763 ready.fetch_add(1, std::memory_order_relaxed);
764 while (ready.load(std::memory_order_relaxed) != nthreads)
765 continue;
766
767 auto start = getSystemTime();
768 if (useTokens) {
769 typename TQueue::consumer_token_t tok(q);
770 for (counter_t i = 0; i != maxOps; ++i) {
771 q.try_dequeue(tok, item_rec);
772 }
773 }
774 else {
775 for (counter_t i = 0; i != maxOps; ++i) {
776 q.try_dequeue(item_rec);
777 }
778 }
779 timings[id] = getTimeDelta(start);
780 }, tid);
781 }
782 result = 0;
783 for (int tid = 0; tid != nthreads; ++tid) {
784 threads[tid].join();
785 result += timings[tid];
786 }
787 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
788 break;
789 }
790
791 case bench_mostly_dequeue: {
792 // Measures the average operation speed when most threads are dequeueing
793 TQueue q;
794 item_t item = 1;
795 item_t item_rec;
796 out_opCount = maxOps * nthreads;
797 std::vector<SimpleThread> threads(nthreads);
798 std::vector<double> timings(nthreads);
799 auto enqueueThreads = std::max(1, nthreads / 4);
800 {
801 // Fill up the queue first
802 std::vector<SimpleThread> threads(enqueueThreads);
803 for (int tid = 0; tid != enqueueThreads; ++tid) {
804 threads[tid] = SimpleThread([&](int id) {
805 if (useTokens) {
806 typename TQueue::producer_token_t tok(q);
807 for (counter_t i = 0; i != maxOps; ++i) {
808 q.enqueue(tok, item);
809 }
810 }
811 else {
812 for (counter_t i = 0; i != maxOps; ++i) {
813 q.enqueue(item);
814 }
815 }
816 }, tid);
817 }
818 for (int tid = 0; tid != enqueueThreads; ++tid) {
819 threads[tid].join();
820 }
821 }
822 std::atomic<int> ready(0);
823 for (int tid = 0; tid != nthreads - enqueueThreads; ++tid) {
824 threads[tid] = SimpleThread([&](int id) {
825 ready.fetch_add(1, std::memory_order_relaxed);
826 while (ready.load(std::memory_order_relaxed) != nthreads)
827 continue;
828
829 auto start = getSystemTime();
830 if (useTokens) {
831 typename TQueue::consumer_token_t tok(q);
832 for (counter_t i = 0; i != maxOps; ++i) {
833 q.try_dequeue(tok, item_rec);
834 }
835 }
836 else {
837 for (counter_t i = 0; i != maxOps; ++i) {
838 q.try_dequeue(item_rec);
839 }
840 }
841 timings[id] = getTimeDelta(start);
842 }, tid);
843 }
844 for (int tid = nthreads - enqueueThreads; tid != nthreads; ++tid) {
845 threads[tid] = SimpleThread([&](int id) {
846 ready.fetch_add(1, std::memory_order_relaxed);
847 while (ready.load(std::memory_order_relaxed) != nthreads)
848 continue;
849
850 auto start = getSystemTime();
851 if (useTokens) {
852 typename TQueue::producer_token_t tok(q);
853 for (counter_t i = 0; i != maxOps; ++i) {
854 q.enqueue(tok, item);
855 }
856 }
857 else {
858 for (counter_t i = 0; i != maxOps; ++i) {
859 q.enqueue(item);
860 }
861 }
862 timings[id] = getTimeDelta(start);
863 }, tid);
864 }
865 result = 0;
866 for (int tid = 0; tid != nthreads; ++tid) {
867 threads[tid].join();
868 result += timings[tid];
869 }
870 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
871 break;
872 }
873
874 case bench_only_enqueue_bulk_prealloc: {
875 TQueue q;
876 item_t item = 1;
877 item_t item_rec;
878 {
879 // Enqueue opcount elements first, then dequeue them; this
880 // will "stretch out" the queue, letting implementations
881 // that re-use memory internally avoid having to allocate
882 // more later during the timed enqueue operations.
883 std::vector<SimpleThread> threads(nthreads);
884 for (int tid = 0; tid != nthreads; ++tid) {
885 threads[tid] = SimpleThread([&](int id) {
886 if (useTokens) {
887 typename TQueue::producer_token_t tok(q);
888 for (counter_t i = 0; i != maxOps; ++i) {
889 q.enqueue(tok, item);
890 }
891 }
892 else {
893 for (counter_t i = 0; i != maxOps; ++i) {
894 q.enqueue(item);
895 }
896 }
897 }, tid);
898 }
899 for (int tid = 0; tid != nthreads; ++tid) {
900 threads[tid].join();
901 }
902
903 // Now empty the queue
904 while (q.try_dequeue(item_rec))
905 continue;
906 }
907
908 std::vector<counter_t> data;
909 for (counter_t i = 0; i != BULK_BATCH_SIZE; ++i) {
910 data.push_back(i);
911 }
912
913 out_opCount = maxOps * BULK_BATCH_SIZE * nthreads;
914 if (nthreads == 1) {
915 auto start = getSystemTime();
916 if (useTokens) {
917 typename TQueue::producer_token_t tok(q);
918 for (counter_t i = 0; i != maxOps; ++i) {
919 q.enqueue_bulk(tok, data.cbegin(), data.size());
920 }
921 }
922 else {
923 for (counter_t i = 0; i != maxOps; ++i) {
924 q.enqueue_bulk(data.cbegin(), data.size());
925 }
926 }
927 result = getTimeDelta(start);
928 }
929 else {
930 std::vector<SimpleThread> threads(nthreads);
931 std::vector<double> timings(nthreads);
932 std::atomic<int> ready(0);
933 for (int tid = 0; tid != nthreads; ++tid) {
934 threads[tid] = SimpleThread([&](int id) {
935 ready.fetch_add(1, std::memory_order_relaxed);
936 while (ready.load(std::memory_order_relaxed) != nthreads)
937 continue;
938
939 auto start = getSystemTime();
940 if (useTokens) {
941 typename TQueue::producer_token_t tok(q);
942 for (counter_t i = 0; i != maxOps; ++i) {
943 q.enqueue_bulk(tok, data.cbegin(), data.size());
944 }
945 }
946 else {
947 for (counter_t i = 0; i != maxOps; ++i) {
948 q.enqueue_bulk(data.cbegin(), data.size());
949 }
950 }
951 timings[id] = getTimeDelta(start);
952 }, tid);
953 }
954 result = 0;
955 for (int tid = 0; tid != nthreads; ++tid) {
956 threads[tid].join();
957 result += timings[tid];
958 }
959 }
960 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
961 break;
962 }
963
964 case bench_only_enqueue_bulk: {
965 TQueue q;
966 item_t item = 1;
967 item_t item_rec;
968 std::vector<counter_t> data;
969 for (counter_t i = 0; i != BULK_BATCH_SIZE; ++i) {
970 data.push_back(i);
971 }
972
973 out_opCount = maxOps * BULK_BATCH_SIZE * nthreads;
974 if (nthreads == 1) {
975 auto start = getSystemTime();
976 if (useTokens) {
977 typename TQueue::producer_token_t tok(q);
978 for (counter_t i = 0; i != maxOps; ++i) {
979 q.enqueue_bulk(tok, data.cbegin(), data.size());
980 }
981 }
982 else {
983 for (counter_t i = 0; i != maxOps; ++i) {
984 q.enqueue_bulk(data.cbegin(), data.size());
985 }
986 }
987 result = getTimeDelta(start);
988 }
989 else {
990 std::vector<SimpleThread> threads(nthreads);
991 std::vector<double> timings(nthreads);
992 std::atomic<int> ready(0);
993 for (int tid = 0; tid != nthreads; ++tid) {
994 threads[tid] = SimpleThread([&](int id) {
995 ready.fetch_add(1, std::memory_order_relaxed);
996 while (ready.load(std::memory_order_relaxed) != nthreads)
997 continue;
998
999 auto start = getSystemTime();
1000 if (useTokens) {
1001 typename TQueue::producer_token_t tok(q);
1002 for (counter_t i = 0; i != maxOps; ++i) {
1003 q.enqueue_bulk(tok, data.cbegin(), data.size());
1004 }
1005 }
1006 else {
1007 for (counter_t i = 0; i != maxOps; ++i) {
1008 q.enqueue_bulk(data.cbegin(), data.size());
1009 }
1010 }
1011 timings[id] = getTimeDelta(start);
1012 }, tid);
1013 }
1014 result = 0;
1015 for (int tid = 0; tid != nthreads; ++tid) {
1016 threads[tid].join();
1017 result += timings[tid];
1018 }
1019 }
1020 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
1021 break;
1022 }
1023
1024 case bench_mostly_enqueue_bulk: {
1025 // Measures the average speed of enqueueing in bulk under light contention
1026 TQueue q;
1027 item_t item = 1;
1028 item_t item_rec;
1029 std::vector<counter_t> data;
1030 for (counter_t i = 0; i != BULK_BATCH_SIZE; ++i) {
1031 data.push_back(i);
1032 }
1033
1034 std::vector<SimpleThread> threads(nthreads);
1035 std::vector<double> timings(nthreads);
1036 auto dequeueThreads = std::max(1, nthreads / 4);
1037 std::vector<counter_t> ops(nthreads - dequeueThreads);
1038 out_opCount = maxOps * BULK_BATCH_SIZE * (nthreads - dequeueThreads); // dequeue ops added after
1039 std::atomic<int> ready(0);
1040 for (int tid = 0; tid != nthreads - dequeueThreads; ++tid) {
1041 threads[tid] = SimpleThread([&](int id) {
1042 ready.fetch_add(1, std::memory_order_relaxed);
1043 while (ready.load(std::memory_order_relaxed) != nthreads)
1044 continue;
1045
1046 auto start = getSystemTime();
1047 if (useTokens) {
1048 typename TQueue::producer_token_t tok(q);
1049 for (counter_t i = 0; i != maxOps; ++i) {
1050 q.enqueue_bulk(tok, data.cbegin(), data.size());
1051 }
1052 }
1053 else {
1054 for (counter_t i = 0; i != maxOps; ++i) {
1055 q.enqueue_bulk(data.cbegin(), data.size());
1056 }
1057 }
1058 timings[id] = getTimeDelta(start);
1059 }, tid);
1060 }
1061 for (int tid = nthreads - dequeueThreads; tid != nthreads; ++tid) {
1062 threads[tid] = SimpleThread([&](int id, int idBase0) {
1063 std::vector<int> items(BULK_BATCH_SIZE);
1064
1065 ready.fetch_add(1, std::memory_order_relaxed);
1066 while (ready.load(std::memory_order_relaxed) != nthreads)
1067 continue;
1068
1069 counter_t totalOps = 0;
1070 auto start = getSystemTime();
1071 if (useTokens) {
1072 typename TQueue::consumer_token_t tok(q);
1073 for (counter_t i = 0; i != maxOps; ++i) {
1074 auto actual = q.try_dequeue_bulk(tok, items.begin(), items.size());
1075 totalOps += actual + (actual == items.size() ? 0 : 1);
1076 }
1077 }
1078 else {
1079 for (counter_t i = 0; i != maxOps; ++i) {
1080 auto actual = q.try_dequeue_bulk(items.begin(), items.size());
1081 totalOps += actual + (actual == items.size() ? 0 : 1);
1082 }
1083 }
1084 timings[id] = getTimeDelta(start);
1085 ops[idBase0] = totalOps;
1086 }, tid, tid - (nthreads - dequeueThreads));
1087 }
1088 result = 0;
1089 for (int tid = 0; tid != nthreads; ++tid) {
1090 threads[tid].join();
1091 result += timings[tid];
1092 if (tid < dequeueThreads) {
1093 out_opCount += ops[tid];
1094 }
1095 }
1096 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
1097 break;
1098 }
1099
1100 case bench_only_dequeue_bulk: {
1101 // Measures the average speed of dequeueing in bulk when all threads are consumers
1102 TQueue q;
1103 item_t item = 1;
1104 item_t item_rec;
1105 {
1106 // Fill up the queue first
1107 std::vector<int> data(BULK_BATCH_SIZE);
1108 for (int i = 0; i != BULK_BATCH_SIZE; ++i) {
1109 data[i] = i;
1110 }
1111 std::vector<SimpleThread> threads(nthreads);
1112 for (int tid = 0; tid != nthreads; ++tid) {
1113 threads[tid] = SimpleThread([&](int id) {
1114 if (useTokens) {
1115 typename TQueue::producer_token_t tok(q);
1116 for (counter_t i = 0; i != maxOps; ++i) {
1117 q.enqueue_bulk(tok, data.cbegin(), data.size());
1118 }
1119 }
1120 else {
1121 for (counter_t i = 0; i != maxOps; ++i) {
1122 q.enqueue_bulk(data.cbegin(), data.size());
1123 }
1124 }
1125 }, tid);
1126 }
1127 for (int tid = 0; tid != nthreads; ++tid) {
1128 threads[tid].join();
1129 }
1130 }
1131 if (nthreads == 1) {
1132 out_opCount = maxOps * BULK_BATCH_SIZE;
1133 auto start = getSystemTime();
1134 std::vector<int> items(BULK_BATCH_SIZE);
1135 if (useTokens) {
1136 typename TQueue::consumer_token_t tok(q);
1137 for (counter_t i = 0; i != maxOps; ++i) {
1138 q.try_dequeue_bulk(tok, items.begin(), items.size());
1139 }
1140 }
1141 else {
1142 for (counter_t i = 0; i != maxOps; ++i) {
1143 q.try_dequeue_bulk(items.begin(), items.size());
1144 }
1145 }
1146 result = getTimeDelta(start);
1147 }
1148 else {
1149 std::vector<SimpleThread> threads(nthreads);
1150 std::vector<double> timings(nthreads);
1151 std::vector<counter_t> ops(nthreads);
1152 std::atomic<int> ready(0);
1153 for (int tid = 0; tid != nthreads; ++tid) {
1154 threads[tid] = SimpleThread([&](int id) {
1155 std::vector<int> items(BULK_BATCH_SIZE);
1156 ready.fetch_add(1, std::memory_order_relaxed);
1157 while (ready.load(std::memory_order_relaxed) != nthreads)
1158 continue;
1159
1160 counter_t totalOps = 0;
1161 auto start = getSystemTime();
1162 if (useTokens) {
1163 typename TQueue::consumer_token_t tok(q);
1164 for (counter_t i = 0; i != maxOps; ++i) {
1165 auto actual = q.try_dequeue_bulk(tok, items.begin(), items.size());
1166 totalOps += actual + (actual == items.size() ? 0 : 1);
1167 }
1168 }
1169 else {
1170 for (counter_t i = 0; i != maxOps; ++i) {
1171 auto actual = q.try_dequeue_bulk(items.begin(), items.size());
1172 totalOps += actual + (actual == items.size() ? 0 : 1);
1173 }
1174 }
1175 timings[id] = getTimeDelta(start);
1176 ops[id] = totalOps;
1177 }, tid);
1178 }
1179 result = 0;
1180 out_opCount = 0;
1181 for (int tid = 0; tid != nthreads; ++tid) {
1182 threads[tid].join();
1183 result += timings[tid];
1184 out_opCount += ops[tid];
1185 }
1186 }
1187 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
1188 break;
1189 }
1190
1191 case bench_mostly_dequeue_bulk: {
1192 // Measures the average speed of dequeueing in bulk under light contention
1193 TQueue q;
1194 item_t item = 1;
1195 item_t item_rec;
1196 auto enqueueThreads = std::max(1, nthreads / 4);
1197 out_opCount = maxOps * BULK_BATCH_SIZE * enqueueThreads;
1198 std::vector<SimpleThread> threads(nthreads);
1199 std::vector<double> timings(nthreads);
1200 std::vector<counter_t> ops(nthreads - enqueueThreads);
1201 std::vector<int> enqueueData(BULK_BATCH_SIZE);
1202 for (int i = 0; i != BULK_BATCH_SIZE; ++i) {
1203 enqueueData[i] = i;
1204 }
1205 {
1206 // Fill up the queue first
1207 std::vector<SimpleThread> threads(enqueueThreads);
1208 for (int tid = 0; tid != enqueueThreads; ++tid) {
1209 threads[tid] = SimpleThread([&](int id) {
1210 if (useTokens) {
1211 typename TQueue::producer_token_t tok(q);
1212 for (counter_t i = 0; i != maxOps; ++i) {
1213 q.enqueue_bulk(tok, enqueueData.cbegin(), enqueueData.size());
1214 }
1215 }
1216 else {
1217 for (counter_t i = 0; i != maxOps; ++i) {
1218 q.enqueue_bulk(enqueueData.cbegin(), enqueueData.size());
1219 }
1220 }
1221 }, tid);
1222 }
1223 for (int tid = 0; tid != enqueueThreads; ++tid) {
1224 threads[tid].join();
1225 }
1226 }
1227 std::atomic<int> ready(0);
1228 for (int tid = 0; tid != nthreads - enqueueThreads; ++tid) {
1229 threads[tid] = SimpleThread([&](int id) {
1230 std::vector<int> data(BULK_BATCH_SIZE);
1231 ready.fetch_add(1, std::memory_order_relaxed);
1232 while (ready.load(std::memory_order_relaxed) != nthreads)
1233 continue;
1234 counter_t totalOps = 0;
1235 auto start = getSystemTime();
1236 if (useTokens) {
1237 typename TQueue::consumer_token_t tok(q);
1238 for (counter_t i = 0; i != maxOps; ++i) {
1239 auto actual = q.try_dequeue_bulk(tok, data.begin(), data.size());
1240 totalOps += actual + (actual == data.size() ? 0 : 1);
1241 }
1242 }
1243 else {
1244 for (counter_t i = 0; i != maxOps; ++i) {
1245 auto actual = q.try_dequeue_bulk(data.begin(), data.size());
1246 totalOps += actual + (actual == data.size() ? 0 : 1);
1247 }
1248 }
1249 timings[id] = getTimeDelta(start);
1250 ops[id] = totalOps;
1251 }, tid);
1252 }
1253 for (int tid = nthreads - enqueueThreads; tid != nthreads; ++tid) {
1254 threads[tid] = SimpleThread([&](int id) {
1255 ready.fetch_add(1, std::memory_order_relaxed);
1256 while (ready.load(std::memory_order_relaxed) != nthreads)
1257 continue;
1258
1259 auto start = getSystemTime();
1260 if (useTokens) {
1261 typename TQueue::producer_token_t tok(q);
1262 for (counter_t i = 0; i != maxOps; ++i) {
1263 q.enqueue_bulk(tok, enqueueData.cbegin(), enqueueData.size());
1264 }
1265 }
1266 else {
1267 for (counter_t i = 0; i != maxOps; ++i) {
1268 q.enqueue_bulk(enqueueData.cbegin(), enqueueData.size());
1269 }
1270 }
1271 timings[id] = getTimeDelta(start);
1272 }, tid);
1273 }
1274 result = 0;
1275 for (int tid = 0; tid != nthreads; ++tid) {
1276 threads[tid].join();
1277 result += timings[tid];
1278 if (tid < nthreads - enqueueThreads) {
1279 out_opCount += ops[tid];
1280 }
1281 }
1282 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
1283 break;
1284 }
1285
1286 case bench_spmc: {
1287 counter_t elementsToDequeue = maxOps * (nthreads - 1);
1288
1289 TQueue q;
1290 item_t item = 1;
1291 item_t item_rec;
1292 std::vector<SimpleThread> threads(nthreads - 1);
1293 std::vector<double> timings(nthreads - 1);
1294 std::vector<counter_t> ops(nthreads - 1);
1295 std::atomic<bool> lynchpin(false);
1296 std::atomic<counter_t> totalDequeued(0);
1297 for (int tid = 0; tid != nthreads - 1; ++tid) {
1298 threads[tid] = SimpleThread([&](int id) {
1299 while (!lynchpin.load(std::memory_order_relaxed)) {
1300 continue;
1301 }
1302
1303 int item;
1304 counter_t i = 0;
1305 auto start = getSystemTime();
1306 if (useTokens) {
1307 typename TQueue::consumer_token_t tok(q);
1308 while (true) {
1309 if (q.try_dequeue(tok, item)) {
1310 totalDequeued.fetch_add(1, std::memory_order_relaxed);
1311 }
1312 else if (totalDequeued.load(std::memory_order_relaxed) == elementsToDequeue) {
1313 break;
1314 }
1315 ++i;
1316 }
1317 }
1318 else {
1319 while (true) {
1320 if (q.try_dequeue(item_rec)) {
1321 totalDequeued.fetch_add(1, std::memory_order_relaxed);
1322 }
1323 else if (totalDequeued.load(std::memory_order_relaxed) == elementsToDequeue) {
1324 break;
1325 }
1326 ++i;
1327 }
1328 }
1329 timings[id] = getTimeDelta(start);
1330 ops[id] = i;
1331 }, tid);
1332 }
1333
1334 lynchpin.store(true, std::memory_order_seq_cst);
1335 for (counter_t i = 0; i != elementsToDequeue; ++i) {
1336 q.enqueue(item);
1337 }
1338
1339 result = 0;
1340 out_opCount = 0;
1341 for (int tid = 0; tid != nthreads - 1; ++tid) {
1342 threads[tid].join();
1343 result += timings[tid];
1344 out_opCount += ops[tid];
1345 }
1346 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
1347 break;
1348 }
1349
1350 case bench_mpsc: {
1351 TQueue q;
1352 item_t item = 1;
1353 item_t item_rec;
1354 counter_t elementsToDequeue = maxOps * (nthreads - 1);
1355 std::vector<SimpleThread> threads(nthreads);
1356 std::atomic<int> ready(0);
1357 for (int tid = 0; tid != nthreads; ++tid) {
1358 if (tid == 0) {
1359 // Consumer thread
1360 threads[tid] = SimpleThread([&](int id) {
1361 ready.fetch_add(1, std::memory_order_seq_cst);
1362 while (ready.load(std::memory_order_relaxed) != nthreads)
1363 continue;
1364
1365 int item;
1366 out_opCount = 0;
1367 auto start = getSystemTime();
1368 if (useTokens) {
1369 typename TQueue::consumer_token_t tok(q);
1370 for (counter_t i = 0; i != elementsToDequeue;) {
1371 i += q.try_dequeue(tok, item) ? 1 : 0;
1372 ++out_opCount;
1373 }
1374 }
1375 else {
1376 for (counter_t i = 0; i != elementsToDequeue;) {
1377 i += q.try_dequeue(item_rec) ? 1 : 0;
1378 ++out_opCount;
1379 }
1380 }
1381 result = getTimeDelta(start);
1382 }, tid);
1383 }
1384 else {
1385 threads[tid] = SimpleThread([&](int id) {
1386 ready.fetch_add(1, std::memory_order_seq_cst);
1387 while (ready.load(std::memory_order_relaxed) != nthreads)
1388 continue;
1389
1390 if (useTokens) {
1391 typename TQueue::producer_token_t tok(q);
1392 for (counter_t i = 0; i != maxOps; ++i) {
1393 q.enqueue(tok, item);
1394 }
1395 }
1396 else {
1397 for (counter_t i = 0; i != maxOps; ++i) {
1398 q.enqueue(item);
1399 }
1400 }
1401 }, tid);
1402 }
1403 }
1404
1405 for (int tid = 0; tid != nthreads; ++tid) {
1406 threads[tid].join();
1407 }
1408 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
1409 break;
1410 }
1411
1412 case bench_empty_dequeue: {
1413 // Measures the average speed of attempting to dequeue from an empty queue
1414 TQueue q;
1415 item_t item = 1;
1416 item_t item_rec;
1417 // Fill up then empty the queue first
1418 {
1419 std::vector<SimpleThread> threads(maxThreads > 0 ? maxThreads : 8);
1420 for (size_t tid = 0; tid != threads.size(); ++tid) {
1421 threads[tid] = SimpleThread([&](size_t id) {
1422 if (useTokens) {
1423 typename TQueue::producer_token_t tok(q);
1424 for (counter_t i = 0; i != 10000; ++i) {
1425 q.enqueue(tok, item);
1426 }
1427 }
1428 else {
1429 for (counter_t i = 0; i != 10000; ++i) {
1430 q.enqueue(item);
1431 }
1432 }
1433 }, tid);
1434 }
1435 for (size_t tid = 0; tid != threads.size(); ++tid) {
1436 threads[tid].join();
1437 }
1438
1439 // Empty the queue
1440 while (q.try_dequeue(item_rec))
1441 continue;
1442 }
1443
1444 if (nthreads == 1) {
1445 // No contention -- measures raw failed dequeue speed on empty queue
1446 int item;
1447 out_opCount = maxOps;
1448 auto start = getSystemTime();
1449 if (useTokens) {
1450 typename TQueue::consumer_token_t tok(q);
1451 for (counter_t i = 0; i != maxOps; ++i) {
1452 q.try_dequeue(tok, item);
1453 }
1454 }
1455 else {
1456 for (counter_t i = 0; i != maxOps; ++i) {
1457 q.try_dequeue(item_rec);
1458 }
1459 }
1460 result = getTimeDelta(start);
1461 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
1462 }
1463 else {
1464 out_opCount = maxOps * nthreads;
1465 std::vector<SimpleThread> threads(nthreads);
1466 std::vector<double> timings(nthreads);
1467 std::atomic<int> ready(0);
1468 for (int tid = 0; tid != nthreads; ++tid) {
1469 threads[tid] = SimpleThread([&](int id) {
1470 ready.fetch_add(1, std::memory_order_relaxed);
1471 while (ready.load(std::memory_order_relaxed) != nthreads)
1472 continue;
1473
1474 int item;
1475 auto start = getSystemTime();
1476 if (useTokens) {
1477 typename TQueue::consumer_token_t tok(q);
1478 for (counter_t i = 0; i != maxOps; ++i) {
1479 q.try_dequeue(tok, item);
1480 }
1481 }
1482 else {
1483 for (counter_t i = 0; i != maxOps; ++i) {
1484 q.try_dequeue(item_rec);
1485 }
1486 }
1487 timings[id] = getTimeDelta(start);
1488 }, tid);
1489 }
1490 result = 0;
1491 for (int tid = 0; tid != nthreads; ++tid) {
1492 threads[tid].join();
1493 result += timings[tid];
1494 }
1495 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
1496 }
1497 break;
1498 }
1499
1500 case bench_enqueue_dequeue_pairs: {
1501 // Measures the average operation speed with each thread doing an enqueue
1502 // followed by a dequeue
1503 out_opCount = maxOps * 2 * nthreads;
1504 TQueue q;
1505 item_t item = 1;
1506 item_t item_rec;
1507 if (nthreads == 1) {
1508 // No contention -- measures speed of immediately dequeueing the item that was just enqueued
1509 int item;
1510 auto start = getSystemTime();
1511 if (useTokens) {
1512 typename TQueue::producer_token_t prodTok(q);
1513 typename TQueue::consumer_token_t consTok(q);
1514 for (counter_t i = 0; i != maxOps; ++i) {
1515 q.enqueue(prodTok, item);
1516 q.try_dequeue(consTok, item);
1517 }
1518 }
1519 else {
1520 for (counter_t i = 0; i != maxOps; ++i) {
1521 q.enqueue(item);
1522 q.try_dequeue(item_rec);
1523 }
1524 }
1525 result = getTimeDelta(start);
1526 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
1527 }
1528 else {
1529 std::vector<SimpleThread> threads(nthreads);
1530 std::vector<double> timings(nthreads);
1531 std::atomic<int> ready(0);
1532 for (int tid = 0; tid != nthreads; ++tid) {
1533 threads[tid] = SimpleThread([&](int id) {
1534 ready.fetch_add(1, std::memory_order_relaxed);
1535 while (ready.load(std::memory_order_relaxed) != nthreads)
1536 continue;
1537
1538 int item;
1539 auto start = getSystemTime();
1540 if (useTokens) {
1541 typename TQueue::producer_token_t prodTok(q);
1542 typename TQueue::consumer_token_t consTok(q);
1543 for (counter_t i = 0; i != maxOps; ++i) {
1544 q.enqueue(prodTok, item);
1545 q.try_dequeue(consTok, item);
1546 }
1547 }
1548 else {
1549 for (counter_t i = 0; i != maxOps; ++i) {
1550 q.enqueue(item);
1551 q.try_dequeue(item_rec);
1552 }
1553 }
1554 timings[id] = getTimeDelta(start);
1555 }, tid);
1556 }
1557 result = 0;
1558 for (int tid = 0; tid != nthreads; ++tid) {
1559 threads[tid].join();
1560 result += timings[tid];
1561 }
1562 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
1563 }
1564 break;
1565 }
1566
1567 case bench_heavy_concurrent: {
1568 // Measures the average operation speed with many threads under heavy load
1569 out_opCount = maxOps * nthreads;
1570 TQueue q;
1571 item_t item = 1;
1572 item_t item_rec;
1573 std::vector<SimpleThread> threads(nthreads);
1574 std::vector<double> timings(nthreads);
1575 std::atomic<int> ready(0);
1576 for (int tid = 0; tid != nthreads; ++tid) {
1577 threads[tid] = SimpleThread([&](int id) {
1578 ready.fetch_add(1, std::memory_order_relaxed);
1579 while (ready.load(std::memory_order_relaxed) != nthreads)
1580 continue;
1581
1582 auto start = getSystemTime();
1583 if (id < 2) {
1584 // Alternate
1585 int item;
1586 if (useTokens) {
1587 typename TQueue::consumer_token_t consTok(q);
1588 typename TQueue::producer_token_t prodTok(q);
1589
1590 for (counter_t i = 0; i != maxOps / 2; ++i) {
1591 q.try_dequeue(consTok, item);
1592 q.enqueue(prodTok, item);
1593 }
1594 }
1595 else {
1596 for (counter_t i = 0; i != maxOps / 2; ++i) {
1597 q.try_dequeue(item_rec);
1598 q.enqueue(item);
1599 }
1600 }
1601 }
1602 else {
1603 if ((id & 1) == 0) {
1604 // Enqueue
1605 if (useTokens) {
1606 typename TQueue::producer_token_t prodTok(q);
1607 for (counter_t i = 0; i != maxOps; ++i) {
1608 q.enqueue(prodTok, item);
1609 }
1610 }
1611 else {
1612 for (counter_t i = 0; i != maxOps; ++i) {
1613 q.enqueue(item);
1614 }
1615 }
1616 }
1617 else {
1618 // Dequeue
1619 int item;
1620 if (useTokens) {
1621 typename TQueue::consumer_token_t consTok(q);
1622 for (counter_t i = 0; i != maxOps; ++i) {
1623 q.try_dequeue(consTok, item);
1624 }
1625 }
1626 else {
1627 for (counter_t i = 0; i != maxOps; ++i) {
1628 q.try_dequeue(item_rec);
1629 }
1630 }
1631 }
1632 }
1633 timings[id] = getTimeDelta(start);
1634 }, tid);
1635 }
1636 result = 0;
1637 for (int tid = 0; tid != nthreads; ++tid) {
1638 threads[tid].join();
1639 result += timings[tid];
1640 }
1641 forceNoOptimizeDummy = q.try_dequeue(item_rec) ? 1 : 0;
1642 break;
1643 }
1644
1645 default:
1646 assert(false && "Every benchmark type must be handled here!");
1647 result = 0;
1648 out_opCount = 0;
1649 }
1650
1651 (void)forceNoOptimizeDummy;
1652
1653 return result;
1654 }
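// Illustrative call, mirroring the dispatch in main() further below; `seed`
// and `maxOps` are placeholders for the values main() computes per run:
//
//     counter_t ops = 0;
//     double ms = runBenchmark<moodycamel::ConcurrentQueue<int, Traits>, int>(
//         bench_only_enqueue, /*nthreads*/4, /*useTokens*/true,
//         seed, maxOps, /*maxThreads*/-1, ops);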
1655
1656
1657 const char* LOG_FILE = "benchmarks.log";
1658 std::ofstream* logOut;
1659 bool logErrorReported = false;
1660
1661 void sayf(int indent, const char* fmt, ...)
1662 {
1663 static char indentBuffer[] = "                                "; // 32 spaces -- must cover the deepest indent passed in
1664 static char buf[2048];
1665
1666 indentBuffer[indent] = '\0';
1667
1668 va_list arglist;
1669 va_start(arglist, fmt);
1670 vsnprintf(buf, sizeof(buf), fmt, arglist);
1671 va_end(arglist);
1672
1673 if (*logOut) {
1674 (*logOut) << indentBuffer << buf;
1675 }
1676 else if (!logErrorReported) {
1677 std::printf("Note: Error writing to log file. Future output will appear only on stdout\n");
1678 logErrorReported = true;
1679 }
1680 std::printf("%s%s", indentBuffer, buf);
1681
1682 indentBuffer[indent] = ' ';
1683 }
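// Example usage: sayf(4, "Ops/s/t: %7s\n", pretty(opsst)); writes the
// message, indented four spaces, to both stdout and the log file.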
1684
1685
1686 // Returns a formatted timestamp.
1687 // Returned buffer is only valid until the next call.
1688 // Not thread-safe.
1689 static const char* timestamp()
1690 {
1691 static char buf[32];
1692 time_t time = std::time(NULL);
1693 strcpy(buf, std::asctime(std::localtime(&time)));
1694 buf[strlen(buf) - 1] = '\0'; // Remove trailing newline
1695 return buf;
1696 }
1697
1698 static inline bool isvowel(char ch)
1699 {
1700 ch = std::tolower(ch);
1701 for (const char* v = "aeiou"; *v != '\0'; ++v) {
1702 if (*v == ch) {
1703 return true;
1704 }
1705 }
1706 return false;
1707 }
1708
1709 static inline double safe_divide(double a, double b)
1710 {
1711 return b == 0 ? 0 : a / b;
1712 }
1713
1714 // Formats a non-negative number as a human-readable string.
1715 // The string is always 7 characters or less (excluding the null byte).
1716 // The returned buffer is only valid until the sixteenth call thereafter
1717 // (sixteen internal buffers are rotated). Not thread-safe.
1718 static const char* pretty(double num)
1719 {
1720 assert(num >= 0);
1721
1722 #if defined(_MSC_VER) && _MSC_VER < 1800
1723 if (!_finite(num)) {
1724 return "inf";
1725 }
1726 if (_isnan(num)) {
1727 return "nan";
1728 }
1729 #else
1730 if (std::isinf(num)) {
1731 return "inf";
1732 }
1733 if (std::isnan(num)) {
1734 return "nan";
1735 }
1736 #endif
1737
1738 static char bufs[16][8];
1739 static int nextBuf = 0;
1740 char* buf = bufs[nextBuf++];
1741 nextBuf &= 15;
1742
1743 int suffix = 0;
1744 if (num < 1) {
1745 static const char minisufs[] = "\0munpfazy";
1746 while (num < 0.01) {
1747 ++suffix;
1748 num *= 1000;
1749 }
1750 sprintf(buf, "%1.4f%c", num, minisufs[suffix]);
1751 }
1752 else {
1753 static const char megasufs[] = "\0kMGTPEZY";
1754 while (num >= 1000) {
1755 ++suffix;
1756 num /= 1000;
1757 }
1758 sprintf(buf, "%.2f%c", num, megasufs[suffix]);
1759 }
1760
1761 return buf;
1762 }
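// Sample outputs (illustrative): pretty(12345678) yields "12.35M",
// pretty(1234) yields "1.23k", and pretty(0.00042) yields "0.4200m".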
1763
1764 void printBenchmarkNames()
1765 {
1766 std::printf(" Supported benchmarks are:\n");
1767
1768 for (int i = 0; i != BENCHMARK_TYPE_COUNT; ++i) {
1769 std::printf(" %s\n", BENCHMARK_SHORT_NAMES[i]);
1770 }
1771 }
1772
1773
1774 int main(int argc, char** argv)
1775 {
1776 // Disable buffering (so that when run in, e.g., Sublime Text, the output appears as it is written)
1777 std::setvbuf(stdout, nullptr, _IONBF, 0);
1778
1779 // Isolate the executable name
1780 std::string progName = argv[0];
1781 auto slash = progName.find_last_of("/\\");
1782 if (slash != std::string::npos) {
1783 progName = progName.substr(slash + 1);
1784 }
1785
1786 std::map<std::string, benchmark_type_t> benchmarkMap;
1787 for (int i = 0; i != BENCHMARK_TYPE_COUNT; ++i) {
1788 benchmarkMap.insert(std::make_pair(std::string(BENCHMARK_SHORT_NAMES[i]), (benchmark_type_t)i));
1789 }
1790 std::vector<benchmark_type_t> selectedBenchmarks;
1791
1792 bool showHelp = false;
1793 bool error = false;
1794 bool printedBenchmarks = false;
1795 for (int i = 1; i < argc; ++i) {
1796 if (std::strcmp(argv[i], "-h") == 0 || std::strcmp(argv[i], "--help") == 0) {
1797 showHelp = true;
1798 }
1799 else if (std::strcmp(argv[i], "-p") == 0 || std::strcmp(argv[i], "--precise") == 0) {
1800 precise = true;
1801 }
1802 else if (std::strcmp(argv[i], "--run") == 0) {
1803 if (i + 1 == argc || argv[i + 1][0] == '-') {
1804 std::printf("Expected benchmark name argument for --run option.\n");
1805 if (!printedBenchmarks) {
1806 printBenchmarkNames();
1807 printedBenchmarks = true;
1808 }
1809 error = true;
1810 continue;
1811 }
1812
1813 auto it = benchmarkMap.find(argv[++i]);
1814 if (it == benchmarkMap.end()) {
1815 std::printf("Unrecognized benchmark name '%s'.\n", argv[i]);
1816 if (!printedBenchmarks) {
1817 printBenchmarkNames();
1818 printedBenchmarks = true;
1819 }
1820 error = true;
1821 continue;
1822 }
1823
1824 selectedBenchmarks.push_back(it->second);
1825 }
1826 else {
1827 std::printf("Unrecognized option '%s'\n", argv[i]);
1828 error = true;
1829 }
1830 }
1831 if (showHelp || error) {
1832 if (error) {
1833 std::printf("\n");
1834 }
1835 std::printf("%s\n Description: Runs benchmarks for moodycamel::ConcurrentQueue\n", progName.c_str());
1836 std::printf(" --help Prints this help blurb\n");
1837 std::printf(" --precise Generate more precise benchmark results (slower)\n");
1838 std::printf(" --run benchmark Runs only the selected benchmark (can be used multiple times)\n");
1839 return error ? 1 : 0;
1840 }
1841
1842 bool logExists = true;
1843 {
1844 std::ifstream fin(LOG_FILE);
1845 if (!fin) {
1846 logExists = false;
1847 }
1848 }
1849
1850 std::ofstream fout(LOG_FILE, std::ios::app);
1851 logOut = &fout;
1852 if (fout) {
1853 if (logExists) {
1854 fout << "\n\n\n";
1855 }
1856 fout << "--- New run (" << timestamp() << ") ---\n";
1857 }
1858 else {
1859 std::printf("Note: Error opening log file '%s'. Output will appear only on stdout.\n\n", LOG_FILE);
1860 logErrorReported = true;
1861 }
1862
1863 const char* bitStr = "";
1864 if (sizeof(void*) == 4 || sizeof(void*) == 8) {
1865 bitStr = sizeof(void*) == 4 ? " 32-bit" : " 64-bit";
1866 }
1867
1868 const char* cpuStr = getCPUString();
1869 sayf(0, "Running%s benchmarks on a%s %s\n", bitStr, isvowel(cpuStr[0]) ? "n" : "", cpuStr);
1870 if (precise) {
1871 sayf(4, "(precise mode)\n");
1872 }
1873 if (selectedBenchmarks.size() > 0) {
1874 sayf(4, "(selected benchmarks only)\n");
1875 }
1876 sayf(0, "Note that these are synthetic benchmarks. Take them with a grain of salt.\n\n");
1877
1878 sayf(0, "Legend:\n");
1879 sayf(4, "'Avg': Average time taken per operation, normalized to be per thread\n");
1880 sayf(4, "'Range': The minimum and maximum times taken per operation (per thread)\n");
1881 sayf(4, "'Ops/s': Overall operations per second\n");
1882 sayf(4, "'Ops/s/t': Operations per second per thread (inverse of 'Avg')\n");
1883 sayf(4, "Operations include those that fail (e.g. because the queue is empty).\n");
1884 sayf(4, "Each logical enqueue/dequeue counts as an individual operation when in bulk.\n");
1885 sayf(0, "\n");
1886
1887
1888 #ifdef NDEBUG
1889 const int ITERATIONS = precise ? 100 : 10;
1890 #else
1891 const int ITERATIONS = precise ? 20 : 2;
1892 #endif
1893
1894
1895 const double FASTEST_PERCENT_CONSIDERED = precise ? 8 : 50; // Only consider the top % of runs
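// e.g. with ITERATIONS == 10 at 50%, the mean is taken over the 5 fastest
// runs; consideredCount below also enforces a floor of two runs.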
1896
1897 // Make sure each run of a given benchmark has the same seed (otherwise different runs are not comparable)
1898 std::srand(std::time(NULL));
1899 unsigned int randSeeds[BENCHMARK_TYPE_COUNT];
1900 for (unsigned int i = 0; i != BENCHMARK_TYPE_COUNT; ++i) {
1901 randSeeds[i] = std::rand() * (i + 1) + 1;
1902 }
1903
1904 double opsst = 0; // ops/s/thread
1905
1906 double totalWeightedOpsst[QUEUE_COUNT];
1907 double totalWeight[QUEUE_COUNT];
1908 for (int i = 0; i != QUEUE_COUNT; ++i) {
1909 totalWeightedOpsst[i] = 0;
1910 totalWeight[i] = 0;
1911 }
1912
1913 auto logicalCores = std::thread::hardware_concurrency();
1914
1915 if (selectedBenchmarks.size() == 0) {
1916 for (int i = 0; i != BENCHMARK_TYPE_COUNT; ++i) {
1917 selectedBenchmarks.push_back((benchmark_type_t)i);
1918 }
1919 }
1920
1921 int indent = 0;
1922 for (auto selectedIt = selectedBenchmarks.cbegin(); selectedIt != selectedBenchmarks.cend(); ++selectedIt) {
1923 int benchmark = static_cast<int>(*selectedIt);
1924 auto seed = randSeeds[benchmark];
1925
1926 bool anyQueueSupportsBenchmark = false;
1927 for (int queue = 0; queue != QUEUE_COUNT; ++queue) {
1928 if (QUEUE_BENCH_SUPPORT[queue][benchmark]) {
1929 anyQueueSupportsBenchmark = true;
1930 break;
1931 }
1932 }
1933 if (!anyQueueSupportsBenchmark) {
1934 continue;
1935 }
1936
1937 sayf(0, "%s", BENCHMARK_NAMES[benchmark]);
1938 if (BENCHMARK_THREADS_MEASURED[benchmark] != 0) {
1939 if (BENCHMARK_THREADS_MEASURED[benchmark] < 0) {
1940 sayf(0, " (measuring all but %d %s)", -BENCHMARK_THREADS_MEASURED[benchmark], BENCHMARK_THREADS_MEASURED[benchmark] == -1 ? "thread" : "threads");
1941 }
1942 else {
1943 sayf(0, " (measuring %d %s)", BENCHMARK_THREADS_MEASURED[benchmark], BENCHMARK_THREADS_MEASURED[benchmark] == 1 ? "thread" : "threads");
1944 }
1945 }
1946 sayf(0, ":\n");
1947 indent += 2;
1948 sayf(indent, "(%s)\n", BENCHMARK_DESCS[benchmark]);
1949
1950 for (int queue = 0; queue != QUEUE_COUNT; ++queue) {
1951 sayf(indent, "> %s\n", QUEUE_NAMES[queue]);
1952
1953 if (!QUEUE_BENCH_SUPPORT[queue][benchmark]) {
1954 sayf(indent + 3, "(skipping, benchmark not supported...)\n\n");
1955 continue;
1956 }
1957
1958 if (QUEUE_TOKEN_SUPPORT[queue]) {
1959 indent += 4;
1960 }
1961 for (int useTokens = 0; useTokens != 2; ++useTokens) {
1962 if (QUEUE_TOKEN_SUPPORT[queue]) {
1963 sayf(indent, "%s tokens\n", useTokens == 0 ? "Without" : "With");
1964 }
1965 if (useTokens == 1 && !QUEUE_TOKEN_SUPPORT[queue]) {
1966 continue;
1967 }
1968 indent += 3;
1969
1970 std::vector<double> opssts;
1971 std::vector<int> threadCounts;
1972 for (int nthreadIndex = 0; BENCHMARK_THREADS[benchmark][nthreadIndex] != 0; ++nthreadIndex) {
1973 int nthreads = BENCHMARK_THREADS[benchmark][nthreadIndex];
1974 int measuredThreads = nthreads;
1975 if (BENCHMARK_THREADS_MEASURED[benchmark] != 0) {
1976 measuredThreads = BENCHMARK_THREADS_MEASURED[benchmark] < 0 ? nthreads + BENCHMARK_THREADS_MEASURED[benchmark] : BENCHMARK_THREADS_MEASURED[benchmark];
1977 }
1978
1979 if (logicalCores > 0 && (unsigned int)nthreads > 3 * logicalCores) {
1980 continue;
1981 }
1982 if (QUEUE_MAX_THREADS[queue] >= 0 && QUEUE_MAX_THREADS[queue] < nthreads) {
1983 continue;
1984 }
1985
1986 counter_t maxOps;
1987 switch ((queue_id_t)queue) {
1988 case queue_moodycamel_ConcurrentQueue:
1989 maxOps = determineMaxOpsForBenchmark<moodycamel::ConcurrentQueue<int, Traits>, int>((benchmark_type_t)benchmark, nthreads, (bool)useTokens, seed);
1990 break;
1991 case queue_moodycamel_BlockingConcurrentQueue:
1992 maxOps = determineMaxOpsForBenchmark<moodycamel::BlockingConcurrentQueue<int, Traits>, int>((benchmark_type_t)benchmark, nthreads, (bool)useTokens, seed);
1993 break;
1994 case queue_lockbased:
1995 maxOps = determineMaxOpsForBenchmark<LockBasedQueue<int>, int>((benchmark_type_t)benchmark, nthreads, (bool)useTokens, seed);
1996 break;
1997 case queue_simplelockfree:
1998 maxOps = determineMaxOpsForBenchmark<SimpleLockFreeQueue<int>, int>((benchmark_type_t)benchmark, nthreads, (bool)useTokens, seed);
1999 break;
2000 case queue_boost:
2001 maxOps = determineMaxOpsForBenchmark<BoostQueueWrapper<int>, int>((benchmark_type_t)benchmark, nthreads, (bool)useTokens, seed);
2002 break;
2003 case queue_tbb:
2004 maxOps = determineMaxOpsForBenchmark<TbbQueueWrapper<int>, int>((benchmark_type_t)benchmark, nthreads, (bool)useTokens, seed);
2005 break;
2006 case queue_std:
2007 maxOps = determineMaxOpsForBenchmark<StdQueueWrapper<int>, int>((benchmark_type_t)benchmark, nthreads, (bool)useTokens, seed);
2008 break;
2009 case queue_dlib:
2010 maxOps = determineMaxOpsForBenchmark<DlibQueueWrapper<int>, int>((benchmark_type_t)benchmark, nthreads, (bool)useTokens, seed);
2011 break;
2012 default:
2013 assert(false && "There should be a case here for every queue in the benchmarks!");
2014 }
2015 //std::printf("maxOps: %llu\n", maxOps);
2016
2017 int maxThreads = QUEUE_MAX_THREADS[queue];
2018 std::vector<BenchmarkResult> results(ITERATIONS);
2019 for (int i = 0; i < ITERATIONS; ++i) {
2020 double elapsed;
2021 counter_t ops = 0;
2022
2023 switch ((queue_id_t)queue) {
2024 case queue_moodycamel_ConcurrentQueue:
2025 elapsed = runBenchmark<moodycamel::ConcurrentQueue<int, Traits>, int>((benchmark_type_t)benchmark, nthreads, (bool)useTokens, seed, maxOps, maxThreads, ops);
2026 break;
2027 case queue_moodycamel_BlockingConcurrentQueue:
2028 elapsed = runBenchmark<moodycamel::BlockingConcurrentQueue<int, Traits>, int>((benchmark_type_t)benchmark, nthreads, (bool)useTokens, seed, maxOps, maxThreads, ops);
2029 break;
2030 case queue_lockbased:
2031 elapsed = runBenchmark<LockBasedQueue<int>, int>((benchmark_type_t)benchmark, nthreads, (bool)useTokens, seed, maxOps, maxThreads, ops);
2032 break;
2033 case queue_simplelockfree:
2034 elapsed = runBenchmark<SimpleLockFreeQueue<int>, int>((benchmark_type_t)benchmark, nthreads, (bool)useTokens, seed, maxOps, maxThreads, ops);
2035 break;
2036 case queue_boost:
2037 elapsed = runBenchmark<BoostQueueWrapper<int>, int>((benchmark_type_t)benchmark, nthreads, (bool)useTokens, seed, maxOps, maxThreads, ops);
2038 break;
2039 case queue_tbb:
2040 elapsed = runBenchmark<TbbQueueWrapper<int>, int>((benchmark_type_t)benchmark, nthreads, (bool)useTokens, seed, maxOps, maxThreads, ops);
2041 break;
2042 case queue_std:
2043 elapsed = runBenchmark<StdQueueWrapper<int>, int>((benchmark_type_t)benchmark, nthreads, (bool)useTokens, seed, maxOps, maxThreads, ops);
2044 break;
2045 case queue_dlib:
2046 elapsed = runBenchmark<DlibQueueWrapper<int>, int>((benchmark_type_t)benchmark, nthreads, (bool)useTokens, seed, maxOps, maxThreads, ops);
2047 break;
2048 default:
2049 assert(false && "There should be a case here for every queue in the benchmarks!");
2050 }
2051
2052 results[i].elapsedTime = elapsed;
2053 results[i].operations = ops;
2054 }
2055
2056 std::sort(&results[0], &results[0] + ITERATIONS);
2057 int consideredCount = std::max(2, (int)(ITERATIONS * FASTEST_PERCENT_CONSIDERED / 100));
2058
2059 double min = safe_divide(results[0].elapsedTime / 1000.0, (double)results[0].operations / measuredThreads);
2060 double max = safe_divide(results[0].elapsedTime / 1000.0, (double)results[0].operations / measuredThreads);
2061 double ops = 0;
2062 double time = 0;
2063 for (int i = 0; i != consideredCount; ++i) {
2064 double msPerOperation = safe_divide(results[i].elapsedTime / 1000.0, (double)results[i].operations / measuredThreads);
2065 if (msPerOperation < min) {
2066 min = msPerOperation;
2067 }
2068 else if (msPerOperation > max) {
2069 max = msPerOperation;
2070 }
2071
2072 time += results[i].elapsedTime;
2073 ops += results[i].operations;
2074 }
2075
2076 double avg = safe_divide(time / 1000.0, ops / measuredThreads);
2077 double opsPerSecond = safe_divide(ops, time / 1000.0);
2078 opsst = opsPerSecond / (double)measuredThreads;
2079
2080 opssts.push_back(opsst);
2081 threadCounts.push_back(measuredThreads);
2082
2083 sayf(indent, "%-3d %7s: Avg: %7ss Range: [%7ss, %7ss] Ops/s: %7s Ops/s/t: %7s\n", nthreads, nthreads != 1 ? "threads" : "thread", pretty(avg), pretty(min), pretty(max), pretty(opsPerSecond), pretty(opsst));
2084 if (nthreads == 1 && BENCHMARK_SINGLE_THREAD_NOTES[benchmark][0] != '\0') {
2085 sayf(indent + 7, "^ Note: %s\n", BENCHMARK_SINGLE_THREAD_NOTES[benchmark]);
2086 }
2087 }
2088
2089 opsst = 0;
2090 double divisor = 0;
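// Weight each thread-count's result by sqrt(threads), so that
// higher-concurrency runs count for more without drowning out the
// single-threaded numbers.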
2091 for (size_t i = 0; i != opssts.size(); ++i) {
2092 opsst += opssts[i] * std::sqrt(threadCounts[i]);
2093 totalWeightedOpsst[queue] += opssts[i] * std::sqrt(threadCounts[i]);
2094 divisor += std::sqrt(threadCounts[i]);
2095 totalWeight[queue] += std::sqrt(threadCounts[i]);
2096 }
2097 opsst /= divisor;
2098 sayf(indent, "Operations per second per thread (weighted average): %7s\n\n", opsst == 0 ? "(n/a)" : pretty(opsst));
2099
2100 indent -= 3;
2101 }
2102 if (QUEUE_TOKEN_SUPPORT[queue]) {
2103 indent -= 4;
2104 }
2105 }
2106 indent -= 2;
2107 }
2108
2109 sayf(0, "Overall average operations per second per thread (where higher-concurrency runs have more weight):\n");
2110 sayf(0, "(Take this summary with a grain of salt -- look at the individual benchmark results for a much\nbetter idea of how the queues measure up to each other):\n");
2111 for (int queue = 0; queue != QUEUE_COUNT; ++queue) {
2112 opsst = safe_divide(totalWeightedOpsst[queue], totalWeight[queue]);
2113 if (QUEUE_SUMMARY_NOTES[queue] != nullptr && QUEUE_SUMMARY_NOTES[queue][0] != '\0') {
2114 sayf(4, "%s (%s): %7s\n", QUEUE_NAMES[queue], QUEUE_SUMMARY_NOTES[queue], opsst == 0 ? "(n/a)" : pretty(opsst));
2115 }
2116 else {
2117 sayf(4, "%s: %7s\n", QUEUE_NAMES[queue], opsst == 0 ? "(n/a)" : pretty(opsst));
2118 }
2119 }
2120
2121 return 0;
2122 }
2123