1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 //
18 // Test for counting semaphore.
19 //
20 // set semaphore to N
21 // create N + M threads
22 // have each thread
23 //   A. P()
24 //   B. increment atomic count
//   C. spin for a while checking the value of the count; make sure it doesn't exceed N
26 //   D. decrement atomic count
27 //   E. V()
28 //
29 
30 #include "../tbb/semaphore.h"
31 #include "tbb/atomic.h"
32 #include "tbb/blocked_range.h"
33 
34 #include <vector>
35 using std::vector;
36 
37 #include "harness_assert.h"
38 #include "harness.h"
39 
40 using tbb::internal::semaphore;
41 
42 #include "harness_barrier.h"
43 
44 tbb::atomic<int> pCount;
45 
46 Harness::SpinBarrier sBarrier;
47 
48 #include "tbb/tick_count.h"
49 // semaphore basic function:
50 //   set semaphore to initial value
51 //   see that semaphore only allows that number of threads to be active
52 class Body: NoAssign {
53     const int nIters;
54     tbb::internal::semaphore &mySem;
55     vector<int> &ourCounts;
56     vector<double> &tottime;
57     static const int tickCounts = 1;  // millisecond
58     static const int innerWait = 5; // millisecond
59 public:
Body(int nThread_,int nIter_,semaphore & mySem_,vector<int> & ourCounts_,vector<double> & tottime_)60     Body(int nThread_, int nIter_, semaphore &mySem_,
61             vector<int>& ourCounts_,
62             vector<double>& tottime_
63             ) : nIters(nIter_), mySem(mySem_), ourCounts(ourCounts_), tottime(tottime_) { sBarrier.initialize(nThread_); pCount = 0; }
operator ()(const int tid) const64 void operator()(const int tid) const {
65     sBarrier.wait();
66     for(int i=0; i < nIters; ++i) {
67         Harness::Sleep( tid * tickCounts );
68         tbb::tick_count t0 = tbb::tick_count::now();
69         mySem.P();
70         tbb::tick_count t1 = tbb::tick_count::now();
71         tottime[tid] += (t1-t0).seconds();
72         int curval = ++pCount;
73         if(curval > ourCounts[tid]) ourCounts[tid] = curval;
74         Harness::Sleep( innerWait );
75         --pCount;
76         ASSERT((int)pCount >= 0, NULL);
77         mySem.V();
78     }
79 }
80 };
81 
82 
testSemaphore(int semInitCnt,int extraThreads)83 void testSemaphore( int semInitCnt, int extraThreads ) {
84     semaphore my_sem(semInitCnt);
85     // tbb::task_scheduler_init init(tbb::task_scheduler_init::deferred);
86     int nThreads = semInitCnt + extraThreads;
87     vector<int> maxVals(nThreads);
88     vector<double> totTimes(nThreads);
89     int nIters = 10;
90     Body myBody(nThreads, nIters, my_sem, maxVals, totTimes);
91 
92     REMARK( " sem(%d) with %d extra threads\n", semInitCnt, extraThreads);
93     pCount = 0;
94     NativeParallelFor(nThreads, myBody);
95     if(extraThreads == 0) {
96         double allPWaits = 0;
97         for(vector<double>::const_iterator j = totTimes.begin(); j != totTimes.end(); ++j) {
98             allPWaits += *j;
99         }
100         allPWaits /= static_cast<double>(nThreads * nIters);
101         REMARK("Average wait for P() in uncontested case for nThreads = %d is %g\n", nThreads, allPWaits);
102     }
103     ASSERT(!pCount, "not all threads decremented pCount");
104     int maxCount = -1;
105     for(vector<int>::const_iterator i=maxVals.begin(); i!= maxVals.end();++i) {
106         maxCount = max(maxCount,*i);
107     }
108     ASSERT(maxCount <= semInitCnt,"too many threads in semaphore-protected increment");
109     if(maxCount < semInitCnt) {
110         REMARK("Not enough threads in semaphore-protected region (%d < %d)\n", static_cast<int>(maxCount), semInitCnt);
111     }
112 }
113 
114 #include "../tbb/semaphore.cpp"
115 #if _WIN32||_WIN64
116 #include "../tbb/dynamic_link.cpp"
117 
testOSVersion()118 void testOSVersion() {
119 #if __TBB_USE_SRWLOCK
120      BOOL bIsWindowsVistaOrLater;
121 #if  __TBB_WIN8UI_SUPPORT
122      bIsWindowsVistaOrLater = true;
123 #else
124      OSVERSIONINFO osvi;
125 
126      memset( (void*)&osvi, 0, sizeof(OSVERSIONINFO) );
127      osvi.dwOSVersionInfoSize = sizeof(OSVERSIONINFO);
128      GetVersionEx(&osvi);
129      bIsWindowsVistaOrLater = (osvi.dwMajorVersion >= 6 );
130 #endif
131 
132      if( bIsWindowsVistaOrLater ) {
133         REMARK("Checking SRWLock is loaded\n");
134         tbb::internal::binary_semaphore s;
135         ASSERT( (uintptr_t)tbb::internal::__TBB_init_binsem!=(uintptr_t)&tbb::internal::init_binsem_using_event, NULL );
136         ASSERT( (uintptr_t)tbb::internal::__TBB_acquire_binsem!=(uintptr_t)&tbb::internal::acquire_binsem_using_event, NULL );
137         ASSERT( (uintptr_t)tbb::internal::__TBB_release_binsem!=(uintptr_t)&tbb::internal::release_binsem_using_event, NULL );
138      }
139 #endif /* __TBB_USE_SRWLOCK */
140 }
141 #endif /* _WIN32||_WIN64 */
142 
143 #define N_TIMES 1000
144 
145 template<typename S>
146 struct Counter {
147     volatile long value;
148     S my_sem;
CounterCounter149     Counter() : value(0) {}
150 };
151 
152 //! Function object for use with parallel_for.h.
153 template<typename C>
154 struct AddOne: NoAssign {
155     C& my_counter;
156     /** Increments counter once for each iteration in the iteration space. */
operator ()AddOne157     void operator()( int /*tid*/ ) const {
158         for( size_t i=0; i<N_TIMES; ++i ) {
159             my_counter.my_sem.P();
160             my_counter.value = my_counter.value + 1;
161             my_counter.my_sem.V();
162         }
163     }
AddOneAddOne164     AddOne( C& c_ ) : my_counter(c_) { my_counter.my_sem.V(); }
165 };
166 
testBinarySemaphore(int nThreads)167 void testBinarySemaphore( int nThreads ) {
168     REMARK("Testing binary semaphore\n");
169     Counter<tbb::internal::binary_semaphore> counter;
170     AddOne<Counter<tbb::internal::binary_semaphore> > myAddOne(counter);
171     NativeParallelFor( nThreads, myAddOne );
172     ASSERT( nThreads*N_TIMES==counter.value, "Binary semaphore operations P()/V() have a race");
173 }
174 
175 // Power of 2, the most tokens that can be in flight.
176 #define MAX_TOKENS 32
177 enum FilterType { imaProducer, imaConsumer };
178 class FilterBase : NoAssign {
179 protected:
180     FilterType ima;
181     unsigned totTokens;  // total number of tokens to be emitted, only used by producer
182     tbb::atomic<unsigned>& myTokens;
183     tbb::atomic<unsigned>& otherTokens;
184     unsigned myWait;
185     semaphore &mySem;
186     semaphore &nextSem;
187     unsigned* myBuffer;
188     unsigned* nextBuffer;
189     unsigned curToken;
190 public:
FilterBase(FilterType ima_,unsigned totTokens_,tbb::atomic<unsigned> & myTokens_,tbb::atomic<unsigned> & otherTokens_,unsigned myWait_,semaphore & mySem_,semaphore & nextSem_,unsigned * myBuffer_,unsigned * nextBuffer_)191     FilterBase( FilterType ima_
192             ,unsigned totTokens_
193             ,tbb::atomic<unsigned>& myTokens_
194             ,tbb::atomic<unsigned>& otherTokens_
195             ,unsigned myWait_
196             ,semaphore &mySem_
197             ,semaphore &nextSem_
198             ,unsigned* myBuffer_
199             ,unsigned* nextBuffer_
200             )
201         : ima(ima_),totTokens(totTokens_),myTokens(myTokens_),otherTokens(otherTokens_),myWait(myWait_),mySem(mySem_),
202           nextSem(nextSem_),myBuffer(myBuffer_),nextBuffer(nextBuffer_)
203     {
204         curToken = 0;
205     }
206     void Produce(const int tid);
207     void Consume(const int tid);
operator ()(const int tid)208     void operator()(const int tid) { if(ima == imaConsumer) Consume(tid); else Produce(tid); }
209 };
210 
211 class ProduceConsumeBody {
212     FilterBase** myFilters;
213     public:
ProduceConsumeBody(FilterBase ** myFilters_)214     ProduceConsumeBody(FilterBase** myFilters_) : myFilters(myFilters_) {}
operator ()(const int tid) const215     void operator()(const int tid) const {
216         myFilters[tid]->operator()(tid);
217     }
218 };
219 
220 // send a bunch of non-Null "tokens" to consumer, then a NULL.
Produce(const int)221 void FilterBase::Produce(const int /*tid*/) {
222     nextBuffer[0] = 0;  // just in case we provide no tokens
223     sBarrier.wait();
224     while(totTokens) {
225         while(!myTokens)
226             mySem.P();
227         // we have a slot available.
228         --myTokens;  // moving this down reduces spurious wakeups
229         --totTokens;
230         if(totTokens)
231             nextBuffer[curToken&(MAX_TOKENS-1)] = curToken*3+1;
232         else
233             nextBuffer[curToken&(MAX_TOKENS-1)] = 0;
234         ++curToken;
235         Harness::Sleep(myWait);
236         unsigned temp = ++otherTokens;
237         if(temp == 1)
238             nextSem.V();
239     }
240     nextSem.V();  // final wakeup
241 }
242 
Consume(const int)243 void FilterBase::Consume(const int /*tid*/) {
244     unsigned myToken;
245     sBarrier.wait();
246     do {
247         while(!myTokens)
248             mySem.P();
249         // we have a slot available.
250         --myTokens;  // moving this down reduces spurious wakeups
251         myToken = myBuffer[curToken&(MAX_TOKENS-1)];
252         if(myToken) {
253             ASSERT(myToken == curToken*3+1, "Error in received token");
254             ++curToken;
255             Harness::Sleep(myWait);
256             unsigned temp = ++otherTokens;
257             if(temp == 1)
258                 nextSem.V();
259         }
260     } while(myToken);
261     // end of processing
262     ASSERT(curToken + 1 == totTokens, "Didn't receive enough tokens");
263 }
264 
265 // -- test of producer/consumer with atomic buffer cnt and semaphore
266 // nTokens are total number of tokens through the pipe
267 // pWait is the wait time for the producer
268 // cWait is the wait time for the consumer
testProducerConsumer(unsigned totTokens,unsigned nTokens,unsigned pWait,unsigned cWait)269 void testProducerConsumer( unsigned totTokens, unsigned nTokens, unsigned pWait, unsigned cWait) {
270     semaphore pSem;
271     semaphore cSem;
272     tbb::atomic<unsigned> pTokens;
273     tbb::atomic<unsigned> cTokens;
274     cTokens = 0;
275     unsigned cBuffer[MAX_TOKENS];
276     FilterBase* myFilters[2];  // one producer, one consumer
277     REMARK("Testing producer/consumer with %lu total tokens, %lu tokens at a time, producer wait(%lu), consumer wait (%lu)\n", totTokens, nTokens, pWait, cWait);
278     ASSERT(nTokens <= MAX_TOKENS, "Not enough slots for tokens");
279     myFilters[0] = new FilterBase(imaProducer, totTokens, pTokens, cTokens, pWait, cSem, pSem, (unsigned *)NULL, &(cBuffer[0]));
280     myFilters[1] = new FilterBase(imaConsumer, totTokens, cTokens, pTokens, cWait, pSem, cSem, cBuffer, (unsigned *)NULL);
281     pTokens = nTokens;
282     ProduceConsumeBody myBody(myFilters);
283     sBarrier.initialize(2);
284     NativeParallelFor(2, myBody);
285     delete myFilters[0];
286     delete myFilters[1];
287 }
288 
TestMain()289 int TestMain() {
290     REMARK("Started\n");
291 #if _WIN32||_WIN64
292     testOSVersion();
293 #endif
294     if(MaxThread > 0) {
295         testBinarySemaphore( MaxThread );
296         for(int semSize = 1; semSize <= MaxThread; ++semSize) {
297             for(int exThreads = 0; exThreads <= MaxThread - semSize; ++exThreads) {
298                 testSemaphore( semSize, exThreads );
299             }
300         }
301     }
302     // Test producer/consumer with varying execution times and buffer sizes
303     // ( total tokens, tokens in buffer, sleep for producer, sleep for consumer )
304     testProducerConsumer( 10, 2, 5, 5 );
305     testProducerConsumer( 10, 2, 20, 5 );
306     testProducerConsumer( 10, 2, 5, 20 );
307     testProducerConsumer( 10, 1, 5, 5 );
308     testProducerConsumer( 20, 10, 5, 20 );
309     testProducerConsumer( 64, 32, 1, 20 );
310     return Harness::Done;
311 }
312