1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
#include <cstdlib>
#include <cmath>
#include <new>
#include <queue>
#include "tbb/tbb_stddef.h"
#include "tbb/spin_mutex.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/parallel_for.h"
#include "tbb/tick_count.h"
#include "tbb/blocked_range.h"
#include "../test/harness.h"
#include "tbb/concurrent_priority_queue.h"
28 
// Warning 4996 is an MSVC diagnostic number; only disable it where the pragma is understood
// so that other compilers do not complain about an unknown pragma.
#if defined(_MSC_VER)
#pragma warning(disable: 4996)
#endif

// Selectors for the queue implementation under test (passed as the `impl` argument).
#define IMPL_STL 0  // std::priority_queue, protected by a spin_mutex when multi-threaded
#define IMPL_CPQ 1  // tbb::concurrent_priority_queue
33 
34 using namespace tbb;
35 
//const int contention = 75; // degree of contention: 100 = 0 us busy_wait, 50 = 50*contention_unit us

// Duration of one contention unit, in microseconds (us).
const double contention_unit = 0.025;
// Length of one measured run, in seconds.
const double throughput_window = 30.0;
// Number of events pushed into the queue before a run starts.
const int num_initial_events = 10000;
// Fewest / most contention units that elapse between event spawns.
const int min_elapse = 20;
const int max_elapse = 40;
// Fewest / most events that a processed event spawns.
const int min_spawn = 0;
const int max_spawn = 2;
44 
45 tbb::atomic<unsigned int> operation_count;
46 tbb::tick_count start;
47 bool done;
48 
// A simulated event as stored in the priority queues.
struct event {
    int timestamp;  // ordering key used by timestamp_compare
    int elapse;     // contention units to busy-wait when the event is processed
    int spawn;      // number of follow-up events still to be pushed for this event
};
55 
56 class timestamp_compare {
57 public:
operator ()(event e1,event e2)58     bool operator()(event e1, event e2) {
59         return e2.timestamp<e1.timestamp;
60     }
61 };
62 
63 spin_mutex *my_mutex;
64 std::priority_queue<event, std::vector<event>, timestamp_compare > *stl_cpq;
65 concurrent_priority_queue<event, timestamp_compare > *lfc_pq;
66 
67 unsigned int one_us_iters = 429; // default value
68 
69 // if user wants to calibrate to microseconds on particular machine, call this at beginning of program
70 // sets one_us_iters to number of iters to busy_wait for approx. 1 us
calibrate_busy_wait()71 void calibrate_busy_wait() {
72     const unsigned niter = 1000000;
73     tbb::tick_count t0 = tbb::tick_count::now();
74     for (volatile unsigned int i=0; i<niter; ++i) continue;
75     tbb::tick_count t1 = tbb::tick_count::now();
76 
77     one_us_iters = (unsigned int)(niter/(t1-t0).seconds())*1e-6;
78     printf("one_us_iters: %d\n", one_us_iters);
79 }
80 
busy_wait(double us)81 void busy_wait(double us)
82 {
83     unsigned int iter = us*one_us_iters;
84     for (volatile unsigned int i=0; i<iter; ++i) continue;
85 }
86 
87 
do_push(event elem,int nThr,int impl)88 void do_push(event elem, int nThr, int impl) {
89     if (impl == IMPL_STL) {
90         if (nThr == 1) {
91             stl_cpq->push(elem);
92         }
93         else {
94             tbb::spin_mutex::scoped_lock myLock(*my_mutex);
95             stl_cpq->push(elem);
96         }
97     }
98     else {
99         lfc_pq->push(elem);
100     }
101 }
102 
do_pop(event & elem,int nThr,int impl)103 bool do_pop(event& elem, int nThr, int impl) {
104     if (impl == IMPL_STL) {
105         if (nThr == 1) {
106             if (!stl_cpq->empty()) {
107                 elem = stl_cpq->top();
108                 stl_cpq->pop();
109                 return true;
110             }
111         }
112         else {
113             tbb::spin_mutex::scoped_lock myLock(*my_mutex);
114             if (!stl_cpq->empty()) {
115                 elem = stl_cpq->top();
116                 stl_cpq->pop();
117                 return true;
118             }
119         }
120     }
121     else {
122         if (lfc_pq->try_pop(elem)) {
123             return true;
124         }
125     }
126     return false;
127 }
128 
129 struct TestPDESloadBody : NoAssign {
130     int nThread;
131     int implementation;
132 
TestPDESloadBodyTestPDESloadBody133     TestPDESloadBody(int nThread_, int implementation_) :
134         nThread(nThread_), implementation(implementation_) {}
135 
operator ()TestPDESloadBody136     void operator()(const int threadID) const {
137         if (threadID == nThread) {
138             sleep(throughput_window);
139             done = true;
140         }
141         else {
142             event e, tmp;
143             unsigned int num_operations = 0;
144             for (;;) {
145                 // pop an event
146                 if (do_pop(e, nThread, implementation)) {
147                     num_operations++;
148                     // do the event
149                     busy_wait(e.elapse*contention_unit);
150                     while (e.spawn > 0) {
151                         tmp.spawn = ((e.spawn+1-min_spawn) % ((max_spawn-min_spawn)+1))+min_spawn;
152                         tmp.timestamp = e.timestamp + e.elapse;
153                         e.timestamp = tmp.timestamp;
154                         e.elapse = ((e.elapse+1-min_elapse) % ((max_elapse-min_elapse)+1))+min_elapse;
155                         tmp.elapse = e.elapse;
156                         do_push(tmp, nThread, implementation);
157                         num_operations++;
158                         e.spawn--;
159                         busy_wait(e.elapse*contention_unit);
160                         if (done) break;
161                     }
162                 }
163                 if (done) break;
164             }
165             operation_count += num_operations;
166         }
167     }
168 };
169 
preload_queue(int nThr,int impl)170 void preload_queue(int nThr, int impl) {
171     event an_event;
172     for (int i=0; i<num_initial_events; ++i) {
173         an_event.timestamp = 0;
174         an_event.elapse = (int)rand() % (max_elapse+1);
175         an_event.spawn = (int)rand() % (max_spawn+1);
176         do_push(an_event, nThr, impl);
177     }
178 }
179 
TestPDESload(int nThreads)180 void TestPDESload(int nThreads) {
181     REPORT("%4d", nThreads);
182 
183     operation_count = 0;
184     done = false;
185     stl_cpq = new std::priority_queue<event, std::vector<event>, timestamp_compare >;
186     preload_queue(nThreads, IMPL_STL);
187     TestPDESloadBody my_stl_test(nThreads, IMPL_STL);
188     start = tbb::tick_count::now();
189     NativeParallelFor(nThreads+1, my_stl_test);
190     delete stl_cpq;
191 
192     REPORT(" %10d", operation_count/throughput_window);
193 
194     operation_count = 0;
195     done = false;
196     lfc_pq = new concurrent_priority_queue<event, timestamp_compare >;
197     preload_queue(nThreads, IMPL_CPQ);
198     TestPDESloadBody my_cpq_test(nThreads, IMPL_CPQ);
199     start = tbb::tick_count::now();
200     NativeParallelFor(nThreads+1, my_cpq_test);
201     delete lfc_pq;
202 
203     REPORT(" %10d\n", operation_count/throughput_window);
204 }
205 
TestMain()206 int TestMain() {
207     srand(42);
208     if (MinThread < 1)
209         MinThread = 1;
210     //calibrate_busy_wait();
211     cache_aligned_allocator<spin_mutex> my_mutex_allocator;
212     my_mutex = (spin_mutex *)my_mutex_allocator.allocate(1);
213 
214     REPORT("#Thr ");
215     REPORT("STL        ");
216 #ifdef LINEARIZABLE
217     REPORT("CPQ_L\n");
218 #else
219     REPORT("CPQ_N\n");
220 #endif
221     for (int p = MinThread; p <= MaxThread; ++p) {
222         TestPDESload(p);
223     }
224 
225     return Harness::Done;
226 }
227