1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 //! \file test_semaphore.cpp
18 //! \brief Test for [internal] functionality
19 
20 #if _WIN32 || _WIN64
21 #define _CRT_SECURE_NO_WARNINGS
22 #endif
23 
24 // Test for counting semaphore
25 #include "common/test.h"
26 #include "common/utils.h"
27 #include "common/spin_barrier.h"
28 #include "tbb/blocked_range.h"
29 #include "tbb/tick_count.h"
30 #include "../../src/tbb/semaphore.h"
31 #include <atomic>
32 #include <vector>
33 
34 using tbb::detail::r1::semaphore;
35 
36 std::atomic<int> pCount;
37 utils::SpinBarrier sBarrier;
38 
39 // Semaphore basis function:
40 //  set semaphore to initial value
41 // see that semaphore only allows that number of threads to be active
42 class Body : utils::NoAssign {
43     const int nIters;
44     semaphore& mySem;
45     std::vector<int>& ourCounts;
46     std::vector<double>& tottime;
47 
48     static constexpr int tickCounts = 1; // millisecond
49     static constexpr int innerWait = 5; // millisecond
50 public:
Body(int nThread,int nIter,semaphore & sem,std::vector<int> & our_counts,std::vector<double> & tot_time)51     Body( int nThread, int nIter, semaphore& sem,
52           std::vector<int>& our_counts, std::vector<double>& tot_time )
53         : nIters(nIter), mySem(sem), ourCounts(our_counts), tottime(tot_time)
54     {
55         sBarrier.initialize(nThread);
56         pCount = 0;
57     }
58 
operator ()(const int tid) const59     void operator()( const int tid ) const {
60         sBarrier.wait();
61 
62         for (int i = 0; i < nIters; ++i) {
63             utils::Sleep(tid * tickCounts);
64             tbb::tick_count t0 = tbb::tick_count::now();
65             mySem.P();
66             tbb::tick_count t1 = tbb::tick_count::now();
67             tottime[tid] += (t1 - t0).seconds();
68 
69             int curval = ++pCount;
70             if (curval > ourCounts[tid]) {
71                 ourCounts[tid] = curval;
72             }
73             utils::Sleep(innerWait);
74             --pCount;
75             REQUIRE(int(pCount) >= 0);
76             mySem.V();
77         }
78     }
79 }; // class Body
80 
test_semaphore(int sem_init_cnt,int extra_threads)81 void test_semaphore( int sem_init_cnt, int extra_threads ) {
82     semaphore my_sem(sem_init_cnt);
83     int n_threads = sem_init_cnt + extra_threads;
84 
85     std::vector<int> max_vals(n_threads);
86     std::vector<double> tot_times(n_threads);
87 
88     int n_iters = 10;
89     Body body(n_threads, n_iters, my_sem, max_vals, tot_times);
90 
91     pCount = 0;
92     utils::NativeParallelFor(n_threads, body);
93 
94     if (extra_threads == 0) {
95         double allPWaits = 0;
96         for (auto item : tot_times) {
97             allPWaits += item;
98         }
99         allPWaits /= static_cast<double>(n_threads * n_iters);
100     }
101     REQUIRE_MESSAGE(!pCount, "not all threads decremented pCount");
102 
103     int max_count = -1;
104     for (auto item : max_vals) {
105         max_count = utils::max(max_count, item);
106     }
107     REQUIRE_MESSAGE(max_count <= sem_init_cnt, "Too many threads in semaphore-protected increment");
108 }
109 
110 #include "../../src/tbb/semaphore.cpp"
111 #if _WIN32 || _WIN64
112 #include "../../src/tbb/dynamic_link.cpp"
113 #endif
114 
// Number of P()/increment/V() rounds each thread performs in AddOne::operator().
constexpr std::size_t N_TIMES{1000};
116 
// Shared counter paired with the semaphore that guards it.
// S is the semaphore type; it is default-constructed.
template <class S>
struct Counter {
    std::atomic<long> value; // starts at zero
    S my_sem;                // guards increments of value

    Counter() : value{0} {}
}; // struct Counter
123 
124 // Function object for use with parallel_for.h
125 template <typename C>
126 struct AddOne : utils::NoAssign {
127     C& my_counter;
128 
129     // Increments counter once for each iteration in the iteration space
operator ()AddOne130     void operator()( int ) const {
131         for (std::size_t i = 0; i < N_TIMES; ++i) {
132             my_counter.my_sem.P();
133             ++my_counter.value;
134             my_counter.my_sem.V();
135         }
136     }
137 
AddOneAddOne138     AddOne( C& c ) : my_counter(c) {
139         my_counter.my_sem.V();
140     }
141 }; // struct AddOne
142 
test_binary_semaphore(int n_threads)143 void test_binary_semaphore( int n_threads ) {
144     Counter<tbb::detail::r1::binary_semaphore> counter;
145     AddOne<decltype(counter)> AddOneBody(counter);
146     utils::NativeParallelFor(n_threads, AddOneBody);
147     REQUIRE_MESSAGE(n_threads * N_TIMES == counter.value, "Binary semaphore operations P()/V() have a race");
148 }
149 
// Capacity of the token ring buffer, i.e. the most tokens that can be in flight.
// Must be a power of two: slot indices are computed as `token & (MAX_TOKENS - 1)`.
constexpr std::size_t MAX_TOKENS = std::size_t(1) << 5;

// Role a FilterBase instance plays in the producer/consumer test.
enum FilterType {
    imaProducer,
    imaConsumer
};
153 
154 class FilterBase : utils::NoAssign {
155 protected:
156     FilterType ima;
157     unsigned totTokens; // total number of tokens to be emitted, only used by producer
158     std::atomic<unsigned>& myTokens;
159     std::atomic<unsigned>& otherTokens;
160 
161     unsigned myWait;
162     semaphore& my_sem;
163     semaphore& next_sem;
164 
165     unsigned* myBuffer;
166     unsigned* nextBuffer;
167     unsigned curToken;
168 public:
FilterBase(FilterType filter,unsigned tot_tokens,std::atomic<unsigned> & my_tokens,std::atomic<unsigned> & other_tokens,unsigned my_wait,semaphore & m_sem,semaphore & n_sem,unsigned * buf,unsigned * n_buf)169     FilterBase( FilterType filter,
170                 unsigned tot_tokens,
171                 std::atomic<unsigned>& my_tokens,
172                 std::atomic<unsigned>& other_tokens,
173                 unsigned my_wait,
174                 semaphore& m_sem,
175                 semaphore& n_sem,
176                 unsigned* buf,
177                 unsigned* n_buf )
178         : ima(filter), totTokens(tot_tokens), myTokens(my_tokens),
179           otherTokens(other_tokens), myWait(my_wait), my_sem(m_sem),
180           next_sem(n_sem), myBuffer(buf), nextBuffer(n_buf)
181     {
182         curToken = 0;
183     }
184 
185     void Produce( const int );
186     void Consume( const int );
operator ()(const int tid)187     void operator()( const int tid ) {
188         if (ima == imaConsumer) {
189             Consume(tid);
190         } else {
191             Produce(tid);
192         }
193     }
194 }; // class FilterBase
195 
196 class ProduceConsumeBody {
197     FilterBase** my_filters;
198 public:
ProduceConsumeBody(FilterBase ** filters)199     ProduceConsumeBody( FilterBase** filters ) : my_filters(filters) {}
200 
operator ()(const int tid) const201     void operator()( const int tid ) const {
202         my_filters[tid]->operator()(tid);
203     }
204 }; // class ProduceConsumeBody
205 
206 // send a bunch of non-null "tokens" to consumer, then a NULL
Produce(const int)207 void FilterBase::Produce( const int ) {
208     nextBuffer[0] = 0; // just in case we provide no tokens
209     sBarrier.wait();
210     while(totTokens) {
211         while(!myTokens) {
212             my_sem.P();
213         }
214         // we have a slot available
215         --myTokens; // moving this down reduces spurious wakeups
216         --totTokens;
217         if (totTokens) {
218             nextBuffer[curToken & (MAX_TOKENS - 1)] = curToken * 3 + 1;
219         } else {
220             nextBuffer[curToken & (MAX_TOKENS - 1)] = 0;
221         }
222         ++curToken;
223 
224         utils::Sleep(myWait);
225         unsigned temp = ++otherTokens;
226         if (temp == 1) {
227             next_sem.V();
228         }
229     }
230     next_sem.V(); // final wakeup
231 }
232 
Consume(const int)233 void FilterBase::Consume( const int ) {
234     unsigned myToken;
235     sBarrier.wait();
236     do {
237         while( !myTokens ) {
238             my_sem.P();
239         }
240         // we have a slot available
241         --myTokens;
242         myToken = myBuffer[curToken & (MAX_TOKENS - 1)];
243         if (myToken) {
244             REQUIRE_MESSAGE(myToken == curToken * 3 + 1, "Error in received token");
245             ++curToken;
246             utils::Sleep(myWait);
247             unsigned temp = ++otherTokens;
248             if (temp == 1) {
249                 next_sem.V();
250             }
251         }
252     } while(myToken);
253     // end of processing
254     REQUIRE_MESSAGE(curToken + 1 == totTokens, "Didn't receive enough tokens");
255 }
256 
257 // test of producer/consumer with atomic buffer cnt and semaphore
258 // nTokens are total number of tokens through the pipe
259 // pWait is the wait time for the producer
260 // cWait is the wait time for the consumer
test_producer_consumer(unsigned totTokens,unsigned nTokens,unsigned pWait,unsigned cWait)261 void test_producer_consumer( unsigned totTokens, unsigned nTokens, unsigned pWait, unsigned cWait ) {
262     semaphore p_sem;
263     semaphore c_sem;
264     std::atomic<unsigned> p_tokens;
265     std::atomic<unsigned> c_tokens(0);
266 
267     unsigned c_buffer[MAX_TOKENS];
268     FilterBase* my_filters[2]; // one producer, one concumer
269 
270     REQUIRE_MESSAGE(nTokens <= MAX_TOKENS, "Not enough slots for tokens");
271 
272     my_filters[0] = new FilterBase(imaProducer, totTokens, p_tokens, c_tokens, pWait, c_sem, p_sem, nullptr, &(c_buffer[0]));
273     my_filters[1] = new FilterBase(imaConsumer, totTokens, c_tokens, p_tokens, cWait, p_sem, c_sem, c_buffer, nullptr);
274 
275     p_tokens = nTokens;
276     ProduceConsumeBody body(my_filters);
277     sBarrier.initialize(2);
278     utils::NativeParallelFor(2, body);
279     delete my_filters[0];
280     delete my_filters[1];
281 }
282 
283 //! \brief \ref error_guessing
284 TEST_CASE("test binary semaphore") {
285     test_binary_semaphore(utils::MaxThread);
286 }
287 
288 //! \brief \ref error_guessing
289 TEST_CASE("test semaphore") {
290     for (int sem_size = 1; sem_size <= int(utils::MaxThread); ++sem_size) {
291         for (int ex_threads = 0; ex_threads <= int(utils::MaxThread) - sem_size; ++ex_threads) {
292             test_semaphore(sem_size, ex_threads);
293         }
294     }
295 }
296 
297 //! \brief \ref error_guessing
298 TEST_CASE("test producer-consumer") {
299     test_producer_consumer(10, 2, 5, 5);
300     test_producer_consumer(10, 2, 20, 5);
301     test_producer_consumer(10, 2, 5, 20);
302 
303     test_producer_consumer(10, 1, 5, 5);
304     test_producer_consumer(20, 10, 5, 20);
305     test_producer_consumer(64, 32, 1, 20);
306 }
307