1 /*
2  * Copyright (C) 2002-2003 Fhg Fokus
3  *
4  * This file is part of SEMS, a free SIP media server.
5  *
6  * SEMS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version. This program is released under
10  * the GPL with the additional exemption that compiling, linking,
11  * and/or using OpenSSL is allowed.
12  *
13  * For a license to use the SEMS software under conditions
14  * other than those described here, or to purchase support for this
15  * software, please contact iptel.org by e-mail at the following addresses:
16  *    info@iptel.org
17  *
18  * SEMS is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software
25  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26  */
27 /** @file AmThread.h */
28 #ifndef _AmThread_h_
29 #define _AmThread_h_
30 
31 #include <pthread.h>
32 #include <sys/time.h>
33 #include <time.h>
34 #include <errno.h>
35 
36 #include <queue>
37 
/**
 * \brief C++ Wrapper class for pthread mutex
 *
 * Holds one pthread_mutex_t and exposes lock()/unlock() on it.
 * The member functions are only declared here; their definitions
 * live outside this header.
 */
class AmMutex
{
private:
  /** The wrapped pthread mutex. */
  pthread_mutex_t m;

public:
  /**
   * @param recursive presumably requests a recursive mutex when true
   *                  (the constructor body is not in this header --
   *                  confirm against the implementation).
   */
  AmMutex(bool recursive = false);
  ~AmMutex();

  /** Acquire the mutex. */
  void lock();

  /** Release the mutex. */
  void unlock();
};
51 
52 /**
53  * \brief  Simple lock class
54  */
55 class AmLock
56 {
57   AmMutex& m;
58 public:
AmLock(AmMutex & _m)59   AmLock(AmMutex& _m) : m(_m) {
60     m.lock();
61   }
~AmLock()62   ~AmLock(){
63     m.unlock();
64   }
65 };
66 
67 /**
68  * \brief Shared variable.
69  *
70  * Include a variable and its mutex.
71  * @warning Don't use safe functions (set,get)
72  * within a {lock(); ... unlock();} block. Use
73  * unsafe function instead.
74  */
75 template<class T>
76 class AmSharedVar
77 {
78   T       t;
79   AmMutex m;
80 
81 public:
AmSharedVar(const T & _t)82   AmSharedVar(const T& _t) : t(_t) {}
AmSharedVar()83   AmSharedVar() {}
84 
get()85   T get() {
86     lock();
87     T res = unsafe_get();
88     unlock();
89     return res;
90   }
91 
set(const T & new_val)92   void set(const T& new_val) {
93     lock();
94     unsafe_set(new_val);
95     unlock();
96   }
97 
98   const AmSharedVar<T>& operator =(const T& new_val) {
99    set(new_val);
100    return *this;
101   }
102 
lock()103   void lock() { m.lock(); }
unlock()104   void unlock() { m.unlock(); }
105 
unsafe_get()106   const T& unsafe_get() { return t; }
unsafe_set(const T & new_val)107   void unsafe_set(const T& new_val) { t = new_val; }
108 };
109 
/**
 * \brief C++ Wrapper class for pthread condition
 *
 * Couples a value of type T with a pthread mutex and condition
 * variable. Waiters block until the value evaluates to true.
 */
template<class T>
class AmCondition
{
  T               t;
  pthread_mutex_t m;
  pthread_cond_t  cond;

  /** Initialize the mutex and the condition variable with default attributes. */
  void init_cond()
  {
    pthread_mutex_init(&m, NULL);
    pthread_cond_init(&cond, NULL);
  }

public:
  /** Start with a value-initialized T. */
  AmCondition()
    : t()
  {
    init_cond();
  }

  /** Start with a copy of @p _t. */
  AmCondition(const T& _t)
    : t(_t)
  {
    init_cond();
  }

  ~AmCondition()
  {
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&m);
  }

  /**
   * Store a new value under the mutex. Waiters are woken only
   * when the stored value evaluates to true.
   */
  void set(const T& newval)
  {
    pthread_mutex_lock(&m);

    t = newval;
    if (t) {
      pthread_cond_broadcast(&cond);
    }

    pthread_mutex_unlock(&m);
  }

  /** @return a copy of the current value, read under the mutex. */
  T get()
  {
    T snapshot;

    pthread_mutex_lock(&m);
    snapshot = t;
    pthread_mutex_unlock(&m);

    return snapshot;
  }

  /** Block until the value evaluates to true. */
  void wait_for()
  {
    pthread_mutex_lock(&m);

    // Re-test after every wake-up: pthread_cond_wait may return
    // while the value is still false.
    for (;;) {
      if (t)
        break;
      pthread_cond_wait(&cond, &m);
    }

    pthread_mutex_unlock(&m);
  }

  /**
   * Block until the value evaluates to true or the timeout expires.
   *
   * @param msec maximum time to wait, in milliseconds.
   * @return true if the value evaluates to true when the wait ends,
   *         false otherwise.
   */
  bool wait_for_to(unsigned long msec)
  {
    struct timeval  start;
    struct timespec deadline;

    // Absolute deadline = current wall-clock time + msec.
    gettimeofday(&start, NULL);

    const unsigned long whole_sec = msec / 1000;
    const unsigned long rest_usec = (msec % 1000) * 1000;

    deadline.tv_sec  = start.tv_sec + whole_sec;
    deadline.tv_nsec = (start.tv_usec + rest_usec) * 1000;

    // Carry a full second out of the nanosecond field.
    if (deadline.tv_nsec >= 1000000000) {
      deadline.tv_sec++;
      deadline.tv_nsec -= 1000000000;
    }

    pthread_mutex_lock(&m);

    int rc = 0;
    while (!t) {
      rc = pthread_cond_timedwait(&cond, &m, &deadline);
      if (rc != 0)
        break;  // timed out or failed: stop waiting
    }

    const bool reached = t ? true : false;
    pthread_mutex_unlock(&m);

    return reached;
  }
};
191 
192 /**
193  * \brief C++ Wrapper class for pthread
194  */
195 class AmThread
196 {
197   pthread_t _td;
198   AmMutex   _m_td;
199 
200   AmSharedVar<bool> _stopped;
201 
202   static void* _start(void*);
203 
204 protected:
205   virtual void run()=0;
206   virtual void on_stop()=0;
207 
208 public:
209   unsigned long _pid;
210   AmThread();
~AmThread()211   virtual ~AmThread() {}
212 
onIdle()213   virtual void onIdle() {}
214 
215   /** Start it ! */
216   void start();
217   /** Stop it ! */
218   void stop();
219   /** @return true if this thread doesn't run. */
is_stopped()220   bool is_stopped() { return _stopped.get(); }
221   /** Wait for this thread to finish */
222   void join();
223   /** kill the thread (if pthread_setcancelstate(PTHREAD_CANCEL_ENABLED) has been set) **/
224   void cancel();
225 
226   int setRealtime();
227 };
228 
229 /**
230  * \brief Container/garbage collector for threads.
231  *
232  * AmThreadWatcher waits for threads to stop
233  * and delete them.
234  * It gets started automatically when needed.
235  * Once you added a thread to the container,
236  * there is no mean to get it out.
237  */
238 class AmThreadWatcher: public AmThread
239 {
240   static AmThreadWatcher* _instance;
241   static AmMutex          _inst_mut;
242 
243   std::queue<AmThread*> thread_queue;
244   AmMutex          q_mut;
245 
246   /** the daemon only runs if this is true */
247   AmCondition<bool> _run_cond;
248 
249   AmThreadWatcher();
250   void run();
251   void on_stop();
252 
253 public:
254   static AmThreadWatcher* instance();
255   void add(AmThread*);
256 };
257 
/**
 * \brief Per-thread pointer slot built on a pthread key.
 *
 * Each thread sees its own T* through get()/set(). The key is
 * created with a destructor callback that deletes the stored
 * object as a T.
 */
template<class T>
class AmThreadLocalStorage
{
  pthread_key_t key;

  /** Destructor callback registered with the key: deletes the stored T. */
  static void destroy_value(void* stored)
  {
    T* typed = static_cast<T*>(stored);
    delete typed;
  }

public:
  /** Create the key and register the deleting callback. */
  AmThreadLocalStorage()
  {
    pthread_key_create(&key, destroy_value);
  }

  /** Delete the key. */
  ~AmThreadLocalStorage()
  {
    pthread_key_delete(key);
  }

  /** @return the pointer stored for the calling thread. */
  T* get()
  {
    void* raw = pthread_getspecific(key);
    return static_cast<T*>(raw);
  }

  /**
   * Store @p p for the calling thread. A pointer stored earlier
   * is not deleted by this call.
   */
  void set(T* p)
  {
    // pthread_setspecific takes a const void*, so no cast is needed.
    pthread_setspecific(key, p);
  }
};
284 
285 #endif
286 
287 // Local Variables:
288 // mode:C++
289 // End:
290 
291