1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 #ifndef THREADINGUTIL_H_ 20 #define THREADINGUTIL_H_ 21 22 #include <vector> 23 24 #ifdef THREADED 25 #include "pthread.h" 26 #endif 27 28 // ***************************************************************************** 29 // Threading primitives 30 31 // atomic post-increment; returns the previous value of the operand 32 int32_t atomic_post_incr(volatile int32_t* operand, int32_t incr); 33 // atomic fetch&store; returns the previous value of the operand 34 int32_t atomic_fetch_store(volatile int32_t *operand, int32_t value); 35 36 // a partial implementation of an atomic integer type 37 class AtomicInt{ 38 public: v_(init)39 explicit AtomicInt(int32_t init=0):v_(init){} AtomicInt(const AtomicInt & other)40 AtomicInt(const AtomicInt& other):v_(other){} 41 // assigment 42 AtomicInt& operator=(const AtomicInt& lhs){ 43 atomic_fetch_store(&v_,lhs); 44 return *this; 45 } 46 AtomicInt& operator=(int32_t i){ 47 atomic_fetch_store(&v_,i); 48 return *this; 49 } 50 // pre-increment 51 AtomicInt& operator++() { 52 atomic_post_incr(&v_,1); 53 return *this; 54 } 55 // pre-decrement 56 AtomicInt& operator--() { 57 atomic_post_incr(&v_,-1); 58 return *this; 59 } 60 // post-increment 61 AtomicInt operator++(int){ 62 return AtomicInt(atomic_post_incr(&v_,1)); 63 } 64 // post-decrement 65 AtomicInt operator--(int){ 66 return AtomicInt(atomic_post_incr(&v_,-1)); 67 } 68 69 operator int() const{ 70 return atomic_post_incr(&v_,0); 71 } get()72 int get() const{ 73 return atomic_post_incr(&v_,0); 74 } 75 private: 76 mutable int32_t v_; 77 }; 78 79 #ifdef THREADED 80 // **************************************************************************** 81 #define VALIDATE_JOBS(jm) jm.validateJobs(__FILE__,__LINE__) 82 #define VALIDATE_JOB(j) j.validate(__FILE__,__LINE__) 83 84 class Mutex{ 85 public: 86 Mutex(); 87 ~Mutex(); 88 void acquire(); 89 void release(); 90 private: 91 Mutex(const Mutex&); 92 Mutex& operator=(const Mutex&); 93 struct Impl; 94 Impl* impl_; 95 }; 96 97 class MTLock{ 98 public: MTLock(Mutex & m)99 MTLock(Mutex& m):m_(m){m.acquire();} ~MTLock()100 ~MTLock(){m_.release();} 101 Mutex& m_; 102 }; 103 104 #define synchronized(m) MTLock __lock(m) 105 106 // **************************************************************************** 107 class Latch { 108 public: ~Latch()109 virtual ~Latch() {} 110 virtual void await() const =0; 111 virtual void signalAndWait() =0; 112 virtual void signal() =0; 113 }; 114 115 class CountDownLatch: public Latch { 116 public: CountDownLatch(int count)117 CountDownLatch(int count):count_(count) { 118 pthread_cond_init(&cond_,0); 119 pthread_mutex_init(&mut_,0); 120 } ~CountDownLatch()121 virtual ~CountDownLatch() { 122 pthread_mutex_lock(&mut_); 123 if(count_!=0) { 124 count_=0; 125 pthread_cond_broadcast(&cond_); 126 } 127 pthread_mutex_unlock(&mut_); 128 129 pthread_cond_destroy(&cond_); 130 pthread_mutex_destroy(&mut_); 131 } 132 await()133 virtual void await() const { 134 pthread_mutex_lock(&mut_); 135 awaitImpl(); 136 pthread_mutex_unlock(&mut_); 137 } signalAndWait()138 virtual void signalAndWait() { 139 pthread_mutex_lock(&mut_); 140 signalImpl(); 141 awaitImpl(); 142 pthread_mutex_unlock(&mut_); 143 } signal()144 virtual void signal() { 145 pthread_mutex_lock(&mut_); 146 signalImpl(); 147 pthread_mutex_unlock(&mut_); 148 } 149 private: awaitImpl()150 void awaitImpl() const{ 151 while(count_!=0) 152 pthread_cond_wait(&cond_,&mut_); 153 } signalImpl()154 void signalImpl() { 155 if(count_>0) { 156 count_--; 157 pthread_cond_broadcast(&cond_); 158 } 159 } 160 int count_; 161 mutable pthread_mutex_t mut_; 162 mutable pthread_cond_t cond_; 163 }; 164 165 class TestJob { 166 public: 167 typedef long JobId; TestJob()168 TestJob():hasRun_(false),startLatch_(0),endLatch_(0) {} ~TestJob()169 virtual ~TestJob() { 170 join(); 171 } 172 virtual TestJob* clone() const =0; 173 174 virtual void run() =0; 175 virtual void validate(const char* file, int line) const =0; 176 177 virtual void start(Latch* startLatch=0,Latch* endLatch=0) { 178 startLatch_=startLatch;endLatch_=endLatch; 179 hasRun_=true; 180 pthread_create(&thread_, 0, thread, this); 181 } getJobId()182 virtual JobId getJobId() const { 183 return (JobId)thread_; 184 } join()185 virtual void join() { 186 if(!hasRun_) 187 return; 188 if(!pthread_equal(thread_,pthread_self())) 189 pthread_join(thread_,0); 190 else 191 pthread_detach(thread_); 192 } 193 private: awaitStart()194 void awaitStart() { 195 if(startLatch_==0) return; 196 startLatch_->signalAndWait(); 197 } signalFinished()198 void signalFinished() { 199 if(endLatch_==0) return; 200 endLatch_->signal(); 201 } thread(void * p)202 static void* thread(void* p) { 203 TestJob* j=(TestJob*)p; 204 j->awaitStart(); // wait for the start command 205 j->run(); 206 j->signalFinished(); 207 return 0; 208 } 209 bool hasRun_; 210 Latch* startLatch_; 211 Latch* endLatch_; 212 pthread_t thread_; 213 }; 214 215 class TestJobManager { 216 typedef std::vector<TestJob*> JobList; 217 public: 218 TestJobManager(const TestJob& tj,int threadCount=1): startLatch_(threadCount)219 startLatch_(threadCount),endLatch_(threadCount) 220 { 221 for(int i=0;i<threadCount;++i) 222 jobs_.push_back(tj.clone()); 223 } ~TestJobManager()224 virtual ~TestJobManager(){ 225 for(unsigned i=0;i<jobs_.size();++i) 226 delete jobs_[i]; 227 } 228 startAllJobs()229 virtual void startAllJobs() { 230 for(unsigned i=0;i<jobs_.size();++i) 231 jobs_[i]->start(&startLatch_,&endLatch_); 232 } startJobsImmediately()233 virtual void startJobsImmediately() { 234 for(unsigned i=0;i<jobs_.size();++i) 235 jobs_[i]->start(0,&endLatch_); 236 } wait()237 virtual void wait() const { 238 endLatch_.await(); 239 } validateJobs(const char * file,int line)240 virtual void validateJobs(const char* file, int line) const{ 241 for(unsigned i=0;i<jobs_.size();++i) 242 jobs_[i]->validate(file,line); 243 } 244 private: 245 JobList jobs_; 246 CountDownLatch startLatch_; 247 CountDownLatch endLatch_; 248 }; 249 250 #else // THREADED 251 // single THREADED 252 class Mutex{ 253 public: acquire()254 void acquire(){} release()255 void release(){} 256 }; 257 #define synchronized(m) 258 259 #endif // THREADED 260 261 #endif /*THREADINGUTIL_H_*/ 262