1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #ifndef THREADINGUTIL_H_
20 #define THREADINGUTIL_H_
21 
22 #include <vector>
23 
24 #ifdef THREADED
25 #include "pthread.h"
26 #endif
27 
28 // *****************************************************************************
29 // Threading primitives
30 
// Atomic post-increment: adds incr to *operand and returns the value that
// *operand held before the addition.
int32_t atomic_post_incr(volatile int32_t *operand, int32_t incr);

// Atomic fetch-and-store: writes value into *operand and returns the value
// that *operand held before the write.
int32_t atomic_fetch_store(volatile int32_t *operand, int32_t value);
35 
36 // a partial implementation of an atomic integer type
37 class AtomicInt{
38 public:
v_(init)39     explicit AtomicInt(int32_t init=0):v_(init){}
AtomicInt(const AtomicInt & other)40     AtomicInt(const AtomicInt& other):v_(other){}
41     // assigment
42     AtomicInt& operator=(const AtomicInt& lhs){
43         atomic_fetch_store(&v_,lhs);
44         return *this;
45     }
46     AtomicInt& operator=(int32_t i){
47         atomic_fetch_store(&v_,i);
48         return *this;
49     }
50     // pre-increment
51     AtomicInt& operator++() {
52         atomic_post_incr(&v_,1);
53         return *this;
54     }
55     // pre-decrement
56     AtomicInt& operator--() {
57         atomic_post_incr(&v_,-1);
58         return *this;
59     }
60     // post-increment
61     AtomicInt operator++(int){
62         return AtomicInt(atomic_post_incr(&v_,1));
63     }
64     // post-decrement
65     AtomicInt operator--(int){
66         return AtomicInt(atomic_post_incr(&v_,-1));
67     }
68 
69     operator int() const{
70         return atomic_post_incr(&v_,0);
71     }
get()72     int get() const{
73         return atomic_post_incr(&v_,0);
74     }
75 private:
76     mutable int32_t v_;
77 };
78 
79 #ifdef THREADED
80 // ****************************************************************************
// Call validateJobs()/validate() on the given object, passing the file name
// and line number of the call site (e.g. for a TestJobManager or a TestJob).
// The argument is parenthesized so that expressions such as *ptr expand to
// (*ptr).validate(...) rather than *ptr.validate(...).
#define VALIDATE_JOBS(jm) (jm).validateJobs(__FILE__,__LINE__)
#define VALIDATE_JOB(j) (j).validate(__FILE__,__LINE__)
83 
// Lock object used in THREADED builds. Only the interface is visible here:
// the member functions and the opaque Impl structure are defined outside
// this header (presumably acquire() locks and release() unlocks).
class Mutex {
public:
    Mutex();
    ~Mutex();

    void acquire();
    void release();

private:
    struct Impl;   // opaque implementation type
    Impl* impl_;

    // Declared private, with no definition in this header, so that a Mutex
    // cannot be copied or assigned by client code.
    Mutex(const Mutex&);
    Mutex& operator=(const Mutex&);
};
96 
97 class MTLock{
98 public:
MTLock(Mutex & m)99     MTLock(Mutex& m):m_(m){m.acquire();}
~MTLock()100     ~MTLock(){m_.release();}
101     Mutex& m_;
102 };
103 
// Holds mutex m for the rest of the enclosing scope by declaring a local
// MTLock guard named __lock (so it can be used at most once per scope).
#define synchronized(m) MTLock __lock(m)
105 
106 // ****************************************************************************
// Abstract latch interface; CountDownLatch in this file is one implementation.
class Latch {
public:
    virtual ~Latch() {}

    // Wait on the latch (CountDownLatch blocks until its count reaches zero).
    virtual void await() const = 0;

    // Signal the latch and then wait on it, as a single operation.
    virtual void signalAndWait() = 0;

    // Signal the latch without waiting.
    virtual void signal() = 0;
};
114 
115 class CountDownLatch: public Latch {
116 public:
CountDownLatch(int count)117     CountDownLatch(int count):count_(count) {
118         pthread_cond_init(&cond_,0);
119         pthread_mutex_init(&mut_,0);
120     }
~CountDownLatch()121     virtual ~CountDownLatch() {
122         pthread_mutex_lock(&mut_);
123         if(count_!=0) {
124             count_=0;
125             pthread_cond_broadcast(&cond_);
126         }
127         pthread_mutex_unlock(&mut_);
128 
129         pthread_cond_destroy(&cond_);
130         pthread_mutex_destroy(&mut_);
131     }
132 
await()133     virtual void await() const {
134         pthread_mutex_lock(&mut_);
135         awaitImpl();
136         pthread_mutex_unlock(&mut_);
137     }
signalAndWait()138     virtual void signalAndWait() {
139         pthread_mutex_lock(&mut_);
140         signalImpl();
141         awaitImpl();
142         pthread_mutex_unlock(&mut_);
143     }
signal()144     virtual void signal() {
145         pthread_mutex_lock(&mut_);
146         signalImpl();
147         pthread_mutex_unlock(&mut_);
148     }
149 private:
awaitImpl()150     void awaitImpl() const{
151         while(count_!=0)
152         pthread_cond_wait(&cond_,&mut_);
153     }
signalImpl()154     void signalImpl() {
155         if(count_>0) {
156             count_--;
157             pthread_cond_broadcast(&cond_);
158         }
159     }
160     int count_;
161     mutable pthread_mutex_t mut_;
162     mutable pthread_cond_t cond_;
163 };
164 
165 class TestJob {
166 public:
167     typedef long JobId;
TestJob()168     TestJob():hasRun_(false),startLatch_(0),endLatch_(0) {}
~TestJob()169     virtual ~TestJob() {
170         join();
171     }
172     virtual TestJob* clone() const =0;
173 
174     virtual void run() =0;
175     virtual void validate(const char* file, int line) const =0;
176 
177     virtual void start(Latch* startLatch=0,Latch* endLatch=0) {
178         startLatch_=startLatch;endLatch_=endLatch;
179         hasRun_=true;
180         pthread_create(&thread_, 0, thread, this);
181     }
getJobId()182     virtual JobId getJobId() const {
183         return (JobId)thread_;
184     }
join()185     virtual void join() {
186         if(!hasRun_)
187         return;
188         if(!pthread_equal(thread_,pthread_self()))
189         pthread_join(thread_,0);
190         else
191         pthread_detach(thread_);
192     }
193 private:
awaitStart()194     void awaitStart() {
195         if(startLatch_==0) return;
196         startLatch_->signalAndWait();
197     }
signalFinished()198     void signalFinished() {
199         if(endLatch_==0) return;
200         endLatch_->signal();
201     }
thread(void * p)202     static void* thread(void* p) {
203         TestJob* j=(TestJob*)p;
204         j->awaitStart(); // wait for the start command
205         j->run();
206         j->signalFinished();
207         return 0;
208     }
209     bool hasRun_;
210     Latch* startLatch_;
211     Latch* endLatch_;
212     pthread_t thread_;
213 };
214 
215 class TestJobManager {
216     typedef std::vector<TestJob*> JobList;
217 public:
218     TestJobManager(const TestJob& tj,int threadCount=1):
startLatch_(threadCount)219         startLatch_(threadCount),endLatch_(threadCount)
220     {
221         for(int i=0;i<threadCount;++i)
222             jobs_.push_back(tj.clone());
223     }
~TestJobManager()224     virtual ~TestJobManager(){
225         for(unsigned  i=0;i<jobs_.size();++i)
226             delete jobs_[i];
227     }
228 
startAllJobs()229     virtual void startAllJobs() {
230         for(unsigned i=0;i<jobs_.size();++i)
231             jobs_[i]->start(&startLatch_,&endLatch_);
232     }
startJobsImmediately()233     virtual void startJobsImmediately() {
234         for(unsigned i=0;i<jobs_.size();++i)
235             jobs_[i]->start(0,&endLatch_);
236     }
wait()237     virtual void wait() const {
238         endLatch_.await();
239     }
validateJobs(const char * file,int line)240     virtual void validateJobs(const char* file, int line) const{
241         for(unsigned i=0;i<jobs_.size();++i)
242             jobs_[i]->validate(file,line);
243     }
244 private:
245     JobList jobs_;
246     CountDownLatch startLatch_;
247     CountDownLatch endLatch_;
248 };
249 
250 #else // THREADED
// Single-threaded build: there is nothing to lock, so Mutex is an empty
// stand-in whose acquire()/release() do nothing.
class Mutex {
public:
    void acquire() {}
    void release() {}
};
// In the single-threaded build synchronized(m) expands to nothing; the
// argument is discarded and no guard object is created.
#define synchronized(m)
258 
259 #endif // THREADED
260 
261 #endif /*THREADINGUTIL_H_*/
262