1 /*****************************************************************
2 |
3 |      Neptune - Queue :: Posix Implementation
4 |
5 |      (c) 2001-2002 Gilles Boccon-Gibod
6 |      Author: Gilles Boccon-Gibod (bok@bok.net)
7 |
8  ****************************************************************/
9 
10 /*----------------------------------------------------------------------
11 |       includes
12 +---------------------------------------------------------------------*/
13 #if defined(__SYMBIAN32__)
14 #include <stdio.h>
15 #endif
16 #include <pthread.h>
17 #include <time.h>
18 #include <sys/time.h>
19 #include <errno.h>
20 
21 #include "NptConfig.h"
22 #include "NptTypes.h"
23 #include "NptQueue.h"
24 #include "NptThreads.h"
25 #include "NptList.h"
26 #include "NptLogging.h"
27 
28 /*----------------------------------------------------------------------
29 |       logging
30 +---------------------------------------------------------------------*/
31 NPT_SET_LOCAL_LOGGER("neptune.queue.posix")
32 
33 /*----------------------------------------------------------------------
34 |       NPT_PosixQueue
35 +---------------------------------------------------------------------*/
36 class NPT_PosixQueue : public NPT_GenericQueue
37 {
38 public:
39     // methods
40                NPT_PosixQueue(NPT_Cardinal max_items);
41               ~NPT_PosixQueue() override;
42     NPT_Result Push(NPT_QueueItem* item, NPT_Timeout timeout) override;
43     NPT_Result Pop(NPT_QueueItem*& item, NPT_Timeout timeout) override;
44     NPT_Result Peek(NPT_QueueItem*& item, NPT_Timeout timeout) override;
45 
46 private:
47     void       Abort();
48     NPT_Result GetTimeOut(NPT_Timeout timeout, struct timespec& timed);
49 
50 private:
51     // members
52     NPT_Cardinal             m_MaxItems;
53     pthread_mutex_t          m_Mutex;
54     pthread_cond_t           m_CanPushCondition;
55     pthread_cond_t           m_CanPopCondition;
56     NPT_Cardinal             m_PushersWaitingCount;
57     NPT_Cardinal             m_PoppersWaitingCount;
58     NPT_List<NPT_QueueItem*> m_Items;
59     bool                     m_Aborting;
60 };
61 
62 /*----------------------------------------------------------------------
63 |       NPT_PosixQueue::NPT_PosixQueue
64 +---------------------------------------------------------------------*/
NPT_PosixQueue(NPT_Cardinal max_items)65 NPT_PosixQueue::NPT_PosixQueue(NPT_Cardinal max_items) :
66     m_MaxItems(max_items),
67     m_PushersWaitingCount(0),
68     m_PoppersWaitingCount(0),
69     m_Aborting(false)
70 {
71     pthread_mutex_init(&m_Mutex, NULL);
72     pthread_cond_init(&m_CanPushCondition, NULL);
73     pthread_cond_init(&m_CanPopCondition, NULL);
74 }
75 
76 /*----------------------------------------------------------------------
77 |       NPT_PosixQueue::~NPT_PosixQueue()
78 +---------------------------------------------------------------------*/
~NPT_PosixQueue()79 NPT_PosixQueue::~NPT_PosixQueue()
80 {
81     Abort();
82 
83     // destroy resources
84     pthread_cond_destroy(&m_CanPushCondition);
85     pthread_cond_destroy(&m_CanPopCondition);
86     pthread_mutex_destroy(&m_Mutex);
87 }
88 
89 /*----------------------------------------------------------------------
90 |       NPT_PosixQueue::Abort
91 +---------------------------------------------------------------------*/
92 void
Abort()93 NPT_PosixQueue::Abort()
94 {
95     pthread_cond_t abort_condition;
96     pthread_cond_init(&abort_condition, NULL);
97 
98     struct timespec timed;
99     GetTimeOut(20, timed);
100 
101     // acquire mutex
102     if (pthread_mutex_lock(&m_Mutex)) {
103         return;
104     }
105 
106     // tell other threads that they should exit immediately
107     m_Aborting = true;
108 
109     // notify clients
110     pthread_cond_broadcast(&m_CanPopCondition);
111     pthread_cond_broadcast(&m_CanPushCondition);
112 
113     // wait for all waiters to exit
114     while (m_PoppersWaitingCount > 0 || m_PushersWaitingCount > 0) {
115         pthread_cond_timedwait(&abort_condition,
116                                &m_Mutex,
117                                &timed);
118     }
119 
120     pthread_mutex_unlock(&m_Mutex);
121 }
122 
123 /*----------------------------------------------------------------------
124 |       NPT_PosixQueue::GetTimeOut
125 +---------------------------------------------------------------------*/
126 NPT_Result
GetTimeOut(NPT_Timeout timeout,struct timespec & timed)127 NPT_PosixQueue::GetTimeOut(NPT_Timeout timeout, struct timespec& timed)
128 {
129     if (timeout != NPT_TIMEOUT_INFINITE) {
130         // get current time from system
131         struct timeval now;
132         if (gettimeofday(&now, NULL)) {
133             return NPT_FAILURE;
134         }
135 
136         now.tv_usec += timeout * 1000;
137         if (now.tv_usec >= 1000000) {
138             now.tv_sec += now.tv_usec / 1000000;
139             now.tv_usec = now.tv_usec % 1000000;
140         }
141 
142         // setup timeout
143         timed.tv_sec  = now.tv_sec;
144         timed.tv_nsec = now.tv_usec * 1000;
145     }
146     return NPT_SUCCESS;
147 }
148 
149 /*----------------------------------------------------------------------
150 |       NPT_PosixQueue::Push
151 +---------------------------------------------------------------------*/
152 NPT_Result
Push(NPT_QueueItem * item,NPT_Timeout timeout)153 NPT_PosixQueue::Push(NPT_QueueItem* item, NPT_Timeout timeout)
154 {
155     struct timespec timed;
156     if (timeout != NPT_TIMEOUT_INFINITE) {
157         NPT_CHECK(GetTimeOut(timeout, timed));
158     }
159 
160     // lock the mutex that protects the list
161     if (pthread_mutex_lock(&m_Mutex)) {
162         return NPT_FAILURE;
163     }
164 
165     NPT_Result result = NPT_SUCCESS;
166     // check that we have not exceeded the max
167     if (m_MaxItems) {
168         while (m_Items.GetItemCount() >= m_MaxItems) {
169             // wait until we can push
170             ++m_PushersWaitingCount;
171             if (timeout == NPT_TIMEOUT_INFINITE) {
172                 pthread_cond_wait(&m_CanPushCondition, &m_Mutex);
173                 --m_PushersWaitingCount;
174             } else {
175                 int wait_res = pthread_cond_timedwait(&m_CanPushCondition,
176                                                       &m_Mutex,
177                                                       &timed);
178                 --m_PushersWaitingCount;
179                 if (wait_res == ETIMEDOUT) {
180                     result = NPT_ERROR_TIMEOUT;
181                     break;
182                 }
183             }
184 
185             if (m_Aborting) {
186                 result = NPT_ERROR_INTERRUPTED;
187                 break;
188             }
189         }
190     }
191 
192     // add the item to the list
193     if (result == NPT_SUCCESS) {
194         m_Items.Add(item);
195 
196         // wake up any thread that may be waiting to pop
197         if (m_PoppersWaitingCount) {
198             pthread_cond_broadcast(&m_CanPopCondition);
199         }
200     }
201 
202     // unlock the mutex
203     pthread_mutex_unlock(&m_Mutex);
204 
205     return result;
206 }
207 
208 /*----------------------------------------------------------------------
209 |       NPT_PosixQueue::Pop
210 +---------------------------------------------------------------------*/
211 NPT_Result
Pop(NPT_QueueItem * & item,NPT_Timeout timeout)212 NPT_PosixQueue::Pop(NPT_QueueItem*& item, NPT_Timeout timeout)
213 {
214     struct timespec timed;
215     if (timeout != NPT_TIMEOUT_INFINITE) {
216         NPT_CHECK(GetTimeOut(timeout, timed));
217     }
218 
219     // lock the mutex that protects the list
220     if (pthread_mutex_lock(&m_Mutex)) {
221         return NPT_FAILURE;
222     }
223 
224     NPT_Result result;
225     if (timeout) {
226         while ((result = m_Items.PopHead(item)) == NPT_ERROR_LIST_EMPTY) {
227             // no item in the list, wait for one
228             ++m_PoppersWaitingCount;
229             if (timeout == NPT_TIMEOUT_INFINITE) {
230                 pthread_cond_wait(&m_CanPopCondition, &m_Mutex);
231                 --m_PoppersWaitingCount;
232             } else {
233                 int wait_res = pthread_cond_timedwait(&m_CanPopCondition,
234                                                       &m_Mutex,
235                                                       &timed);
236                 --m_PoppersWaitingCount;
237                 if (wait_res == ETIMEDOUT) {
238                     result = NPT_ERROR_TIMEOUT;
239                     break;
240                 }
241             }
242 
243             if (m_Aborting) {
244                 result = NPT_ERROR_INTERRUPTED;
245                 break;
246             }
247         }
248     } else {
249         result = m_Items.PopHead(item);
250     }
251 
252     // wake up any thread that my be waiting to push
253     if (m_MaxItems && (result == NPT_SUCCESS) && m_PushersWaitingCount) {
254         pthread_cond_broadcast(&m_CanPushCondition);
255     }
256 
257     // unlock the mutex
258     pthread_mutex_unlock(&m_Mutex);
259 
260     return result;
261 }
262 
263 /*----------------------------------------------------------------------
264 |       NPT_PosixQueue::Peek
265 +---------------------------------------------------------------------*/
266 NPT_Result
Peek(NPT_QueueItem * & item,NPT_Timeout timeout)267 NPT_PosixQueue::Peek(NPT_QueueItem*& item, NPT_Timeout timeout)
268 {
269     struct timespec timed;
270     if (timeout != NPT_TIMEOUT_INFINITE) {
271         NPT_CHECK(GetTimeOut(timeout, timed));
272     }
273 
274     // lock the mutex that protects the list
275     if (pthread_mutex_lock(&m_Mutex)) {
276         return NPT_FAILURE;
277     }
278 
279     NPT_Result result = NPT_SUCCESS;
280     NPT_List<NPT_QueueItem*>::Iterator head = m_Items.GetFirstItem();
281     if (timeout) {
282         while (!head) {
283             // no item in the list, wait for one
284             ++m_PoppersWaitingCount;
285             if (timeout == NPT_TIMEOUT_INFINITE) {
286                 pthread_cond_wait(&m_CanPopCondition, &m_Mutex);
287                 --m_PoppersWaitingCount;
288             } else {
289                 int wait_res = pthread_cond_timedwait(&m_CanPopCondition,
290                                                       &m_Mutex,
291                                                       &timed);
292                 --m_PoppersWaitingCount;
293                 if (wait_res == ETIMEDOUT) {
294                     result = NPT_ERROR_TIMEOUT;
295                     break;
296                 }
297             }
298 
299             if (m_Aborting) {
300                 result = NPT_ERROR_INTERRUPTED;
301                 break;
302             }
303 
304             head = m_Items.GetFirstItem();
305         }
306     } else {
307         if (!head) result = NPT_ERROR_LIST_EMPTY;
308     }
309 
310     item = head?*head:NULL;
311 
312     // unlock the mutex
313     pthread_mutex_unlock(&m_Mutex);
314 
315     return result;
316 }
317 
318 /*----------------------------------------------------------------------
319 |       NPT_GenericQueue::CreateInstance
320 +---------------------------------------------------------------------*/
321 NPT_GenericQueue*
CreateInstance(NPT_Cardinal max_items)322 NPT_GenericQueue::CreateInstance(NPT_Cardinal max_items)
323 {
324     NPT_LOG_FINER_1("queue max_items = %d", (int)max_items);
325     return new NPT_PosixQueue(max_items);
326 }
327 
328 
329 
330 
331