1 // ------------------------------------------------------------------------
2 // kvu_message_queue.cpp: Special purpose queue data for RT msg passsing
3 // Copyright (C) 2009,2012 Kai Vehmanen
4 //
5 // Attributes:
6 //     eca-style-version: 3
7 //
8 // This program is free software; you can redistribute it and/or modify
9 // it under the terms of the GNU General Public License as published by
10 // the Free Software Foundation; either version 2 of the License, or
11 // (at your option) any later version.
12 //
13 // This program is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU General Public License
19 // along with this program; if not, write to the Free Software
20 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
21 // ------------------------------------------------------------------------
22 
23 #ifndef INCLUDE_KVU_MESSAGE_QUEUE_H
24 #define INCLUDE_KVU_MESSAGE_QUEUE_H
25 
26 #include <deque>
27 
28 #include <errno.h>
29 #include <pthread.h>
30 #include <sys/time.h>
31 
32 #include "kvu_timestamp.h"
33 
34 /**
35  * Default for maximum size of the queue for operation in
36  * bounded execution time mode.
37  */
38 static const size_t msg_queue_rt_max_size_const = 1024;
39 
40 /**
41  * A queue implementation for sending generic messages between
42  * threads.
43  *
44  * All the consumer operations are real-time safe, i.e. they have
45  * bounded execution time. However, the bounded execution is
46  * guaranteed only until queue size reaches max_rt_size() items.
47  *
48  * Once queue size reached max_rt_size(), consumer operation is
49  * switched to blocking behaviour. This is done as a safety
50  * measure in real-time use.
51  *
52  * @author Kai Vehmanen
53  */
54 template<class T>
55 class MESSAGE_QUEUE_RT_C {
56 
57 public:
58 
59   /**
60    * Class constructor.
61    *
62    * @param max_rt_size Change queue behaviour to be non-determistic
63    *                    if the number of queued messaes reaches this
64    *                    limit. This can be used as a safety measure
65    *                    real-time applications.
66    *
67    * Execution note: may block, may allocate memory
68    */
69   MESSAGE_QUEUE_RT_C(int max_rt_size = -1)
70     : pending_pops_rep(0) {
71     pthread_mutex_init(&lock_rep, NULL);
72     pthread_cond_init(&cond_rep, NULL);
73 
74     if (max_rt_size == -1)
75       max_rt_size_rep = msg_queue_rt_max_size_const;
76     else
77       max_rt_size_rep = static_cast<size_t>(max_rt_size);
78   }
79 
80   /**
81    * Adds a new item to the end of the queue.
82    *
83    * Execution note: may block, may allocate memory
84    */
push_back(const T & arg)85   void push_back(const T& arg) {
86     pthread_mutex_lock(&lock_rep);
87     msgs_rep.push_back(arg);
88     pthread_cond_broadcast(&cond_rep);
89     pthread_mutex_unlock(&lock_rep);
90   }
91 
92   /**
93    * Fetches, and removes, the front item in the queue.
94    * If the queue is empty, an error is returned.
95    *
96    * Execution note: rt-safe, does not block
97    *
98    * @return 1 on success, -1 if busy, 0 if empty
99    */
pop_front(T * front_msg)100   int pop_front(T* front_msg) {
101     int res = 1;
102     int lockres = pthread_mutex_trylock(&lock_rep);
103 
104     if (lockres == 0) {
105       if (msgs_rep.size() > 0) {
106 	if (front_msg != 0)
107 	  *front_msg = msgs_rep.front();
108 	msgs_rep.pop_front();
109       }
110       else {
111 	res = 0;
112       }
113       pthread_mutex_unlock(&lock_rep);
114     }
115     else {
116       res = -1;
117     }
118 
119     return res;
120   }
121 
122   /**
123    * Fetches but does not remove the front item in the queue.
124    * If the queue is empty, an error is returned.
125    *
126    * Execution note: rt-safe, does not block
127    *
128    * @return 1 on success, -1 if busy, 0 if empty
129    */
peek_front(T * front_msg)130   int peek_front(T* front_msg) {
131     int res = 1;
132     int lockres = pthread_mutex_trylock(&lock_rep);
133 
134     if (lockres == 0) {
135       if (msgs_rep.size() > 0) {
136         if (front_msg != 0)
137           *front_msg = msgs_rep.front();
138       }
139       else {
140         res = 0;
141       }
142       pthread_mutex_unlock(&lock_rep);
143     }
144     else {
145       res = -1;
146     }
147 
148     return res;
149   }
150 
151   /**
152    * Clears the queue
153    *
154    * Execution note: may block
155    */
clear(void)156   void clear(void) {
157     pthread_mutex_lock(&lock_rep);
158     msgs_rep.clear();
159     pthread_cond_broadcast(&cond_rep);
160     pthread_mutex_unlock(&lock_rep);
161   }
162 
163   /**
164    * Blocks until 'is_empty() != true'. 'timeout_sec' and
165    * 'timeout_usec' specify the upper time limit for blocking.
166    *
167    * Execution: may block, may allocate memory
168    *
169    * @pre is_empty() != true
170    */
poll(int timeout_sec,long int timeout_usec)171   void poll(int timeout_sec, long int timeout_usec) {
172     struct timeval nowtmp;
173     struct timespec now, timeout;
174     int retcode = 0;
175 
176     gettimeofday(&nowtmp, NULL);
177 
178     now.tv_sec = nowtmp.tv_sec;
179     now.tv_nsec = nowtmp.tv_usec * 1000;
180     timeout.tv_sec = timeout_sec;
181     timeout.tv_nsec = timeout_usec * 1000;
182     kvu_timespec_add(&now, &timeout, &timeout);
183 
184     pthread_mutex_lock(&lock_rep);
185     while (msgs_rep.empty() == true && retcode != ETIMEDOUT) {
186       retcode = pthread_cond_timedwait(&cond_rep, &lock_rep, &timeout);
187     }
188     pthread_mutex_unlock(&lock_rep);
189     return;
190   }
191 
192   /**
193    * Is queue empty?
194    *
195    * Execution note: rt-safe if queue size within 'max_rt_size'
196    */
is_empty(void)197   bool is_empty(void) const {
198     bool emptyres = false;
199     int ret = pthread_mutex_trylock(&lock_rep);
200 
201     /* note: msgs_rep.size() is accessed without holding
202      *       a lock, but that's safe as in the worst case
203      *       caller blocks unnecessarily */
204     if (ret != 0 &&
205 	msgs_rep.size() >= max_rt_size_rep) {
206 
207       /* note: queue has grown beyond the rt-safe maximum size,
208        *       change to non-bounded mode to force synchronization
209        *       between the producer and consumer threads
210        */
211       ret = pthread_mutex_lock(&lock_rep);
212     }
213 
214     if (ret == 0) {
215       emptyres = (msgs_rep.size() == 0);
216       pthread_mutex_unlock(&lock_rep);
217     }
218 
219     return emptyres;
220   }
221 
max_rt_size(void)222   size_t max_rt_size(void) const {
223     return max_rt_size_rep;
224   }
225 
226 private:
227 
228   mutable pthread_mutex_t lock_rep; // mutex ensuring exclusive access to buffer
229   mutable pthread_cond_t cond_rep;
230 
231   size_t max_rt_size_rep;   // only modified in constructor
232   size_t pending_pops_rep;
233 
234   T invalid_rep;            // only modified in constructor
235   std::deque<T> msgs_rep;
236 
237 };
238 
239 #endif /* INCLUDE_KVU_MESSAGE_QUEUE_H */
240