1 // circular_queue.cxx
2 //
3 // Author(s):
4 // Robert Stiles, KK5VD, Copyright (C) 2013
5 // Dave Freese, W1HKJ, Copyright (C) 2013
6 //
7 // This file is part of FLAMP.
8 //
9 // This is free software; you can redistribute it and/or modify
10 // it under the terms of the GNU General Public License as published by
11 // the Free Software Foundation; either version 3 of the License, or
12 // (at your option) any later version.
13 //
14 // This software is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 // GNU General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with this program. If not, see <http://www.gnu.org/licenses/>.
21 //
22
23
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <pthread.h>
28 #include <signal.h>
29 #include <sys/types.h>
30 #include <sys/time.h>
31
32 #include "util.h"
33
34 #include "circular_queue.h"
35
36
37 using namespace std;
38
39 /** ********************************************************
40 *
41 ***********************************************************/
Circular_queue()42 Circular_queue::Circular_queue()
43 {
44
45 }
46
47 /** ********************************************************
48 *
49 ***********************************************************/
Circular_queue(int po2,int (* _matchFound)(void *),int (* _readDataFrom)(void *),void * (* _queueParser)(void *))50 Circular_queue::Circular_queue(
51 int po2,
52 int (*_matchFound)(void *),
53 int (*_readDataFrom)(void *),
54 void * (*_queueParser)(void *))
55 {
56 setUp(po2, _matchFound, _readDataFrom, _queueParser);
57 }
58
59 /** ********************************************************
60 *
61 ***********************************************************/
~Circular_queue()62 Circular_queue::~Circular_queue()
63 {
64 signal();
65
66 pthread_mutex_lock(&mutex);
67 exit_thread = 1;
68 pthread_mutex_unlock(&mutex);
69
70 pthread_join(thread, NULL);
71 pthread_cond_destroy(&condition);
72
73 delete [] buffer;
74 }
75
76 /** ********************************************************
77 *
78 ***********************************************************/
setUp(int po2,int (* _matchFound)(void *),int (* _readDataFrom)(void *),void * (* _queueParser)(void *))79 void Circular_queue::setUp(
80 int po2,
81 int (*_matchFound)(void *),
82 int (*_readDataFrom)(void *),
83 void * (*_queueParser)(void *))
84 {
85
86 if(!_matchFound || !_readDataFrom || !_queueParser) {
87 throw CircQueException("Null Calling Function(s)");
88 }
89
90 matchFound = _matchFound;
91 readData = _readDataFrom;
92 queueParser = _queueParser;
93
94 buffer_size = (1 << po2);
95 buffer = new char[buffer_size];
96 if (!buffer) {
97 throw CircQueException("Cannot allocate buffer");
98 }
99
100 memset(buffer, 0, buffer_size);
101 bufferCount = 0;
102
103 index_mask = 1;
104 for(int i = 1; i < po2; i++)
105 index_mask |= (index_mask << 1);
106
107 exit_thread = 0;
108
109 inhibitDataOut = CQUE_HOLD;
110
111 pthread_mutex_init(&mutex, NULL);
112 pthread_cond_init(&condition, NULL);
113
114 int perr = pthread_create(&thread, 0, queueParser, this);
115 if (perr) {
116 throw CircQueException(perr, "Cannot create thread");
117 }
118
119 inhibitDataOut = CQUE_RESUME;
120
121 }
122
123 /** ********************************************************
124 *
125 ***********************************************************/
stopDataOut()126 void Circular_queue::stopDataOut()
127 {
128 inhibitDataOut = CQUE_HOLD;
129 }
130
131 /** ********************************************************
132 *
133 ***********************************************************/
startDataOut()134 void Circular_queue::startDataOut()
135 {
136 inhibitDataOut = CQUE_RESUME;
137 }
138
139 /** ********************************************************
140 *
141 ***********************************************************/
addToQueueNullFiltered(char * _buffer,int _size)142 void Circular_queue::addToQueueNullFiltered(char *_buffer, int _size)
143 {
144 if(!_buffer || _size < 1) return;
145
146 pthread_mutex_lock(&mutex);
147
148 while(_size > 0) {
149 if (bufferCount >= buffer_size) {
150 stalled = 1;
151 break;
152 }
153
154 write_index &= index_mask;
155
156 if(*_buffer) { // Filter out null characters
157 buffer[write_index++] = *_buffer;
158 bufferCount++;
159 }
160 _buffer++;
161 _size--;
162 }
163
164 pthread_mutex_unlock(&mutex);
165 }
166
167 /** ********************************************************
168 *
169 ***********************************************************/
addToQueue(char * _buffer,int _size)170 void Circular_queue::addToQueue(char *_buffer, int _size)
171 {
172 if(!_buffer || _size < 1) return;
173
174 pthread_mutex_lock(&mutex);
175
176 while(_size > 0) {
177 if (bufferCount >= buffer_size) {
178 stalled = 1;
179 break;
180 }
181
182 write_index &= index_mask;
183 buffer[write_index++] = *_buffer++;
184 bufferCount++;
185 _size--;
186 }
187
188 pthread_mutex_unlock(&mutex);
189 }
190
191 /** ********************************************************
192 *
193 ***********************************************************/
lookAheadCRC(char * _buffer,int _size,unsigned int * crcVal,int * reset)194 int Circular_queue::lookAheadCRC(char *_buffer, int _size,
195 unsigned int *crcVal, int *reset)
196 {
197 if(!_buffer || _size < 1 || !crcVal || !reset) return 0;
198
199 int count = 0;
200 int temp_index = 0;
201 int buffer_count = 0;
202 // unsigned int crcval = *crcVal;
203 char *cPtr = _buffer;
204
205 pthread_mutex_lock(&mutex);
206
207 temp_index = read_index;
208 buffer_count = bufferCount;
209
210 if(*reset) {
211 crcValidate.reset();
212 *reset = 0;
213 }
214
215 if (buffer_count > 0) {
216 while(!exit_thread) {
217
218 if (count >= _size) break;
219
220 temp_index &= index_mask;
221
222 if (buffer_count > 0) {
223
224 *cPtr = buffer[temp_index++];
225 cPtr[1] = 0;
226
227 buffer_count--;
228 count++;
229 crcValidate.crc16(*cPtr);
230 cPtr++;
231 } else
232 break;
233
234 }
235 }
236
237 pthread_mutex_unlock(&mutex);
238
239 readQueData(buffer_count);
240
241 *crcVal = crcValidate.val();
242
243 return count;
244
245 }
246
247 /** ********************************************************
248 *
249 ***********************************************************/
lookAhead(char * _buffer,int _size)250 int Circular_queue::lookAhead(char *_buffer, int _size)
251 {
252 if(!_buffer || _size < 1) return 0;
253
254 int count = 0;
255 int temp_index = 0;
256 int buffer_count = 0;
257 char *cPtr = _buffer;
258
259 pthread_mutex_lock(&mutex);
260
261 temp_index = read_index;
262 buffer_count = bufferCount;
263
264 if(buffer_count > 0) {
265 while(!exit_thread) {
266
267 if(count >= _size) break;
268
269 temp_index &= index_mask;
270
271 if(buffer_count > 0) {
272 *cPtr = buffer[temp_index++];
273 cPtr[1] = 0;
274 cPtr++;
275 buffer_count--;
276 count++;
277 } else
278 break;
279 }
280 }
281
282 pthread_mutex_unlock(&mutex);
283
284 readQueData(buffer_count);
285
286 return count;
287 }
288
289 /** ********************************************************
290 *
291 ***********************************************************/
readQueData(int buffer_count)292 int Circular_queue::readQueData(int buffer_count)
293 {
294 if(buffer_count < 32 && readData) {
295 (*readData)((void *)this);
296 }
297
298 pthread_mutex_lock(&mutex);
299 if(stalled)
300 if(buffer_count < buffer_size)
301 stalled = 0;
302 pthread_mutex_unlock(&mutex);
303
304 return 0;
305 }
306
307 /** ********************************************************
308 *
309 ***********************************************************/
adjustReadQueIndex(int count)310 int Circular_queue::adjustReadQueIndex(int count)
311 {
312 if (count < 1) return 0;
313
314 pthread_mutex_lock(&mutex);
315
316 if(bufferCount > 0) {
317 if(count >= bufferCount)
318 count = bufferCount;
319
320 bufferCount -= count;
321 read_index += count;
322 read_index &= index_mask;
323 }
324 if(bufferCount < 1) {
325 bufferCount = 0;
326 write_index = read_index;
327 }
328
329 pthread_mutex_unlock(&mutex);
330
331 readQueData(bufferCount);
332
333 return count;
334 }
335
336 /** ********************************************************
337 *
338 ***********************************************************/
lookAheadToTerminator(char * _buffer,char terminator,int maxLen)339 int Circular_queue::lookAheadToTerminator(char *_buffer, char terminator,
340 int maxLen)
341 {
342 if (!_buffer || maxLen < 1) return 0;
343
344 int count = 0;
345
346 char *cPtr = _buffer;
347
348 pthread_mutex_lock(&mutex);
349
350 int temp_index = read_index;
351 int buffer_count = bufferCount;
352
353 if (buffer_count > 0) {
354 while (!exit_thread) {
355
356 if(count >= maxLen)
357 break;
358
359 temp_index &= index_mask;
360
361 if (buffer_count > 0) {
362 *cPtr = buffer[temp_index++];
363 cPtr[1] = 0;
364 buffer_count--;
365 count++;
366 } else
367 break;
368
369 if(*cPtr == terminator) {
370 break;
371 }
372
373 cPtr++;
374 }
375 }
376
377 pthread_mutex_unlock(&mutex);
378
379 readQueData(buffer_count);
380
381 return count;
382 }
383
384 /** ********************************************************
385 *
386 ***********************************************************/
lookAheadForCharacter(char character,int * found)387 int Circular_queue::lookAheadForCharacter(char character, int *found)
388 {
389 if (!found) return 0;
390
391 int count = 0;
392 char valueRead = 0;
393
394 pthread_mutex_lock(&mutex);
395
396 int temp_index = read_index;
397 int buffer_count = bufferCount;
398
399 if (buffer_count > 0) {
400 *found = 0;
401
402 while (!exit_thread) {
403
404 temp_index &= index_mask;
405
406 if (buffer_count > 0) {
407 valueRead = buffer[temp_index++];
408 buffer_count--;
409 count++;
410 } else
411 break;
412
413 if(valueRead == character) {
414 *found = 1;
415 break;
416 }
417 }
418 }
419
420 pthread_mutex_unlock(&mutex);
421
422 readQueData(buffer_count);
423
424 return count;
425 }
426
427 /** ********************************************************
428 *
429 ***********************************************************/
stopQueue()430 void Circular_queue::stopQueue()
431 {
432 inhibitDataOut = CQUE_HOLD;
433 }
434
435 /** ********************************************************
436 *
437 ***********************************************************/
resumeQueue()438 void Circular_queue::resumeQueue()
439 {
440 inhibitDataOut = CQUE_RESUME;
441 }
442
443 /** ********************************************************
444 *
445 ***********************************************************/
timeOut(time_t & timeValue,time_t seconds,int attribute)446 bool Circular_queue::timeOut(time_t &timeValue, time_t seconds, int attribute)
447 {
448 time_t currentTime = time(NULL);
449 time_t ExpTime = timeValue + seconds;
450 bool ret = false;
451
452 switch(attribute) {
453 case TIME_SET:
454 timeValue = currentTime;
455 ret = true;
456 break;
457
458 case TIME_COUNT:
459 if(currentTime > ExpTime) {
460 timeValue = 0;
461 ret = true;
462 }
463 break;
464 }
465
466 if(timeValue == 0 && seconds > 0)
467 timeValue = currentTime;
468
469 return ret;
470 }
471
472 /** ********************************************************
473 *
474 ***********************************************************/
sleep(int seconds,int milliseconds)475 void Circular_queue::sleep(int seconds, int milliseconds)
476 {
477 struct timespec ts;
478 struct timeval tp;
479
480 gettimeofday(&tp, NULL);
481
482 ts.tv_nsec = (tp.tv_usec * 1000) + milliseconds * 1000000;
483 ts.tv_sec = tp.tv_sec + seconds;
484
485 pthread_mutex_lock(&mutex);
486 pthread_cond_timedwait(&condition, &mutex, &ts);
487 pthread_mutex_unlock(&mutex);
488 }
489
490 /** ********************************************************
491 *
492 ***********************************************************/
signal(void)493 void Circular_queue::signal(void)
494 {
495 pthread_cond_signal(&condition);
496 }
497