1 /* 2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file 3 4 This file is part of libzmq, the ZeroMQ core engine in C++. 5 6 libzmq is free software; you can redistribute it and/or modify it under 7 the terms of the GNU Lesser General Public License (LGPL) as published 8 by the Free Software Foundation; either version 3 of the License, or 9 (at your option) any later version. 10 11 As a special exception, the Contributors give you permission to link 12 this library with independent modules to produce an executable, 13 regardless of the license terms of these independent modules, and to 14 copy and distribute the resulting executable under terms of your choice, 15 provided that you also meet, for each linked independent module, the 16 terms and conditions of the license of that module. An independent 17 module is a module which is not derived from or based on this library. 18 If you modify this library, you must extend this exception to your 19 version of the library. 20 21 libzmq is distributed in the hope that it will be useful, but WITHOUT 22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public 24 License for more details. 25 26 You should have received a copy of the GNU Lesser General Public License 27 along with this program. If not, see <http://www.gnu.org/licenses/>. 28 */ 29 30 #ifndef __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__ 31 #define __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__ 32 33 #include "err.hpp" 34 #include "mutex.hpp" 35 36 // Condition variable class encapsulates OS mutex in a platform-independent way. 37 38 #if defined(ZMQ_USE_CV_IMPL_NONE) 39 40 namespace zmq 41 { 42 class condition_variable_t 43 { 44 public: condition_variable_t()45 inline condition_variable_t () { zmq_assert (false); } 46 wait(mutex_t * mutex_,int timeout_)47 inline int wait (mutex_t *mutex_, int timeout_) 48 { 49 zmq_assert (false); 50 return -1; 51 } 52 broadcast()53 inline void broadcast () { zmq_assert (false); } 54 55 ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t) 56 }; 57 } 58 59 #elif defined(ZMQ_USE_CV_IMPL_WIN32API) 60 61 #include "windows.hpp" 62 63 namespace zmq 64 { 65 class condition_variable_t 66 { 67 public: condition_variable_t()68 inline condition_variable_t () { InitializeConditionVariable (&_cv); } 69 wait(mutex_t * mutex_,int timeout_)70 inline int wait (mutex_t *mutex_, int timeout_) 71 { 72 int rc = SleepConditionVariableCS (&_cv, mutex_->get_cs (), timeout_); 73 74 if (rc != 0) 75 return 0; 76 77 rc = GetLastError (); 78 79 if (rc != ERROR_TIMEOUT) 80 win_assert (rc); 81 82 errno = EAGAIN; 83 return -1; 84 } 85 broadcast()86 inline void broadcast () { WakeAllConditionVariable (&_cv); } 87 88 private: 89 CONDITION_VARIABLE _cv; 90 91 ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t) 92 }; 93 } 94 95 #elif defined(ZMQ_USE_CV_IMPL_STL11) 96 97 #include <condition_variable> 98 99 namespace zmq 100 { 101 class condition_variable_t 102 { 103 public: 104 condition_variable_t () ZMQ_DEFAULT; 105 wait(mutex_t * mutex_,int timeout_)106 int wait (mutex_t *mutex_, int timeout_) 107 { 108 // this assumes that the mutex mutex_ has been locked by the caller 109 int res = 0; 110 if (timeout_ == -1) { 111 _cv.wait ( 112 *mutex_); // unlock mtx and wait cv.notify_all(), lock mtx after cv.notify_all() 113 } else if (_cv.wait_for (*mutex_, std::chrono::milliseconds (timeout_)) 114 == std::cv_status::timeout) { 115 // time expired 116 errno = EAGAIN; 117 res = -1; 118 } 119 return res; 120 } 121 broadcast()122 void broadcast () 123 { 124 // this assumes that the mutex associated with _cv has been locked by the caller 125 _cv.notify_all (); 126 } 127 128 private: 129 std::condition_variable_any _cv; 130 131 ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t) 132 }; 133 } 134 135 #elif defined(ZMQ_USE_CV_IMPL_VXWORKS) 136 137 #include <sysLib.h> 138 139 namespace zmq 140 { 141 class condition_variable_t 142 { 143 public: 144 inline condition_variable_t () ZMQ_DEFAULT; 145 ~condition_variable_t()146 inline ~condition_variable_t () 147 { 148 scoped_lock_t l (_listenersMutex); 149 for (size_t i = 0; i < _listeners.size (); i++) { 150 semDelete (_listeners[i]); 151 } 152 } 153 wait(mutex_t * mutex_,int timeout_)154 inline int wait (mutex_t *mutex_, int timeout_) 155 { 156 //Atomically releases lock, blocks the current executing thread, 157 //and adds it to the list of threads waiting on *this. The thread 158 //will be unblocked when broadcast() is executed. 159 //It may also be unblocked spuriously. When unblocked, regardless 160 //of the reason, lock is reacquired and wait exits. 161 162 SEM_ID sem = semBCreate (SEM_Q_PRIORITY, SEM_EMPTY); 163 { 164 scoped_lock_t l (_listenersMutex); 165 _listeners.push_back (sem); 166 } 167 mutex_->unlock (); 168 169 int rc; 170 if (timeout_ < 0) 171 rc = semTake (sem, WAIT_FOREVER); 172 else { 173 int ticksPerSec = sysClkRateGet (); 174 int timeoutTicks = (timeout_ * ticksPerSec) / 1000 + 1; 175 rc = semTake (sem, timeoutTicks); 176 } 177 178 { 179 scoped_lock_t l (_listenersMutex); 180 // remove sem from listeners 181 for (size_t i = 0; i < _listeners.size (); i++) { 182 if (_listeners[i] == sem) { 183 _listeners.erase (_listeners.begin () + i); 184 break; 185 } 186 } 187 semDelete (sem); 188 } 189 mutex_->lock (); 190 191 if (rc == 0) 192 return 0; 193 194 if (rc == S_objLib_OBJ_TIMEOUT) { 195 errno = EAGAIN; 196 return -1; 197 } 198 199 return -1; 200 } 201 broadcast()202 inline void broadcast () 203 { 204 scoped_lock_t l (_listenersMutex); 205 for (size_t i = 0; i < _listeners.size (); i++) { 206 semGive (_listeners[i]); 207 } 208 } 209 210 private: 211 mutex_t _listenersMutex; 212 std::vector<SEM_ID> _listeners; 213 214 ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t) 215 }; 216 } 217 218 #elif defined(ZMQ_USE_CV_IMPL_PTHREADS) 219 220 #include <pthread.h> 221 222 #if defined(__ANDROID_API__) && __ANDROID_API__ < 21 223 #define ANDROID_LEGACY 224 extern "C" int pthread_cond_timedwait_monotonic_np (pthread_cond_t *, 225 pthread_mutex_t *, 226 const struct timespec *); 227 #endif 228 229 namespace zmq 230 { 231 class condition_variable_t 232 { 233 public: condition_variable_t()234 inline condition_variable_t () 235 { 236 pthread_condattr_t attr; 237 pthread_condattr_init (&attr); 238 #if !defined(ZMQ_HAVE_OSX) && !defined(ANDROID_LEGACY) 239 pthread_condattr_setclock (&attr, CLOCK_MONOTONIC); 240 #endif 241 int rc = pthread_cond_init (&_cond, &attr); 242 posix_assert (rc); 243 } 244 ~condition_variable_t()245 inline ~condition_variable_t () 246 { 247 int rc = pthread_cond_destroy (&_cond); 248 posix_assert (rc); 249 } 250 wait(mutex_t * mutex_,int timeout_)251 inline int wait (mutex_t *mutex_, int timeout_) 252 { 253 int rc; 254 255 if (timeout_ != -1) { 256 struct timespec timeout; 257 258 #ifdef ZMQ_HAVE_OSX 259 timeout.tv_sec = 0; 260 timeout.tv_nsec = 0; 261 #else 262 rc = clock_gettime (CLOCK_MONOTONIC, &timeout); 263 posix_assert (rc); 264 #endif 265 266 timeout.tv_sec += timeout_ / 1000; 267 timeout.tv_nsec += (timeout_ % 1000) * 1000000; 268 269 if (timeout.tv_nsec >= 1000000000) { 270 timeout.tv_sec++; 271 timeout.tv_nsec -= 1000000000; 272 } 273 #ifdef ZMQ_HAVE_OSX 274 rc = pthread_cond_timedwait_relative_np ( 275 &_cond, mutex_->get_mutex (), &timeout); 276 #elif defined(ANDROID_LEGACY) 277 rc = pthread_cond_timedwait_monotonic_np ( 278 &_cond, mutex_->get_mutex (), &timeout); 279 #else 280 rc = 281 pthread_cond_timedwait (&_cond, mutex_->get_mutex (), &timeout); 282 #endif 283 } else 284 rc = pthread_cond_wait (&_cond, mutex_->get_mutex ()); 285 286 if (rc == 0) 287 return 0; 288 289 if (rc == ETIMEDOUT) { 290 errno = EAGAIN; 291 return -1; 292 } 293 294 posix_assert (rc); 295 return -1; 296 } 297 broadcast()298 inline void broadcast () 299 { 300 int rc = pthread_cond_broadcast (&_cond); 301 posix_assert (rc); 302 } 303 304 private: 305 pthread_cond_t _cond; 306 307 ZMQ_NON_COPYABLE_NOR_MOVABLE (condition_variable_t) 308 }; 309 } 310 311 #endif 312 313 #endif 314