1 // ------------------------------------------------------------------------ 2 // kvu_message_queue.cpp: Special purpose queue data for RT msg passsing 3 // Copyright (C) 2009,2012 Kai Vehmanen 4 // 5 // Attributes: 6 // eca-style-version: 3 7 // 8 // This program is free software; you can redistribute it and/or modify 9 // it under the terms of the GNU General Public License as published by 10 // the Free Software Foundation; either version 2 of the License, or 11 // (at your option) any later version. 12 // 13 // This program is distributed in the hope that it will be useful, 14 // but WITHOUT ANY WARRANTY; without even the implied warranty of 15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 16 // GNU General Public License for more details. 17 // 18 // You should have received a copy of the GNU General Public License 19 // along with this program; if not, write to the Free Software 20 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 21 // ------------------------------------------------------------------------ 22 23 #ifndef INCLUDE_KVU_MESSAGE_QUEUE_H 24 #define INCLUDE_KVU_MESSAGE_QUEUE_H 25 26 #include <deque> 27 28 #include <errno.h> 29 #include <pthread.h> 30 #include <sys/time.h> 31 32 #include "kvu_timestamp.h" 33 34 /** 35 * Default for maximum size of the queue for operation in 36 * bounded execution time mode. 37 */ 38 static const size_t msg_queue_rt_max_size_const = 1024; 39 40 /** 41 * A queue implementation for sending generic messages between 42 * threads. 43 * 44 * All the consumer operations are real-time safe, i.e. they have 45 * bounded execution time. However, the bounded execution is 46 * guaranteed only until queue size reaches max_rt_size() items. 47 * 48 * Once queue size reached max_rt_size(), consumer operation is 49 * switched to blocking behaviour. This is done as a safety 50 * measure in real-time use. 51 * 52 * @author Kai Vehmanen 53 */ 54 template<class T> 55 class MESSAGE_QUEUE_RT_C { 56 57 public: 58 59 /** 60 * Class constructor. 61 * 62 * @param max_rt_size Change queue behaviour to be non-determistic 63 * if the number of queued messaes reaches this 64 * limit. This can be used as a safety measure 65 * real-time applications. 66 * 67 * Execution note: may block, may allocate memory 68 */ 69 MESSAGE_QUEUE_RT_C(int max_rt_size = -1) 70 : pending_pops_rep(0) { 71 pthread_mutex_init(&lock_rep, NULL); 72 pthread_cond_init(&cond_rep, NULL); 73 74 if (max_rt_size == -1) 75 max_rt_size_rep = msg_queue_rt_max_size_const; 76 else 77 max_rt_size_rep = static_cast<size_t>(max_rt_size); 78 } 79 80 /** 81 * Adds a new item to the end of the queue. 82 * 83 * Execution note: may block, may allocate memory 84 */ push_back(const T & arg)85 void push_back(const T& arg) { 86 pthread_mutex_lock(&lock_rep); 87 msgs_rep.push_back(arg); 88 pthread_cond_broadcast(&cond_rep); 89 pthread_mutex_unlock(&lock_rep); 90 } 91 92 /** 93 * Fetches, and removes, the front item in the queue. 94 * If the queue is empty, an error is returned. 95 * 96 * Execution note: rt-safe, does not block 97 * 98 * @return 1 on success, -1 if busy, 0 if empty 99 */ pop_front(T * front_msg)100 int pop_front(T* front_msg) { 101 int res = 1; 102 int lockres = pthread_mutex_trylock(&lock_rep); 103 104 if (lockres == 0) { 105 if (msgs_rep.size() > 0) { 106 if (front_msg != 0) 107 *front_msg = msgs_rep.front(); 108 msgs_rep.pop_front(); 109 } 110 else { 111 res = 0; 112 } 113 pthread_mutex_unlock(&lock_rep); 114 } 115 else { 116 res = -1; 117 } 118 119 return res; 120 } 121 122 /** 123 * Fetches but does not remove the front item in the queue. 124 * If the queue is empty, an error is returned. 125 * 126 * Execution note: rt-safe, does not block 127 * 128 * @return 1 on success, -1 if busy, 0 if empty 129 */ peek_front(T * front_msg)130 int peek_front(T* front_msg) { 131 int res = 1; 132 int lockres = pthread_mutex_trylock(&lock_rep); 133 134 if (lockres == 0) { 135 if (msgs_rep.size() > 0) { 136 if (front_msg != 0) 137 *front_msg = msgs_rep.front(); 138 } 139 else { 140 res = 0; 141 } 142 pthread_mutex_unlock(&lock_rep); 143 } 144 else { 145 res = -1; 146 } 147 148 return res; 149 } 150 151 /** 152 * Clears the queue 153 * 154 * Execution note: may block 155 */ clear(void)156 void clear(void) { 157 pthread_mutex_lock(&lock_rep); 158 msgs_rep.clear(); 159 pthread_cond_broadcast(&cond_rep); 160 pthread_mutex_unlock(&lock_rep); 161 } 162 163 /** 164 * Blocks until 'is_empty() != true'. 'timeout_sec' and 165 * 'timeout_usec' specify the upper time limit for blocking. 166 * 167 * Execution: may block, may allocate memory 168 * 169 * @pre is_empty() != true 170 */ poll(int timeout_sec,long int timeout_usec)171 void poll(int timeout_sec, long int timeout_usec) { 172 struct timeval nowtmp; 173 struct timespec now, timeout; 174 int retcode = 0; 175 176 gettimeofday(&nowtmp, NULL); 177 178 now.tv_sec = nowtmp.tv_sec; 179 now.tv_nsec = nowtmp.tv_usec * 1000; 180 timeout.tv_sec = timeout_sec; 181 timeout.tv_nsec = timeout_usec * 1000; 182 kvu_timespec_add(&now, &timeout, &timeout); 183 184 pthread_mutex_lock(&lock_rep); 185 while (msgs_rep.empty() == true && retcode != ETIMEDOUT) { 186 retcode = pthread_cond_timedwait(&cond_rep, &lock_rep, &timeout); 187 } 188 pthread_mutex_unlock(&lock_rep); 189 return; 190 } 191 192 /** 193 * Is queue empty? 194 * 195 * Execution note: rt-safe if queue size within 'max_rt_size' 196 */ is_empty(void)197 bool is_empty(void) const { 198 bool emptyres = false; 199 int ret = pthread_mutex_trylock(&lock_rep); 200 201 /* note: msgs_rep.size() is accessed without holding 202 * a lock, but that's safe as in the worst case 203 * caller blocks unnecessarily */ 204 if (ret != 0 && 205 msgs_rep.size() >= max_rt_size_rep) { 206 207 /* note: queue has grown beyond the rt-safe maximum size, 208 * change to non-bounded mode to force synchronization 209 * between the producer and consumer threads 210 */ 211 ret = pthread_mutex_lock(&lock_rep); 212 } 213 214 if (ret == 0) { 215 emptyres = (msgs_rep.size() == 0); 216 pthread_mutex_unlock(&lock_rep); 217 } 218 219 return emptyres; 220 } 221 max_rt_size(void)222 size_t max_rt_size(void) const { 223 return max_rt_size_rep; 224 } 225 226 private: 227 228 mutable pthread_mutex_t lock_rep; // mutex ensuring exclusive access to buffer 229 mutable pthread_cond_t cond_rep; 230 231 size_t max_rt_size_rep; // only modified in constructor 232 size_t pending_pops_rep; 233 234 T invalid_rep; // only modified in constructor 235 std::deque<T> msgs_rep; 236 237 }; 238 239 #endif /* INCLUDE_KVU_MESSAGE_QUEUE_H */ 240