1 /*****************************************************************
2 |
3 | Neptune - Queue :: Posix Implementation
4 |
5 | (c) 2001-2002 Gilles Boccon-Gibod
6 | Author: Gilles Boccon-Gibod (bok@bok.net)
7 |
8 ****************************************************************/
9
10 /*----------------------------------------------------------------------
11 | includes
12 +---------------------------------------------------------------------*/
13 #if defined(__SYMBIAN32__)
14 #include <stdio.h>
15 #endif
16 #include <pthread.h>
17 #include <time.h>
18 #include <sys/time.h>
19 #include <errno.h>
20
21 #include "NptConfig.h"
22 #include "NptTypes.h"
23 #include "NptQueue.h"
24 #include "NptThreads.h"
25 #include "NptList.h"
26 #include "NptLogging.h"
27
28 /*----------------------------------------------------------------------
29 | logging
30 +---------------------------------------------------------------------*/
31 NPT_SET_LOCAL_LOGGER("neptune.queue.posix")
32
33 /*----------------------------------------------------------------------
34 | NPT_PosixQueue
35 +---------------------------------------------------------------------*/
36 class NPT_PosixQueue : public NPT_GenericQueue
37 {
38 public:
39 // methods
40 NPT_PosixQueue(NPT_Cardinal max_items);
41 ~NPT_PosixQueue() override;
42 NPT_Result Push(NPT_QueueItem* item, NPT_Timeout timeout) override;
43 NPT_Result Pop(NPT_QueueItem*& item, NPT_Timeout timeout) override;
44 NPT_Result Peek(NPT_QueueItem*& item, NPT_Timeout timeout) override;
45
46 private:
47 void Abort();
48 NPT_Result GetTimeOut(NPT_Timeout timeout, struct timespec& timed);
49
50 private:
51 // members
52 NPT_Cardinal m_MaxItems;
53 pthread_mutex_t m_Mutex;
54 pthread_cond_t m_CanPushCondition;
55 pthread_cond_t m_CanPopCondition;
56 NPT_Cardinal m_PushersWaitingCount;
57 NPT_Cardinal m_PoppersWaitingCount;
58 NPT_List<NPT_QueueItem*> m_Items;
59 bool m_Aborting;
60 };
61
62 /*----------------------------------------------------------------------
63 | NPT_PosixQueue::NPT_PosixQueue
64 +---------------------------------------------------------------------*/
NPT_PosixQueue(NPT_Cardinal max_items)65 NPT_PosixQueue::NPT_PosixQueue(NPT_Cardinal max_items) :
66 m_MaxItems(max_items),
67 m_PushersWaitingCount(0),
68 m_PoppersWaitingCount(0),
69 m_Aborting(false)
70 {
71 pthread_mutex_init(&m_Mutex, NULL);
72 pthread_cond_init(&m_CanPushCondition, NULL);
73 pthread_cond_init(&m_CanPopCondition, NULL);
74 }
75
76 /*----------------------------------------------------------------------
77 | NPT_PosixQueue::~NPT_PosixQueue()
78 +---------------------------------------------------------------------*/
~NPT_PosixQueue()79 NPT_PosixQueue::~NPT_PosixQueue()
80 {
81 Abort();
82
83 // destroy resources
84 pthread_cond_destroy(&m_CanPushCondition);
85 pthread_cond_destroy(&m_CanPopCondition);
86 pthread_mutex_destroy(&m_Mutex);
87 }
88
89 /*----------------------------------------------------------------------
90 | NPT_PosixQueue::Abort
91 +---------------------------------------------------------------------*/
92 void
Abort()93 NPT_PosixQueue::Abort()
94 {
95 pthread_cond_t abort_condition;
96 pthread_cond_init(&abort_condition, NULL);
97
98 struct timespec timed;
99 GetTimeOut(20, timed);
100
101 // acquire mutex
102 if (pthread_mutex_lock(&m_Mutex)) {
103 return;
104 }
105
106 // tell other threads that they should exit immediately
107 m_Aborting = true;
108
109 // notify clients
110 pthread_cond_broadcast(&m_CanPopCondition);
111 pthread_cond_broadcast(&m_CanPushCondition);
112
113 // wait for all waiters to exit
114 while (m_PoppersWaitingCount > 0 || m_PushersWaitingCount > 0) {
115 pthread_cond_timedwait(&abort_condition,
116 &m_Mutex,
117 &timed);
118 }
119
120 pthread_mutex_unlock(&m_Mutex);
121 }
122
123 /*----------------------------------------------------------------------
124 | NPT_PosixQueue::GetTimeOut
125 +---------------------------------------------------------------------*/
126 NPT_Result
GetTimeOut(NPT_Timeout timeout,struct timespec & timed)127 NPT_PosixQueue::GetTimeOut(NPT_Timeout timeout, struct timespec& timed)
128 {
129 if (timeout != NPT_TIMEOUT_INFINITE) {
130 // get current time from system
131 struct timeval now;
132 if (gettimeofday(&now, NULL)) {
133 return NPT_FAILURE;
134 }
135
136 now.tv_usec += timeout * 1000;
137 if (now.tv_usec >= 1000000) {
138 now.tv_sec += now.tv_usec / 1000000;
139 now.tv_usec = now.tv_usec % 1000000;
140 }
141
142 // setup timeout
143 timed.tv_sec = now.tv_sec;
144 timed.tv_nsec = now.tv_usec * 1000;
145 }
146 return NPT_SUCCESS;
147 }
148
149 /*----------------------------------------------------------------------
150 | NPT_PosixQueue::Push
151 +---------------------------------------------------------------------*/
152 NPT_Result
Push(NPT_QueueItem * item,NPT_Timeout timeout)153 NPT_PosixQueue::Push(NPT_QueueItem* item, NPT_Timeout timeout)
154 {
155 struct timespec timed;
156 if (timeout != NPT_TIMEOUT_INFINITE) {
157 NPT_CHECK(GetTimeOut(timeout, timed));
158 }
159
160 // lock the mutex that protects the list
161 if (pthread_mutex_lock(&m_Mutex)) {
162 return NPT_FAILURE;
163 }
164
165 NPT_Result result = NPT_SUCCESS;
166 // check that we have not exceeded the max
167 if (m_MaxItems) {
168 while (m_Items.GetItemCount() >= m_MaxItems) {
169 // wait until we can push
170 ++m_PushersWaitingCount;
171 if (timeout == NPT_TIMEOUT_INFINITE) {
172 pthread_cond_wait(&m_CanPushCondition, &m_Mutex);
173 --m_PushersWaitingCount;
174 } else {
175 int wait_res = pthread_cond_timedwait(&m_CanPushCondition,
176 &m_Mutex,
177 &timed);
178 --m_PushersWaitingCount;
179 if (wait_res == ETIMEDOUT) {
180 result = NPT_ERROR_TIMEOUT;
181 break;
182 }
183 }
184
185 if (m_Aborting) {
186 result = NPT_ERROR_INTERRUPTED;
187 break;
188 }
189 }
190 }
191
192 // add the item to the list
193 if (result == NPT_SUCCESS) {
194 m_Items.Add(item);
195
196 // wake up any thread that may be waiting to pop
197 if (m_PoppersWaitingCount) {
198 pthread_cond_broadcast(&m_CanPopCondition);
199 }
200 }
201
202 // unlock the mutex
203 pthread_mutex_unlock(&m_Mutex);
204
205 return result;
206 }
207
208 /*----------------------------------------------------------------------
209 | NPT_PosixQueue::Pop
210 +---------------------------------------------------------------------*/
211 NPT_Result
Pop(NPT_QueueItem * & item,NPT_Timeout timeout)212 NPT_PosixQueue::Pop(NPT_QueueItem*& item, NPT_Timeout timeout)
213 {
214 struct timespec timed;
215 if (timeout != NPT_TIMEOUT_INFINITE) {
216 NPT_CHECK(GetTimeOut(timeout, timed));
217 }
218
219 // lock the mutex that protects the list
220 if (pthread_mutex_lock(&m_Mutex)) {
221 return NPT_FAILURE;
222 }
223
224 NPT_Result result;
225 if (timeout) {
226 while ((result = m_Items.PopHead(item)) == NPT_ERROR_LIST_EMPTY) {
227 // no item in the list, wait for one
228 ++m_PoppersWaitingCount;
229 if (timeout == NPT_TIMEOUT_INFINITE) {
230 pthread_cond_wait(&m_CanPopCondition, &m_Mutex);
231 --m_PoppersWaitingCount;
232 } else {
233 int wait_res = pthread_cond_timedwait(&m_CanPopCondition,
234 &m_Mutex,
235 &timed);
236 --m_PoppersWaitingCount;
237 if (wait_res == ETIMEDOUT) {
238 result = NPT_ERROR_TIMEOUT;
239 break;
240 }
241 }
242
243 if (m_Aborting) {
244 result = NPT_ERROR_INTERRUPTED;
245 break;
246 }
247 }
248 } else {
249 result = m_Items.PopHead(item);
250 }
251
252 // wake up any thread that my be waiting to push
253 if (m_MaxItems && (result == NPT_SUCCESS) && m_PushersWaitingCount) {
254 pthread_cond_broadcast(&m_CanPushCondition);
255 }
256
257 // unlock the mutex
258 pthread_mutex_unlock(&m_Mutex);
259
260 return result;
261 }
262
263 /*----------------------------------------------------------------------
264 | NPT_PosixQueue::Peek
265 +---------------------------------------------------------------------*/
266 NPT_Result
Peek(NPT_QueueItem * & item,NPT_Timeout timeout)267 NPT_PosixQueue::Peek(NPT_QueueItem*& item, NPT_Timeout timeout)
268 {
269 struct timespec timed;
270 if (timeout != NPT_TIMEOUT_INFINITE) {
271 NPT_CHECK(GetTimeOut(timeout, timed));
272 }
273
274 // lock the mutex that protects the list
275 if (pthread_mutex_lock(&m_Mutex)) {
276 return NPT_FAILURE;
277 }
278
279 NPT_Result result = NPT_SUCCESS;
280 NPT_List<NPT_QueueItem*>::Iterator head = m_Items.GetFirstItem();
281 if (timeout) {
282 while (!head) {
283 // no item in the list, wait for one
284 ++m_PoppersWaitingCount;
285 if (timeout == NPT_TIMEOUT_INFINITE) {
286 pthread_cond_wait(&m_CanPopCondition, &m_Mutex);
287 --m_PoppersWaitingCount;
288 } else {
289 int wait_res = pthread_cond_timedwait(&m_CanPopCondition,
290 &m_Mutex,
291 &timed);
292 --m_PoppersWaitingCount;
293 if (wait_res == ETIMEDOUT) {
294 result = NPT_ERROR_TIMEOUT;
295 break;
296 }
297 }
298
299 if (m_Aborting) {
300 result = NPT_ERROR_INTERRUPTED;
301 break;
302 }
303
304 head = m_Items.GetFirstItem();
305 }
306 } else {
307 if (!head) result = NPT_ERROR_LIST_EMPTY;
308 }
309
310 item = head?*head:NULL;
311
312 // unlock the mutex
313 pthread_mutex_unlock(&m_Mutex);
314
315 return result;
316 }
317
318 /*----------------------------------------------------------------------
319 | NPT_GenericQueue::CreateInstance
320 +---------------------------------------------------------------------*/
321 NPT_GenericQueue*
CreateInstance(NPT_Cardinal max_items)322 NPT_GenericQueue::CreateInstance(NPT_Cardinal max_items)
323 {
324 NPT_LOG_FINER_1("queue max_items = %d", (int)max_items);
325 return new NPT_PosixQueue(max_items);
326 }
327
328
329
330
331