1 /*
2 Copyright (c) 2005-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 //! \file test_semaphore.cpp
18 //! \brief Test for [internal] functionality
19
20 #if _WIN32 || _WIN64
21 #define _CRT_SECURE_NO_WARNINGS
22 #endif
23
24 // Test for counting semaphore
25 #include "common/test.h"
26 #include "common/utils.h"
27 #include "common/spin_barrier.h"
28 #include "tbb/blocked_range.h"
29 #include "tbb/tick_count.h"
30 #include "../../src/tbb/semaphore.h"
31 #include <atomic>
32 #include <vector>
33
34 using tbb::detail::r1::semaphore;
35
36 std::atomic<int> pCount;
37 utils::SpinBarrier sBarrier;
38
39 // Semaphore basis function:
40 // set semaphore to initial value
41 // see that semaphore only allows that number of threads to be active
42 class Body : utils::NoAssign {
43 const int nIters;
44 semaphore& mySem;
45 std::vector<int>& ourCounts;
46 std::vector<double>& tottime;
47
48 static constexpr int tickCounts = 1; // millisecond
49 static constexpr int innerWait = 5; // millisecond
50 public:
Body(int nThread,int nIter,semaphore & sem,std::vector<int> & our_counts,std::vector<double> & tot_time)51 Body( int nThread, int nIter, semaphore& sem,
52 std::vector<int>& our_counts, std::vector<double>& tot_time )
53 : nIters(nIter), mySem(sem), ourCounts(our_counts), tottime(tot_time)
54 {
55 sBarrier.initialize(nThread);
56 pCount = 0;
57 }
58
operator ()(const int tid) const59 void operator()( const int tid ) const {
60 sBarrier.wait();
61
62 for (int i = 0; i < nIters; ++i) {
63 utils::Sleep(tid * tickCounts);
64 tbb::tick_count t0 = tbb::tick_count::now();
65 mySem.P();
66 tbb::tick_count t1 = tbb::tick_count::now();
67 tottime[tid] += (t1 - t0).seconds();
68
69 int curval = ++pCount;
70 if (curval > ourCounts[tid]) {
71 ourCounts[tid] = curval;
72 }
73 utils::Sleep(innerWait);
74 --pCount;
75 REQUIRE(int(pCount) >= 0);
76 mySem.V();
77 }
78 }
79 }; // class Body
80
test_semaphore(int sem_init_cnt,int extra_threads)81 void test_semaphore( int sem_init_cnt, int extra_threads ) {
82 semaphore my_sem(sem_init_cnt);
83 int n_threads = sem_init_cnt + extra_threads;
84
85 std::vector<int> max_vals(n_threads);
86 std::vector<double> tot_times(n_threads);
87
88 int n_iters = 10;
89 Body body(n_threads, n_iters, my_sem, max_vals, tot_times);
90
91 pCount = 0;
92 utils::NativeParallelFor(n_threads, body);
93
94 if (extra_threads == 0) {
95 double allPWaits = 0;
96 for (auto item : tot_times) {
97 allPWaits += item;
98 }
99 allPWaits /= static_cast<double>(n_threads * n_iters);
100 }
101 REQUIRE_MESSAGE(!pCount, "not all threads decremented pCount");
102
103 int max_count = -1;
104 for (auto item : max_vals) {
105 max_count = utils::max(max_count, item);
106 }
107 REQUIRE_MESSAGE(max_count <= sem_init_cnt, "Too many threads in semaphore-protected increment");
108 }
109
110 #include "../../src/tbb/semaphore.cpp"
111 #if _WIN32 || _WIN64
112 #include "../../src/tbb/dynamic_link.cpp"
113 #endif
114
// Number of guarded increments each AddOne invocation performs.
constexpr std::size_t N_TIMES{1000};
116
//! Shared state for AddOne: a counter together with the semaphore that guards it.
template <typename S>
struct Counter {
    std::atomic<long> value{0}; // AddOne increments it only while holding my_sem
    S my_sem;
    Counter() {}
}; // struct Counter
123
124 // Function object for use with parallel_for.h
125 template <typename C>
126 struct AddOne : utils::NoAssign {
127 C& my_counter;
128
129 // Increments counter once for each iteration in the iteration space
operator ()AddOne130 void operator()( int ) const {
131 for (std::size_t i = 0; i < N_TIMES; ++i) {
132 my_counter.my_sem.P();
133 ++my_counter.value;
134 my_counter.my_sem.V();
135 }
136 }
137
AddOneAddOne138 AddOne( C& c ) : my_counter(c) {
139 my_counter.my_sem.V();
140 }
141 }; // struct AddOne
142
test_binary_semaphore(int n_threads)143 void test_binary_semaphore( int n_threads ) {
144 Counter<tbb::detail::r1::binary_semaphore> counter;
145 AddOne<decltype(counter)> AddOneBody(counter);
146 utils::NativeParallelFor(n_threads, AddOneBody);
147 REQUIRE_MESSAGE(n_threads * N_TIMES == counter.value, "Binary semaphore operations P()/V() have a race");
148 }
149
// Capacity of the producer/consumer ring buffer, i.e. the most tokens that can
// be in flight. Slots are indexed with `& (MAX_TOKENS - 1)`, so the value must
// be a power of 2; the static_assert below enforces that.
constexpr std::size_t MAX_TOKENS = 32;
static_assert(MAX_TOKENS != 0 && (MAX_TOKENS & (MAX_TOKENS - 1)) == 0,
              "MAX_TOKENS must be a power of 2 for mask-based ring indexing");

// Role played by a FilterBase instance in the producer/consumer test.
enum FilterType { imaProducer, imaConsumer };
153
154 class FilterBase : utils::NoAssign {
155 protected:
156 FilterType ima;
157 unsigned totTokens; // total number of tokens to be emitted, only used by producer
158 std::atomic<unsigned>& myTokens;
159 std::atomic<unsigned>& otherTokens;
160
161 unsigned myWait;
162 semaphore& my_sem;
163 semaphore& next_sem;
164
165 unsigned* myBuffer;
166 unsigned* nextBuffer;
167 unsigned curToken;
168 public:
FilterBase(FilterType filter,unsigned tot_tokens,std::atomic<unsigned> & my_tokens,std::atomic<unsigned> & other_tokens,unsigned my_wait,semaphore & m_sem,semaphore & n_sem,unsigned * buf,unsigned * n_buf)169 FilterBase( FilterType filter,
170 unsigned tot_tokens,
171 std::atomic<unsigned>& my_tokens,
172 std::atomic<unsigned>& other_tokens,
173 unsigned my_wait,
174 semaphore& m_sem,
175 semaphore& n_sem,
176 unsigned* buf,
177 unsigned* n_buf )
178 : ima(filter), totTokens(tot_tokens), myTokens(my_tokens),
179 otherTokens(other_tokens), myWait(my_wait), my_sem(m_sem),
180 next_sem(n_sem), myBuffer(buf), nextBuffer(n_buf)
181 {
182 curToken = 0;
183 }
184
185 void Produce( const int );
186 void Consume( const int );
operator ()(const int tid)187 void operator()( const int tid ) {
188 if (ima == imaConsumer) {
189 Consume(tid);
190 } else {
191 Produce(tid);
192 }
193 }
194 }; // class FilterBase
195
196 class ProduceConsumeBody {
197 FilterBase** my_filters;
198 public:
ProduceConsumeBody(FilterBase ** filters)199 ProduceConsumeBody( FilterBase** filters ) : my_filters(filters) {}
200
operator ()(const int tid) const201 void operator()( const int tid ) const {
202 my_filters[tid]->operator()(tid);
203 }
204 }; // class ProduceConsumeBody
205
// Send a bunch of non-zero "tokens" to the consumer, then a 0 terminator.
//! Producer side: writes totTokens tokens into the ring buffer nextBuffer.
//! Every token except the last holds curToken * 3 + 1 (never 0); the last holds 0.
//! Flow control: myTokens counts free slots; otherTokens counts filled slots
//! published to the consumer.
//! NOTE(review): with totTokens == 0 nothing ever increments otherTokens, so the
//! consumer would keep waiting; presumably callers always pass totTokens > 0.
void FilterBase::Produce( const int ) {
    nextBuffer[0] = 0; // just in case we provide no tokens
    sBarrier.wait();
    while(totTokens) {
        // Wait for a free slot. Re-check after every P(): a wakeup by itself
        // does not guarantee myTokens is non-zero.
        while(!myTokens) {
            my_sem.P();
        }
        // we have a slot available
        --myTokens; // moving this down reduces spurious wakeups
        --totTokens;
        if (totTokens) {
            nextBuffer[curToken & (MAX_TOKENS - 1)] = curToken * 3 + 1;
        } else {
            // last token: 0 tells the consumer to stop
            nextBuffer[curToken & (MAX_TOKENS - 1)] = 0;
        }
        ++curToken;

        utils::Sleep(myWait);
        // Publish the filled slot. Only the 0 -> 1 transition signals, because
        // the consumer only blocks after it has seen a count of 0.
        unsigned temp = ++otherTokens;
        if (temp == 1) {
            next_sem.V();
        }
    }
    next_sem.V(); // final wakeup
}
232
//! Consumer side: reads tokens from the ring buffer myBuffer until the 0
//! terminator arrives. Each non-zero token must equal curToken * 3 + 1, the
//! value Produce wrote; its slot is then handed back to the producer.
//! Finally checks that totTokens - 1 non-zero tokens were received.
void FilterBase::Consume( const int ) {
    unsigned myToken;
    sBarrier.wait();
    do {
        // Wait until the producer has published a token. Re-check after every
        // P(): a wakeup by itself does not guarantee myTokens is non-zero.
        while( !myTokens ) {
            my_sem.P();
        }
        // a filled slot is available
        --myTokens;
        myToken = myBuffer[curToken & (MAX_TOKENS - 1)];
        if (myToken) {
            REQUIRE_MESSAGE(myToken == curToken * 3 + 1, "Error in received token");
            ++curToken;
            utils::Sleep(myWait);
            // Return the slot; wake the producer only on the 0 -> 1 transition.
            unsigned temp = ++otherTokens;
            if (temp == 1) {
                next_sem.V();
            }
        }
    } while(myToken);
    // end of processing
    // The last of the totTokens tokens is the 0 terminator, which curToken does
    // not count, hence the + 1.
    REQUIRE_MESSAGE(curToken + 1 == totTokens, "Didn't receive enough tokens");
}
256
257 // test of producer/consumer with atomic buffer cnt and semaphore
258 // nTokens are total number of tokens through the pipe
259 // pWait is the wait time for the producer
260 // cWait is the wait time for the consumer
test_producer_consumer(unsigned totTokens,unsigned nTokens,unsigned pWait,unsigned cWait)261 void test_producer_consumer( unsigned totTokens, unsigned nTokens, unsigned pWait, unsigned cWait ) {
262 semaphore p_sem;
263 semaphore c_sem;
264 std::atomic<unsigned> p_tokens;
265 std::atomic<unsigned> c_tokens(0);
266
267 unsigned c_buffer[MAX_TOKENS];
268 FilterBase* my_filters[2]; // one producer, one concumer
269
270 REQUIRE_MESSAGE(nTokens <= MAX_TOKENS, "Not enough slots for tokens");
271
272 my_filters[0] = new FilterBase(imaProducer, totTokens, p_tokens, c_tokens, pWait, c_sem, p_sem, nullptr, &(c_buffer[0]));
273 my_filters[1] = new FilterBase(imaConsumer, totTokens, c_tokens, p_tokens, cWait, p_sem, c_sem, c_buffer, nullptr);
274
275 p_tokens = nTokens;
276 ProduceConsumeBody body(my_filters);
277 sBarrier.initialize(2);
278 utils::NativeParallelFor(2, body);
279 delete my_filters[0];
280 delete my_filters[1];
281 }
282
283 //! \brief \ref error_guessing
284 TEST_CASE("test binary semaphore") {
285 test_binary_semaphore(utils::MaxThread);
286 }
287
288 //! \brief \ref error_guessing
289 TEST_CASE("test semaphore") {
290 for (int sem_size = 1; sem_size <= int(utils::MaxThread); ++sem_size) {
291 for (int ex_threads = 0; ex_threads <= int(utils::MaxThread) - sem_size; ++ex_threads) {
292 test_semaphore(sem_size, ex_threads);
293 }
294 }
295 }
296
297 //! \brief \ref error_guessing
298 TEST_CASE("test producer-consumer") {
299 test_producer_consumer(10, 2, 5, 5);
300 test_producer_consumer(10, 2, 20, 5);
301 test_producer_consumer(10, 2, 5, 20);
302
303 test_producer_consumer(10, 1, 5, 5);
304 test_producer_consumer(20, 10, 5, 20);
305 test_producer_consumer(64, 32, 1, 20);
306 }
307