1 // circular_queue.cxx
2 //
3 //  Author(s):
4 //	Robert Stiles, KK5VD, Copyright (C) 2013
5 //	Dave Freese, W1HKJ, Copyright (C) 2013
6 //
7 // This file is part of FLAMP.
8 //
9 // This is free software; you can redistribute it and/or modify
10 // it under the terms of the GNU General Public License as published by
11 // the Free Software Foundation; either version 3 of the License, or
12 // (at your option) any later version.
13 //
14 // This software is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 // GNU General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 //
22 
23 
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <pthread.h>
28 #include <signal.h>
29 #include <sys/types.h>
30 #include <sys/time.h>
31 
32 #include "util.h"
33 
34 #include "circular_queue.h"
35 
36 
37 using namespace std;
38 
39 /** ********************************************************
40  *
41  ***********************************************************/
Circular_queue()42 Circular_queue::Circular_queue()
43 {
44 
45 }
46 
47 /** ********************************************************
48  *
49  ***********************************************************/
Circular_queue(int po2,int (* _matchFound)(void *),int (* _readDataFrom)(void *),void * (* _queueParser)(void *))50 Circular_queue::Circular_queue(
51 							   int po2,
52 							   int (*_matchFound)(void *),
53 							   int (*_readDataFrom)(void *),
54 							   void * (*_queueParser)(void *))
55 {
56 	setUp(po2, _matchFound, _readDataFrom, _queueParser);
57 }
58 
59 /** ********************************************************
60  *
61  ***********************************************************/
~Circular_queue()62 Circular_queue::~Circular_queue()
63 {
64 	signal();
65 
66 	pthread_mutex_lock(&mutex);
67 	exit_thread = 1;
68 	pthread_mutex_unlock(&mutex);
69 
70 	pthread_join(thread, NULL);
71 	pthread_cond_destroy(&condition);
72 
73 	delete [] buffer;
74 }
75 
76 /** ********************************************************
77  *
78  ***********************************************************/
setUp(int po2,int (* _matchFound)(void *),int (* _readDataFrom)(void *),void * (* _queueParser)(void *))79 void Circular_queue::setUp(
80 						   int po2,
81 						   int (*_matchFound)(void *),
82 						   int (*_readDataFrom)(void *),
83 						   void * (*_queueParser)(void *))
84 {
85 
86 	if(!_matchFound || !_readDataFrom || !_queueParser) {
87 		throw CircQueException("Null Calling Function(s)");
88 	}
89 
90 	matchFound = _matchFound;
91 	readData   = _readDataFrom;
92 	queueParser = _queueParser;
93 
94 	buffer_size = (1 << po2);
95 	buffer = new char[buffer_size];
96 	if (!buffer) {
97 		throw CircQueException("Cannot allocate buffer");
98 	}
99 
100 	memset(buffer, 0, buffer_size);
101 	bufferCount = 0;
102 
103 	index_mask = 1;
104 	for(int i = 1; i < po2; i++)
105 		index_mask |= (index_mask << 1);
106 
107 	exit_thread = 0;
108 
109 	inhibitDataOut = CQUE_HOLD;
110 
111 	pthread_mutex_init(&mutex, NULL);
112 	pthread_cond_init(&condition, NULL);
113 
114 	int perr = pthread_create(&thread, 0, queueParser, this);
115 	if (perr) {
116 		throw CircQueException(perr, "Cannot create thread");
117 	}
118 
119 	inhibitDataOut = CQUE_RESUME;
120 
121 }
122 
123 /** ********************************************************
124  *
125  ***********************************************************/
stopDataOut()126 void Circular_queue::stopDataOut()
127 {
128 	inhibitDataOut = CQUE_HOLD;
129 }
130 
131 /** ********************************************************
132  *
133  ***********************************************************/
startDataOut()134 void Circular_queue::startDataOut()
135 {
136 	inhibitDataOut = CQUE_RESUME;
137 }
138 
139 /** ********************************************************
140  *
141  ***********************************************************/
addToQueueNullFiltered(char * _buffer,int _size)142 void Circular_queue::addToQueueNullFiltered(char *_buffer, int _size)
143 {
144 	if(!_buffer || _size < 1) return;
145 
146 	pthread_mutex_lock(&mutex);
147 
148 	while(_size > 0) {
149 		if (bufferCount >= buffer_size) {
150 			stalled = 1;
151 			break;
152 		}
153 
154 		write_index &= index_mask;
155 
156 		if(*_buffer) { // Filter out null characters
157 			buffer[write_index++] = *_buffer;
158 			bufferCount++;
159 		}
160 		_buffer++;
161 		_size--;
162 	}
163 
164 	pthread_mutex_unlock(&mutex);
165 }
166 
167 /** ********************************************************
168  *
169  ***********************************************************/
addToQueue(char * _buffer,int _size)170 void Circular_queue::addToQueue(char *_buffer, int _size)
171 {
172 	if(!_buffer || _size < 1) return;
173 
174 	pthread_mutex_lock(&mutex);
175 
176 	while(_size > 0) {
177 		if (bufferCount >= buffer_size) {
178 			stalled = 1;
179 			break;
180 		}
181 
182 		write_index &= index_mask;
183 		buffer[write_index++] = *_buffer++;
184 		bufferCount++;
185 		_size--;
186 	}
187 
188 	pthread_mutex_unlock(&mutex);
189 }
190 
191 /** ********************************************************
192  *
193  ***********************************************************/
lookAheadCRC(char * _buffer,int _size,unsigned int * crcVal,int * reset)194 int  Circular_queue::lookAheadCRC(char *_buffer, int _size,
195 	unsigned int *crcVal, int *reset)
196 {
197 	if(!_buffer || _size < 1 || !crcVal || !reset) return 0;
198 
199 	int count = 0;
200 	int temp_index = 0;
201 	int buffer_count = 0;
202 //	unsigned int crcval = *crcVal;
203 	char *cPtr = _buffer;
204 
205 	pthread_mutex_lock(&mutex);
206 
207 	temp_index = read_index;
208 	buffer_count = bufferCount;
209 
210 	if(*reset) {
211 		crcValidate.reset();
212 		*reset = 0;
213 	}
214 
215 	if (buffer_count > 0) {
216 		while(!exit_thread) {
217 
218 			if (count >= _size) break;
219 
220 			temp_index &= index_mask;
221 
222 			if (buffer_count > 0)  {
223 
224 				*cPtr = buffer[temp_index++];
225 				cPtr[1] = 0;
226 
227 				buffer_count--;
228 				count++;
229 				crcValidate.crc16(*cPtr);
230 				cPtr++;
231 			} else
232 				break;
233 
234 		}
235 	}
236 
237 	pthread_mutex_unlock(&mutex);
238 
239 	readQueData(buffer_count);
240 
241 	*crcVal = crcValidate.val();
242 
243 	return count;
244 
245 }
246 
247 /** ********************************************************
248  *
249  ***********************************************************/
lookAhead(char * _buffer,int _size)250 int Circular_queue::lookAhead(char *_buffer, int _size)
251 {
252 	if(!_buffer || _size < 1) return 0;
253 
254 	int count = 0;
255 	int temp_index = 0;
256 	int buffer_count = 0;
257 	char *cPtr = _buffer;
258 
259 	pthread_mutex_lock(&mutex);
260 
261 	temp_index = read_index;
262 	buffer_count = bufferCount;
263 
264 	if(buffer_count > 0) {
265 		while(!exit_thread) {
266 
267 			if(count >= _size) break;
268 
269 			temp_index &= index_mask;
270 
271 			if(buffer_count > 0)  {
272 				*cPtr = buffer[temp_index++];
273 				cPtr[1] = 0;
274 				cPtr++;
275 				buffer_count--;
276 				count++;
277 			} else
278 				break;
279 		}
280 	}
281 
282 	pthread_mutex_unlock(&mutex);
283 
284 	readQueData(buffer_count);
285 
286 	return count;
287 }
288 
289 /** ********************************************************
290  *
291  ***********************************************************/
readQueData(int buffer_count)292 int Circular_queue::readQueData(int buffer_count)
293 {
294 	if(buffer_count < 32 && readData) {
295 		(*readData)((void *)this);
296 	}
297 
298 	pthread_mutex_lock(&mutex);
299 	if(stalled)
300 		if(buffer_count < buffer_size)
301 			stalled = 0;
302 	pthread_mutex_unlock(&mutex);
303 
304 	return 0;
305 }
306 
307 /** ********************************************************
308  *
309  ***********************************************************/
adjustReadQueIndex(int count)310 int Circular_queue::adjustReadQueIndex(int count)
311 {
312 	if (count < 1) return 0;
313 
314 	pthread_mutex_lock(&mutex);
315 
316 	if(bufferCount > 0) {
317 		if(count >= bufferCount)
318 			count = bufferCount;
319 
320 		bufferCount -= count;
321 		read_index += count;
322 		read_index &= index_mask;
323 	}
324 	if(bufferCount < 1) {
325 		bufferCount = 0;
326 		write_index = read_index;
327 	}
328 
329 	pthread_mutex_unlock(&mutex);
330 
331 	readQueData(bufferCount);
332 
333 	return count;
334 }
335 
336 /** ********************************************************
337  *
338  ***********************************************************/
lookAheadToTerminator(char * _buffer,char terminator,int maxLen)339 int Circular_queue::lookAheadToTerminator(char *_buffer, char terminator,
340 	int maxLen)
341 {
342 	if (!_buffer || maxLen < 1) return 0;
343 
344 	int count = 0;
345 
346 	char *cPtr = _buffer;
347 
348 	pthread_mutex_lock(&mutex);
349 
350 	int temp_index   = read_index;
351 	int buffer_count = bufferCount;
352 
353 	if (buffer_count > 0) {
354 		while (!exit_thread) {
355 
356 			if(count >= maxLen)
357 				break;
358 
359 			temp_index &= index_mask;
360 
361 			if (buffer_count > 0)  {
362 				*cPtr = buffer[temp_index++];
363 				cPtr[1] = 0;
364 				buffer_count--;
365 				count++;
366 			} else
367 				break;
368 
369 			if(*cPtr == terminator) {
370 				break;
371 			}
372 
373 			cPtr++;
374 		}
375 	}
376 
377 	pthread_mutex_unlock(&mutex);
378 
379 	readQueData(buffer_count);
380 
381 	return count;
382 }
383 
384 /** ********************************************************
385  *
386  ***********************************************************/
lookAheadForCharacter(char character,int * found)387 int Circular_queue::lookAheadForCharacter(char character, int *found)
388 {
389 	if (!found) return 0;
390 
391 	int count = 0;
392 	char valueRead = 0;
393 
394 	pthread_mutex_lock(&mutex);
395 
396 	int temp_index   = read_index;
397 	int buffer_count = bufferCount;
398 
399 	if (buffer_count > 0) {
400 		*found = 0;
401 
402 		while (!exit_thread) {
403 
404 			temp_index &= index_mask;
405 
406 			if (buffer_count > 0)  {
407 				valueRead = buffer[temp_index++];
408 				buffer_count--;
409 				count++;
410 			} else
411 				break;
412 
413 			if(valueRead == character) {
414 				*found = 1;
415 				break;
416 			}
417 		}
418 	}
419 
420 	pthread_mutex_unlock(&mutex);
421 
422 	readQueData(buffer_count);
423 
424 	return count;
425 }
426 
427 /** ********************************************************
428  *
429  ***********************************************************/
stopQueue()430 void Circular_queue::stopQueue()
431 {
432 	inhibitDataOut = CQUE_HOLD;
433 }
434 
435 /** ********************************************************
436  *
437  ***********************************************************/
resumeQueue()438 void Circular_queue::resumeQueue()
439 {
440 	inhibitDataOut = CQUE_RESUME;
441 }
442 
443 /** ********************************************************
444  *
445  ***********************************************************/
timeOut(time_t & timeValue,time_t seconds,int attribute)446 bool Circular_queue::timeOut(time_t &timeValue, time_t seconds, int attribute)
447 {
448 	time_t currentTime = time(NULL);
449 	time_t ExpTime = timeValue + seconds;
450 	bool ret = false;
451 
452 	switch(attribute) {
453 		case TIME_SET:
454 			timeValue = currentTime;
455 			ret = true;
456 			break;
457 
458 		case TIME_COUNT:
459 			if(currentTime > ExpTime) {
460 				timeValue = 0;
461 				ret = true;
462 			}
463 			break;
464 	}
465 
466 	if(timeValue == 0 && seconds > 0)
467 		timeValue = currentTime;
468 
469 	return ret;
470 }
471 
472 /** ********************************************************
473  *
474  ***********************************************************/
sleep(int seconds,int milliseconds)475 void Circular_queue::sleep(int seconds, int milliseconds)
476 {
477 	struct timespec		ts;
478 	struct timeval		tp;
479 
480 	gettimeofday(&tp, NULL);
481 
482 	ts.tv_nsec = (tp.tv_usec * 1000) + milliseconds * 1000000;
483 	ts.tv_sec  = tp.tv_sec + seconds;
484 
485 	pthread_mutex_lock(&mutex);
486 	pthread_cond_timedwait(&condition, &mutex, &ts);
487 	pthread_mutex_unlock(&mutex);
488 }
489 
490 /** ********************************************************
491  *
492  ***********************************************************/
signal(void)493 void Circular_queue::signal(void)
494 {
495 	pthread_cond_signal(&condition);
496 }
497