1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4
5 #include <IceUtil/Cond.h>
6
7 #ifndef _WIN32
8 # include <sys/time.h>
9 #endif
10
11 #ifdef _WIN32
12
13 # ifdef ICE_HAS_WIN32_CONDVAR
14
Cond()15 IceUtil::Cond::Cond()
16 {
17 InitializeConditionVariable(&_cond);
18 }
19
~Cond()20 IceUtil::Cond::~Cond()
21 {
22 }
23
24 void
signal()25 IceUtil::Cond::signal()
26 {
27 WakeConditionVariable(&_cond);
28 }
29
30 void
broadcast()31 IceUtil::Cond::broadcast()
32 {
33 WakeAllConditionVariable(&_cond);
34 }
35
36 # else
37
Semaphore(long initial)38 IceUtilInternal::Semaphore::Semaphore(long initial)
39 {
40 _sem = CreateSemaphore(0, initial, 0x7fffffff, 0);
41 if(_sem == INVALID_HANDLE_VALUE)
42 {
43 throw IceUtil::ThreadSyscallException(__FILE__, __LINE__, GetLastError());
44 }
45 }
46
~Semaphore()47 IceUtilInternal::Semaphore::~Semaphore()
48 {
49 CloseHandle(_sem);
50 }
51
52 void
wait() const53 IceUtilInternal::Semaphore::wait() const
54 {
55 DWORD rc = WaitForSingleObject(_sem, INFINITE);
56 if(rc != WAIT_OBJECT_0)
57 {
58 throw IceUtil::ThreadSyscallException(__FILE__, __LINE__, GetLastError());
59 }
60 }
61
62 bool
timedWait(const IceUtil::Time & timeout) const63 IceUtilInternal::Semaphore::timedWait(const IceUtil::Time& timeout) const
64 {
65 IceUtil::Int64 msTimeout = timeout.toMilliSeconds();
66 if(msTimeout < 0 || msTimeout > 0x7FFFFFFF)
67 {
68 throw IceUtil::InvalidTimeoutException(__FILE__, __LINE__, timeout);
69 }
70
71 DWORD rc = WaitForSingleObject(_sem, static_cast<DWORD>(msTimeout));
72 if(rc != WAIT_TIMEOUT && rc != WAIT_OBJECT_0)
73 {
74 throw IceUtil::ThreadSyscallException(__FILE__, __LINE__, GetLastError());
75 }
76 return rc != WAIT_TIMEOUT;
77 }
78
79 void
post(int count) const80 IceUtilInternal::Semaphore::post(int count) const
81 {
82 int rc = ReleaseSemaphore(_sem, count, 0);
83 if(rc == 0)
84 {
85 throw IceUtil::ThreadSyscallException(__FILE__, __LINE__, GetLastError());
86 }
87 }
88
89 //
// The _queue semaphore is used to wait for the condition variable to
// be signaled. When signal is called, any further signaling threads
// and any threads about to wait (in preWait) are blocked from
// proceeding (using an additional _gate semaphore) until the correct
// number of threads waiting on the _queue semaphore drain through
// postWait.
95 //
// As each thread drains through postWait, if there are further threads
// to unblock (toUnblock > 0) the _queue is posted again to wake a
// further waiting thread; otherwise the _gate is posted, which permits
// any signaling or preWait threads to continue. Therefore, the _gate
// semaphore is used to protect further entry into signal or wait until
// all signaled threads have woken.
102 //
103 // _blocked is the number of waiting threads. _unblocked is the
104 // number of threads which have unblocked. We use two variables
105 // because _blocked is protected by the _gate, whereas _unblocked is
106 // protected by the _internal mutex. There is an assumption here about
107 // memory visibility since postWait does not itself acquire the _gate
108 // semaphore (note that the _gate must be held if _state != StateIdle).
109 //
// Threads timing out present a particular issue because they may have
// woken without a corresponding notification and it's easy to leave
// the _queue in a state where a spurious wakeup will occur --
// consider a notify and a timed wake occurring at the same time. In
// this case, if we are not careful the _queue will have been posted,
// but the waking thread may not consume the semaphore.
116 //
Cond()117 IceUtil::Cond::Cond() :
118 _gate(1),
119 _blocked(0),
120 _unblocked(0),
121 _state(IceUtil::Cond::StateIdle)
122 {
123 }
124
~Cond()125 IceUtil::Cond::~Cond()
126 {
127 }
128
129 void
signal()130 IceUtil::Cond::signal()
131 {
132 wake(false);
133 }
134
135 void
broadcast()136 IceUtil::Cond::broadcast()
137 {
138 wake(true);
139 }
140
141 void
wake(bool broadcast)142 IceUtil::Cond::wake(bool broadcast)
143 {
144 //
145 // Lock gate. The gate will be locked if there are threads waiting
146 // to drain from postWait.
147 //
148 _gate.wait();
149
150 //
151 // Lock the internal mutex.
152 //
153 IceUtil::Mutex::Lock sync(_internal);
154
155 //
156 // Adjust the count of the number of waiting/blocked threads.
157 //
158 if(_unblocked != 0)
159 {
160 _blocked -= _unblocked;
161 _unblocked = 0;
162 }
163
164 //
165 // If there are waiting threads then we enter a signal or
166 // broadcast state.
167 //
168 if(_blocked > 0)
169 {
170 //
171 // Unblock some number of waiters. We use -1 for the signal
172 // case.
173 //
174 assert(_state == StateIdle);
175 _state = (broadcast) ? StateBroadcast : StateSignal;
176 //
177 // Posting the queue wakes a single waiting thread. After this
178 // occurs the waiting thread will wake and then either post on
179 // the _queue to wake the next waiting thread, or post on the
180 // gate to permit more signaling to proceed.
181 //
182 // Release before posting to avoid potential immediate
183 // context switch due to the mutex being locked.
184 //
185 sync.release();
186 _queue.post();
187 }
188 else
189 {
190 //
191 // Otherwise no blocked waiters, release the gate.
192 //
193 // Release before posting to avoid potential immediate
194 // context switch due to the mutex being locked.
195 //
196 sync.release();
197 _gate.post();
198 }
199 }
200
201 void
preWait() const202 IceUtil::Cond::preWait() const
203 {
204 //
205 // _gate is used to protect _blocked. Furthermore, this prevents
206 // further threads from entering the wait state while a
207 // signal/broadcast is being processed.
208 //
209 _gate.wait();
210 _blocked++;
211 _gate.post();
212 }
213
214 void
postWait(bool timedOutOrFailed) const215 IceUtil::Cond::postWait(bool timedOutOrFailed) const
216 {
217 IceUtil::Mutex::Lock sync(_internal);
218
219 //
220 // One more thread has unblocked.
221 //
222 _unblocked++;
223
224 //
225 // If _state is StateIdle then this must be a timeout, otherwise its a
226 // spurious wakeup which is incorrect.
227 //
228 if(_state == StateIdle)
229 {
230 assert(timedOutOrFailed);
231 return;
232 }
233
234 if(timedOutOrFailed)
235 {
236 //
237 // If the thread was the last blocked thread and there's a
238 // pending signal/broadcast, reset the signal/broadcast to
239 // prevent spurious wakeup.
240 //
241 if(_blocked == _unblocked)
242 {
243 _state = StateIdle;
244 //
245 // Consume the queue post to prevent spurious wakeup. Note
246 // that although the internal mutex could be released
247 // prior to this wait() call, doing so gains nothing since
248 // this wait() MUST return immediately (if it does not
249 // there is a major bug and the entire application will
250 // deadlock).
251 //
252 _queue.wait();
253 //
254 // Release before posting to avoid potential immediate
255 // context switch due to the mutex being locked.
256 //
257 sync.release();
258 _gate.post();
259 }
260 }
261 else
262 {
263 //
264 // At this point, the thread must have been woken up because
265 // of a signal/broadcast.
266 //
267 if(_state == StateSignal || _blocked == _unblocked) // Signal or no more blocked threads
268 {
269 _state = StateIdle;
270 // Release before posting to avoid potential immediate
271 // context switch due to the mutex being locked.
272 sync.release();
273 _gate.post();
274 }
275 else // Broadcast and more blocked threads to wake up.
276 {
277 // Release before posting to avoid potential immediate
278 // context switch due to the mutex being locked.
279 sync.release();
280 _queue.post();
281 }
282 }
283 }
284
285 void
dowait() const286 IceUtil::Cond::dowait() const
287 {
288 try
289 {
290 _queue.wait();
291 postWait(false);
292 }
293 catch(...)
294 {
295 postWait(true);
296 throw;
297 }
298 }
299
300 bool
timedDowait(const Time & timeout) const301 IceUtil::Cond::timedDowait(const Time& timeout) const
302 {
303 try
304 {
305 bool rc = _queue.timedWait(timeout);
306 postWait(!rc);
307 return rc;
308 }
309 catch(...)
310 {
311 postWait(true);
312 throw;
313 }
314 }
315
316 # endif // ICE_HAS_WIN32_CONDVAR
317
318 #else
319
Cond()320 IceUtil::Cond::Cond()
321 {
322 pthread_condattr_t attr;
323
324 int rc = pthread_condattr_init(&attr);
325 if(rc != 0)
326 {
327 throw ThreadSyscallException(__FILE__, __LINE__, rc);
328 }
329
330 #if !defined(__hppa) && !defined(__APPLE__)
331 rc = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
332 if(rc != 0)
333 {
334 throw ThreadSyscallException(__FILE__, __LINE__, rc);
335 }
336 #endif
337
338 rc = pthread_cond_init(&_cond, &attr);
339 if(rc != 0)
340 {
341 throw ThreadSyscallException(__FILE__, __LINE__, rc);
342 }
343
344 rc = pthread_condattr_destroy(&attr);
345 if(rc != 0)
346 {
347 throw ThreadSyscallException(__FILE__, __LINE__, rc);
348 }
349 }
350
~Cond()351 IceUtil::Cond::~Cond()
352 {
353 #ifndef NDEBUG
354 int rc = pthread_cond_destroy(&_cond);
355 assert(rc == 0);
356 #else
357 pthread_cond_destroy(&_cond);
358 #endif
359 }
360
361 void
signal()362 IceUtil::Cond::signal()
363 {
364 int rc = pthread_cond_signal(&_cond);
365 if(rc != 0)
366 {
367 throw ThreadSyscallException(__FILE__, __LINE__, rc);
368 }
369 }
370
371 void
broadcast()372 IceUtil::Cond::broadcast()
373 {
374 int rc = pthread_cond_broadcast(&_cond);
375 if(rc != 0)
376 {
377 throw ThreadSyscallException(__FILE__, __LINE__, rc);
378 }
379 }
380
381 #endif
382