1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4 
5 #include <IceUtil/Cond.h>
6 
7 #ifndef _WIN32
8 #    include <sys/time.h>
9 #endif
10 
11 #ifdef _WIN32
12 
13 #   ifdef ICE_HAS_WIN32_CONDVAR
14 
Cond()15 IceUtil::Cond::Cond()
16 {
17     InitializeConditionVariable(&_cond);
18 }
19 
~Cond()20 IceUtil::Cond::~Cond()
21 {
22 }
23 
24 void
signal()25 IceUtil::Cond::signal()
26 {
27     WakeConditionVariable(&_cond);
28 }
29 
30 void
broadcast()31 IceUtil::Cond::broadcast()
32 {
33     WakeAllConditionVariable(&_cond);
34 }
35 
36 #   else
37 
Semaphore(long initial)38 IceUtilInternal::Semaphore::Semaphore(long initial)
39 {
40     _sem = CreateSemaphore(0, initial, 0x7fffffff, 0);
41     if(_sem == INVALID_HANDLE_VALUE)
42     {
43         throw IceUtil::ThreadSyscallException(__FILE__, __LINE__, GetLastError());
44     }
45 }
46 
~Semaphore()47 IceUtilInternal::Semaphore::~Semaphore()
48 {
49     CloseHandle(_sem);
50 }
51 
52 void
wait() const53 IceUtilInternal::Semaphore::wait() const
54 {
55     DWORD rc = WaitForSingleObject(_sem, INFINITE);
56     if(rc != WAIT_OBJECT_0)
57     {
58         throw IceUtil::ThreadSyscallException(__FILE__, __LINE__, GetLastError());
59     }
60 }
61 
62 bool
timedWait(const IceUtil::Time & timeout) const63 IceUtilInternal::Semaphore::timedWait(const IceUtil::Time& timeout) const
64 {
65     IceUtil::Int64 msTimeout = timeout.toMilliSeconds();
66     if(msTimeout < 0 || msTimeout > 0x7FFFFFFF)
67     {
68         throw IceUtil::InvalidTimeoutException(__FILE__, __LINE__, timeout);
69     }
70 
71     DWORD rc = WaitForSingleObject(_sem, static_cast<DWORD>(msTimeout));
72     if(rc != WAIT_TIMEOUT && rc != WAIT_OBJECT_0)
73     {
74         throw IceUtil::ThreadSyscallException(__FILE__, __LINE__, GetLastError());
75     }
76     return rc != WAIT_TIMEOUT;
77 }
78 
79 void
post(int count) const80 IceUtilInternal::Semaphore::post(int count) const
81 {
82     int rc = ReleaseSemaphore(_sem, count, 0);
83     if(rc == 0)
84     {
85         throw IceUtil::ThreadSyscallException(__FILE__, __LINE__, GetLastError());
86     }
87 }
88 
89 //
// The _queue semaphore is used to wait for the condition variable to
// be signaled. When signal is called, any further signaling threads or
// threads waiting to wait (in preWait) are blocked from proceeding
// (using an additional _gate semaphore) until the correct number of
// threads waiting on the _queue semaphore drain through postWait.
95 //
// As each thread drains through postWait, if there are further threads
// to unblock (toUnblock > 0) the _queue is posted again to wake a
// further waiting thread; otherwise the _gate is posted, which permits
// any signaling or preWait threads to continue. Therefore, the _gate
// semaphore is used to protect further entry into signal or wait until
// all signaled threads have woken.
102 //
103 // _blocked is the number of waiting threads.  _unblocked is the
104 // number of threads which have unblocked. We use two variables
105 // because _blocked is protected by the _gate, whereas _unblocked is
106 // protected by the _internal mutex. There is an assumption here about
107 // memory visibility since postWait does not itself acquire the _gate
108 // semaphore (note that the _gate must be held if _state != StateIdle).
109 //
// Threads timing out present a particular issue because they may have
// woken without a corresponding notification and it's easy to leave
// the _queue in a state where a spurious wakeup will occur --
// consider a notify and a timed wake occurring at the same time. In
// this case, if we are not careful the _queue will have been posted,
// but the waking thread may not consume the semaphore.
116 //
Cond()117 IceUtil::Cond::Cond() :
118     _gate(1),
119     _blocked(0),
120     _unblocked(0),
121     _state(IceUtil::Cond::StateIdle)
122 {
123 }
124 
~Cond()125 IceUtil::Cond::~Cond()
126 {
127 }
128 
129 void
signal()130 IceUtil::Cond::signal()
131 {
132     wake(false);
133 }
134 
135 void
broadcast()136 IceUtil::Cond::broadcast()
137 {
138     wake(true);
139 }
140 
141 void
wake(bool broadcast)142 IceUtil::Cond::wake(bool broadcast)
143 {
144     //
145     // Lock gate. The gate will be locked if there are threads waiting
146     // to drain from postWait.
147     //
148     _gate.wait();
149 
150     //
151     // Lock the internal mutex.
152     //
153     IceUtil::Mutex::Lock sync(_internal);
154 
155     //
156     // Adjust the count of the number of waiting/blocked threads.
157     //
158     if(_unblocked != 0)
159     {
160         _blocked -= _unblocked;
161         _unblocked = 0;
162     }
163 
164     //
165     // If there are waiting threads then we enter a signal or
166     // broadcast state.
167     //
168     if(_blocked > 0)
169     {
170         //
171         // Unblock some number of waiters. We use -1 for the signal
172         // case.
173         //
174         assert(_state == StateIdle);
175         _state = (broadcast) ? StateBroadcast : StateSignal;
176         //
177         // Posting the queue wakes a single waiting thread. After this
178         // occurs the waiting thread will wake and then either post on
179         // the _queue to wake the next waiting thread, or post on the
180         // gate to permit more signaling to proceed.
181         //
182         // Release before posting to avoid potential immediate
183         // context switch due to the mutex being locked.
184         //
185         sync.release();
186         _queue.post();
187     }
188     else
189     {
190         //
191         // Otherwise no blocked waiters, release the gate.
192         //
193         // Release before posting to avoid potential immediate
194         // context switch due to the mutex being locked.
195         //
196         sync.release();
197         _gate.post();
198     }
199 }
200 
201 void
preWait() const202 IceUtil::Cond::preWait() const
203 {
204     //
205     // _gate is used to protect _blocked. Furthermore, this prevents
206     // further threads from entering the wait state while a
207     // signal/broadcast is being processed.
208     //
209     _gate.wait();
210     _blocked++;
211     _gate.post();
212 }
213 
214 void
postWait(bool timedOutOrFailed) const215 IceUtil::Cond::postWait(bool timedOutOrFailed) const
216 {
217     IceUtil::Mutex::Lock sync(_internal);
218 
219     //
220     // One more thread has unblocked.
221     //
222     _unblocked++;
223 
224     //
225     // If _state is StateIdle then this must be a timeout, otherwise its a
226     // spurious wakeup which is incorrect.
227     //
228     if(_state == StateIdle)
229     {
230         assert(timedOutOrFailed);
231         return;
232     }
233 
234     if(timedOutOrFailed)
235     {
236         //
237         // If the thread was the last blocked thread and there's a
238         // pending signal/broadcast, reset the signal/broadcast to
239         // prevent spurious wakeup.
240         //
241         if(_blocked == _unblocked)
242         {
243             _state = StateIdle;
244             //
245             // Consume the queue post to prevent spurious wakeup. Note
246             // that although the internal mutex could be released
247             // prior to this wait() call, doing so gains nothing since
248             // this wait() MUST return immediately (if it does not
249             // there is a major bug and the entire application will
250             // deadlock).
251             //
252             _queue.wait();
253             //
254             // Release before posting to avoid potential immediate
255             // context switch due to the mutex being locked.
256             //
257             sync.release();
258             _gate.post();
259         }
260     }
261     else
262     {
263         //
264         // At this point, the thread must have been woken up because
265         // of a signal/broadcast.
266         //
267         if(_state == StateSignal || _blocked == _unblocked) // Signal or no more blocked threads
268         {
269             _state = StateIdle;
270             // Release before posting to avoid potential immediate
271             // context switch due to the mutex being locked.
272             sync.release();
273             _gate.post();
274         }
275         else // Broadcast and more blocked threads to wake up.
276         {
277             // Release before posting to avoid potential immediate
278             // context switch due to the mutex being locked.
279             sync.release();
280             _queue.post();
281         }
282     }
283 }
284 
285 void
dowait() const286 IceUtil::Cond::dowait() const
287 {
288     try
289     {
290         _queue.wait();
291         postWait(false);
292     }
293     catch(...)
294     {
295         postWait(true);
296         throw;
297     }
298 }
299 
300 bool
timedDowait(const Time & timeout) const301 IceUtil::Cond::timedDowait(const Time& timeout) const
302 {
303     try
304     {
305         bool rc = _queue.timedWait(timeout);
306         postWait(!rc);
307         return rc;
308     }
309     catch(...)
310     {
311         postWait(true);
312         throw;
313     }
314 }
315 
316 #    endif // ICE_HAS_WIN32_CONDVAR
317 
318 #else
319 
Cond()320 IceUtil::Cond::Cond()
321 {
322     pthread_condattr_t attr;
323 
324     int rc = pthread_condattr_init(&attr);
325     if(rc != 0)
326     {
327         throw ThreadSyscallException(__FILE__, __LINE__, rc);
328     }
329 
330 #if !defined(__hppa) && !defined(__APPLE__)
331     rc = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
332     if(rc != 0)
333     {
334         throw ThreadSyscallException(__FILE__, __LINE__, rc);
335     }
336 #endif
337 
338     rc = pthread_cond_init(&_cond, &attr);
339     if(rc != 0)
340     {
341         throw ThreadSyscallException(__FILE__, __LINE__, rc);
342     }
343 
344     rc = pthread_condattr_destroy(&attr);
345     if(rc != 0)
346     {
347         throw ThreadSyscallException(__FILE__, __LINE__, rc);
348     }
349 }
350 
~Cond()351 IceUtil::Cond::~Cond()
352 {
353 #ifndef NDEBUG
354     int rc = pthread_cond_destroy(&_cond);
355     assert(rc == 0);
356 #else
357     pthread_cond_destroy(&_cond);
358 #endif
359 }
360 
361 void
signal()362 IceUtil::Cond::signal()
363 {
364     int rc = pthread_cond_signal(&_cond);
365     if(rc != 0)
366     {
367         throw ThreadSyscallException(__FILE__, __LINE__, rc);
368     }
369 }
370 
371 void
broadcast()372 IceUtil::Cond::broadcast()
373 {
374     int rc = pthread_cond_broadcast(&_cond);
375     if(rc != 0)
376     {
377         throw ThreadSyscallException(__FILE__, __LINE__, rc);
378     }
379 }
380 
381 #endif
382