1 /*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
#include <cstdlib>
#include <cstdio>
#include <cmath>
#include <new>
#include <queue>
#include <vector>
#include "tbb/tbb_stddef.h"
#include "tbb/spin_mutex.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/parallel_for.h"
#include "tbb/tick_count.h"
#include "tbb/blocked_range.h"
#include "../test/harness.h"
#include "tbb/concurrent_priority_queue.h"
28
29 #pragma warning(disable: 4996)
30
31 #define IMPL_STL 0
32 #define IMPL_CPQ 1
33
34 using namespace tbb;
35
36 //const int contention = 75; // degree contention. 100 = 0 us busy_wait, 50 = 50*contention_unit us
37 const double contention_unit = 0.025; // in microseconds (us)
38 const double throughput_window = 30; // in seconds
39 const int num_initial_events = 10000; // number of initial events in the queue
40 const int min_elapse = 20; // min contention_units to elapse between event spawns
41 const int max_elapse = 40; // max contention_units to elapse between event spawns
42 const int min_spawn = 0; // min number of events to spawn
43 const int max_spawn = 2; // max number of events to spawn
44
45 tbb::atomic<unsigned int> operation_count;
46 tbb::tick_count start;
47 bool done;
48
// A single event handled by the load test: the time used to order it in the
// queues, the amount of busy work it represents, and how many follow-up events
// it still has to push.
struct event {
    int timestamp;  // ordering key; timestamp_compare sorts the queues by it
    int elapse;     // busy_wait duration, measured in contention_units
    int spawn;      // number of child events still to be pushed
};
55
56 class timestamp_compare {
57 public:
operator ()(event e1,event e2)58 bool operator()(event e1, event e2) {
59 return e2.timestamp<e1.timestamp;
60 }
61 };
62
63 spin_mutex *my_mutex;
64 std::priority_queue<event, std::vector<event>, timestamp_compare > *stl_cpq;
65 concurrent_priority_queue<event, timestamp_compare > *lfc_pq;
66
67 unsigned int one_us_iters = 429; // default value
68
69 // if user wants to calibrate to microseconds on particular machine, call this at beginning of program
70 // sets one_us_iters to number of iters to busy_wait for approx. 1 us
calibrate_busy_wait()71 void calibrate_busy_wait() {
72 const unsigned niter = 1000000;
73 tbb::tick_count t0 = tbb::tick_count::now();
74 for (volatile unsigned int i=0; i<niter; ++i) continue;
75 tbb::tick_count t1 = tbb::tick_count::now();
76
77 one_us_iters = (unsigned int)(niter/(t1-t0).seconds())*1e-6;
78 printf("one_us_iters: %d\n", one_us_iters);
79 }
80
busy_wait(double us)81 void busy_wait(double us)
82 {
83 unsigned int iter = us*one_us_iters;
84 for (volatile unsigned int i=0; i<iter; ++i) continue;
85 }
86
87
do_push(event elem,int nThr,int impl)88 void do_push(event elem, int nThr, int impl) {
89 if (impl == IMPL_STL) {
90 if (nThr == 1) {
91 stl_cpq->push(elem);
92 }
93 else {
94 tbb::spin_mutex::scoped_lock myLock(*my_mutex);
95 stl_cpq->push(elem);
96 }
97 }
98 else {
99 lfc_pq->push(elem);
100 }
101 }
102
do_pop(event & elem,int nThr,int impl)103 bool do_pop(event& elem, int nThr, int impl) {
104 if (impl == IMPL_STL) {
105 if (nThr == 1) {
106 if (!stl_cpq->empty()) {
107 elem = stl_cpq->top();
108 stl_cpq->pop();
109 return true;
110 }
111 }
112 else {
113 tbb::spin_mutex::scoped_lock myLock(*my_mutex);
114 if (!stl_cpq->empty()) {
115 elem = stl_cpq->top();
116 stl_cpq->pop();
117 return true;
118 }
119 }
120 }
121 else {
122 if (lfc_pq->try_pop(elem)) {
123 return true;
124 }
125 }
126 return false;
127 }
128
129 struct TestPDESloadBody : NoAssign {
130 int nThread;
131 int implementation;
132
TestPDESloadBodyTestPDESloadBody133 TestPDESloadBody(int nThread_, int implementation_) :
134 nThread(nThread_), implementation(implementation_) {}
135
operator ()TestPDESloadBody136 void operator()(const int threadID) const {
137 if (threadID == nThread) {
138 sleep(throughput_window);
139 done = true;
140 }
141 else {
142 event e, tmp;
143 unsigned int num_operations = 0;
144 for (;;) {
145 // pop an event
146 if (do_pop(e, nThread, implementation)) {
147 num_operations++;
148 // do the event
149 busy_wait(e.elapse*contention_unit);
150 while (e.spawn > 0) {
151 tmp.spawn = ((e.spawn+1-min_spawn) % ((max_spawn-min_spawn)+1))+min_spawn;
152 tmp.timestamp = e.timestamp + e.elapse;
153 e.timestamp = tmp.timestamp;
154 e.elapse = ((e.elapse+1-min_elapse) % ((max_elapse-min_elapse)+1))+min_elapse;
155 tmp.elapse = e.elapse;
156 do_push(tmp, nThread, implementation);
157 num_operations++;
158 e.spawn--;
159 busy_wait(e.elapse*contention_unit);
160 if (done) break;
161 }
162 }
163 if (done) break;
164 }
165 operation_count += num_operations;
166 }
167 }
168 };
169
preload_queue(int nThr,int impl)170 void preload_queue(int nThr, int impl) {
171 event an_event;
172 for (int i=0; i<num_initial_events; ++i) {
173 an_event.timestamp = 0;
174 an_event.elapse = (int)rand() % (max_elapse+1);
175 an_event.spawn = (int)rand() % (max_spawn+1);
176 do_push(an_event, nThr, impl);
177 }
178 }
179
TestPDESload(int nThreads)180 void TestPDESload(int nThreads) {
181 REPORT("%4d", nThreads);
182
183 operation_count = 0;
184 done = false;
185 stl_cpq = new std::priority_queue<event, std::vector<event>, timestamp_compare >;
186 preload_queue(nThreads, IMPL_STL);
187 TestPDESloadBody my_stl_test(nThreads, IMPL_STL);
188 start = tbb::tick_count::now();
189 NativeParallelFor(nThreads+1, my_stl_test);
190 delete stl_cpq;
191
192 REPORT(" %10d", operation_count/throughput_window);
193
194 operation_count = 0;
195 done = false;
196 lfc_pq = new concurrent_priority_queue<event, timestamp_compare >;
197 preload_queue(nThreads, IMPL_CPQ);
198 TestPDESloadBody my_cpq_test(nThreads, IMPL_CPQ);
199 start = tbb::tick_count::now();
200 NativeParallelFor(nThreads+1, my_cpq_test);
201 delete lfc_pq;
202
203 REPORT(" %10d\n", operation_count/throughput_window);
204 }
205
TestMain()206 int TestMain() {
207 srand(42);
208 if (MinThread < 1)
209 MinThread = 1;
210 //calibrate_busy_wait();
211 cache_aligned_allocator<spin_mutex> my_mutex_allocator;
212 my_mutex = (spin_mutex *)my_mutex_allocator.allocate(1);
213
214 REPORT("#Thr ");
215 REPORT("STL ");
216 #ifdef LINEARIZABLE
217 REPORT("CPQ_L\n");
218 #else
219 REPORT("CPQ_N\n");
220 #endif
221 for (int p = MinThread; p <= MaxThread; ++p) {
222 TestPDESload(p);
223 }
224
225 return Harness::Done;
226 }
227