1 /*
2     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #ifndef __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__
31 #define __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__
32 
33 #include "err.hpp"
34 #include "mutex.hpp"
35 
//  Condition variable class encapsulates the OS condition variable in a platform-independent way.
37 
38 #if defined(ZMQ_USE_CV_IMPL_NONE)
39 
40 namespace zmq
41 {
42 class condition_variable_t
43 {
44   public:
condition_variable_t()45     inline condition_variable_t () { zmq_assert (false); }
46 
wait(mutex_t * mutex_,int timeout_)47     inline int wait (mutex_t *mutex_, int timeout_)
48     {
49         zmq_assert (false);
50         return -1;
51     }
52 
broadcast()53     inline void broadcast () { zmq_assert (false); }
54 
55     ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t)
56 };
57 }
58 
59 #elif defined(ZMQ_USE_CV_IMPL_WIN32API)
60 
61 #include "windows.hpp"
62 
63 namespace zmq
64 {
65 class condition_variable_t
66 {
67   public:
condition_variable_t()68     inline condition_variable_t () { InitializeConditionVariable (&_cv); }
69 
wait(mutex_t * mutex_,int timeout_)70     inline int wait (mutex_t *mutex_, int timeout_)
71     {
72         int rc = SleepConditionVariableCS (&_cv, mutex_->get_cs (), timeout_);
73 
74         if (rc != 0)
75             return 0;
76 
77         rc = GetLastError ();
78 
79         if (rc != ERROR_TIMEOUT)
80             win_assert (rc);
81 
82         errno = EAGAIN;
83         return -1;
84     }
85 
broadcast()86     inline void broadcast () { WakeAllConditionVariable (&_cv); }
87 
88   private:
89     CONDITION_VARIABLE _cv;
90 
91     ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t)
92 };
93 }
94 
95 #elif defined(ZMQ_USE_CV_IMPL_STL11)
96 
97 #include <condition_variable>
98 
99 namespace zmq
100 {
101 class condition_variable_t
102 {
103   public:
104     condition_variable_t () ZMQ_DEFAULT;
105 
wait(mutex_t * mutex_,int timeout_)106     int wait (mutex_t *mutex_, int timeout_)
107     {
108         // this assumes that the mutex mutex_ has been locked by the caller
109         int res = 0;
110         if (timeout_ == -1) {
111             _cv.wait (
112               *mutex_); // unlock mtx and wait cv.notify_all(), lock mtx after cv.notify_all()
113         } else if (_cv.wait_for (*mutex_, std::chrono::milliseconds (timeout_))
114                    == std::cv_status::timeout) {
115             // time expired
116             errno = EAGAIN;
117             res = -1;
118         }
119         return res;
120     }
121 
broadcast()122     void broadcast ()
123     {
124         // this assumes that the mutex associated with _cv has been locked by the caller
125         _cv.notify_all ();
126     }
127 
128   private:
129     std::condition_variable_any _cv;
130 
131     ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t)
132 };
133 }
134 
135 #elif defined(ZMQ_USE_CV_IMPL_VXWORKS)
136 
137 #include <sysLib.h>
138 
139 namespace zmq
140 {
141 class condition_variable_t
142 {
143   public:
144     inline condition_variable_t () ZMQ_DEFAULT;
145 
~condition_variable_t()146     inline ~condition_variable_t ()
147     {
148         scoped_lock_t l (_listenersMutex);
149         for (size_t i = 0; i < _listeners.size (); i++) {
150             semDelete (_listeners[i]);
151         }
152     }
153 
wait(mutex_t * mutex_,int timeout_)154     inline int wait (mutex_t *mutex_, int timeout_)
155     {
156         //Atomically releases lock, blocks the current executing thread,
157         //and adds it to the list of threads waiting on *this. The thread
158         //will be unblocked when broadcast() is executed.
159         //It may also be unblocked spuriously. When unblocked, regardless
160         //of the reason, lock is reacquired and wait exits.
161 
162         SEM_ID sem = semBCreate (SEM_Q_PRIORITY, SEM_EMPTY);
163         {
164             scoped_lock_t l (_listenersMutex);
165             _listeners.push_back (sem);
166         }
167         mutex_->unlock ();
168 
169         int rc;
170         if (timeout_ < 0)
171             rc = semTake (sem, WAIT_FOREVER);
172         else {
173             int ticksPerSec = sysClkRateGet ();
174             int timeoutTicks = (timeout_ * ticksPerSec) / 1000 + 1;
175             rc = semTake (sem, timeoutTicks);
176         }
177 
178         {
179             scoped_lock_t l (_listenersMutex);
180             // remove sem from listeners
181             for (size_t i = 0; i < _listeners.size (); i++) {
182                 if (_listeners[i] == sem) {
183                     _listeners.erase (_listeners.begin () + i);
184                     break;
185                 }
186             }
187             semDelete (sem);
188         }
189         mutex_->lock ();
190 
191         if (rc == 0)
192             return 0;
193 
194         if (rc == S_objLib_OBJ_TIMEOUT) {
195             errno = EAGAIN;
196             return -1;
197         }
198 
199         return -1;
200     }
201 
broadcast()202     inline void broadcast ()
203     {
204         scoped_lock_t l (_listenersMutex);
205         for (size_t i = 0; i < _listeners.size (); i++) {
206             semGive (_listeners[i]);
207         }
208     }
209 
210   private:
211     mutex_t _listenersMutex;
212     std::vector<SEM_ID> _listeners;
213 
214     ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t)
215 };
216 }
217 
218 #elif defined(ZMQ_USE_CV_IMPL_PTHREADS)
219 
220 #include <pthread.h>
221 
222 #if defined(__ANDROID_API__) && __ANDROID_API__ < 21
223 #define ANDROID_LEGACY
224 extern "C" int pthread_cond_timedwait_monotonic_np (pthread_cond_t *,
225                                                     pthread_mutex_t *,
226                                                     const struct timespec *);
227 #endif
228 
229 namespace zmq
230 {
231 class condition_variable_t
232 {
233   public:
condition_variable_t()234     inline condition_variable_t ()
235     {
236         pthread_condattr_t attr;
237         pthread_condattr_init (&attr);
238 #if !defined(ZMQ_HAVE_OSX) && !defined(ANDROID_LEGACY)
239         pthread_condattr_setclock (&attr, CLOCK_MONOTONIC);
240 #endif
241         int rc = pthread_cond_init (&_cond, &attr);
242         posix_assert (rc);
243     }
244 
~condition_variable_t()245     inline ~condition_variable_t ()
246     {
247         int rc = pthread_cond_destroy (&_cond);
248         posix_assert (rc);
249     }
250 
wait(mutex_t * mutex_,int timeout_)251     inline int wait (mutex_t *mutex_, int timeout_)
252     {
253         int rc;
254 
255         if (timeout_ != -1) {
256             struct timespec timeout;
257 
258 #ifdef ZMQ_HAVE_OSX
259             timeout.tv_sec = 0;
260             timeout.tv_nsec = 0;
261 #else
262             rc = clock_gettime (CLOCK_MONOTONIC, &timeout);
263             posix_assert (rc);
264 #endif
265 
266             timeout.tv_sec += timeout_ / 1000;
267             timeout.tv_nsec += (timeout_ % 1000) * 1000000;
268 
269             if (timeout.tv_nsec >= 1000000000) {
270                 timeout.tv_sec++;
271                 timeout.tv_nsec -= 1000000000;
272             }
273 #ifdef ZMQ_HAVE_OSX
274             rc = pthread_cond_timedwait_relative_np (
275               &_cond, mutex_->get_mutex (), &timeout);
276 #elif defined(ANDROID_LEGACY)
277             rc = pthread_cond_timedwait_monotonic_np (
278               &_cond, mutex_->get_mutex (), &timeout);
279 #else
280             rc =
281               pthread_cond_timedwait (&_cond, mutex_->get_mutex (), &timeout);
282 #endif
283         } else
284             rc = pthread_cond_wait (&_cond, mutex_->get_mutex ());
285 
286         if (rc == 0)
287             return 0;
288 
289         if (rc == ETIMEDOUT) {
290             errno = EAGAIN;
291             return -1;
292         }
293 
294         posix_assert (rc);
295         return -1;
296     }
297 
broadcast()298     inline void broadcast ()
299     {
300         int rc = pthread_cond_broadcast (&_cond);
301         posix_assert (rc);
302     }
303 
304   private:
305     pthread_cond_t _cond;
306 
307     ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t)
308 };
309 }
310 
311 #endif
312 
313 #endif
314