1 // Copyright (C) 1999-2005 Open Source Telecom Corporation.
2 // Copyright (C) 2006-2014 David Sugar, Tycho Softworks.
3 // Copyright (C) 2015 Cherokees of Idaho.
4 //
5 // This program is free software; you can redistribute it and/or modify
6 // it under the terms of the GNU General Public License as published by
7 // the Free Software Foundation; either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // This program is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with this program. If not, see <http://www.gnu.org/licenses/>.
17 //
18 // As a special exception, you may use this file as part of a free software
19 // library without restriction. Specifically, if other files instantiate
20 // templates or use macros or inline functions from this file, or you compile
21 // this file and link it with other files to produce an executable, this
22 // file does not by itself cause the resulting executable to be covered by
23 // the GNU General Public License. This exception does not however
24 // invalidate any other reasons why the executable file might be covered by
25 // the GNU General Public License.
26 //
27 // This exception applies only to the code released under the name GNU
28 // Common C++. If you copy code from other releases into a copy of GNU
29 // Common C++, as the General Public License permits, the exception does
30 // not apply to the code that you add in this way. To avoid misleading
31 // anyone as to the status of such modified files, you must delete
32 // this exception notice from them.
33 //
34 // If you write modifications of your own for GNU Common C++, it is your choice
35 // whether to permit this exception to apply to your modifications.
36 // If you do not wish that, delete this exception notice.
37 //
38
39 /**
40 * @file commoncpp/thread.h
41 * @short Common C++ thread class and sychronization objects
42 **/
43
44 #ifndef COMMONCPP_THREAD_H_
45 #define COMMONCPP_THREAD_H_
46
47 #ifndef COMMONCPP_CONFIG_H_
48 #include <commoncpp/config.h>
49 #endif
50
51 #ifndef COMMONCPP_STRING_H_
52 #include <commoncpp/string.h>
53 #endif
54
55 #define ENTER_CRITICAL enterMutex();
56 #define LEAVE_CRITICAL leaveMutex();
57
58 #include <time.h>
59
60 namespace ost {
61
62 class __EXPORT Mutex : protected ucommon::RecursiveMutex
63 {
64 private:
65 __DELETE_COPY(Mutex);
66
67 public:
Mutex()68 inline Mutex() : RecursiveMutex() {}
69
enterMutex(void)70 inline void enterMutex(void) {
71 RecursiveMutex::lock();
72 }
73
leaveMutex(void)74 inline void leaveMutex(void) {
75 RecursiveMutex::release();
76 }
77
tryEnterMutex(void)78 inline bool tryEnterMutex(void) {
79 return RecursiveMutex::lock(0l);
80 }
81
enter(void)82 inline void enter(void) {
83 RecursiveMutex::lock();
84 }
85
leave(void)86 inline void leave(void) {
87 RecursiveMutex::release();
88 }
89
test(void)90 inline bool test(void) {
91 return RecursiveMutex::lock(0l);
92 }
93 };
94
95 /**
96 * The Mutex Counter is a counter variable which can safely be incremented
97 * or decremented by multiple threads. A Mutex is used to protect access
98 * to the counter variable (an integer). An initial value can be specified
99 * for the counter, and it can be manipulated with the ++ and -- operators.
100 *
101 * @author David Sugar <dyfet@ostel.com>
102 * @short Thread protected integer counter.
103 */
104 class __EXPORT MutexCounter : public Mutex
105 {
106 private:
107 __DELETE_COPY(MutexCounter);
108
109 protected:
110 volatile int counter;
111
112 public:
113 /**
114 * Create and optionally name a mutex protected counter.
115 */
116 MutexCounter();
117
118 /**
119 * Create and optionally name a mutex protected counter with
120 * an initial value.
121 *
122 * @param initial value of counter.
123 */
124 MutexCounter(int initial);
125
126 int operator++();
127 int operator--();
128 };
129
130 /**
131 * The MutexLock class is used to protect a section of code so that at any
132 * given time only a single thread can perform the protected operation.
133 *
134 * It use Mutex to protect operation. Using this class is usefull and
135 * exception safe. The mutex that has been locked is automatically
136 * released when the function call stack falls out of scope, so one doesnt
137 * have to remember to unlock the mutex at each function return.
138 *
139 * A common use is
140 *
141 * void func_to_protect()
142 * {
143 * MutexLock lock(mutex);
144 * ... operation ...
145 * }
146 *
147 * NOTE: do not declare variable as "MutexLock (mutex)", the mutex will be
148 * released at statement end.
149 *
150 * @author Frediano Ziglio <freddy77@angelfire.com>
151 * @short Mutex automatic locker for protected access.
152 */
153 class __EXPORT MutexLock
154 {
155 private:
156 Mutex& mutex;
157
158 __DELETE_COPY(MutexLock);
159
160 public:
161 /**
162 * Acquire the mutex
163 *
164 * @param _mutex reference to mutex to aquire.
165 */
MutexLock(Mutex & _mutex)166 inline MutexLock( Mutex& _mutex ) : mutex( _mutex ) {
167 mutex.enterMutex();
168 }
169
170 /**
171 * Release the mutex automatically
172 */
173 // this should be not-virtual
~MutexLock()174 inline ~MutexLock() {
175 mutex.leaveMutex();
176 }
177 };
178
179 class __EXPORT ThreadLock : protected ucommon::RWLock
180 {
181 private:
182 __DELETE_COPY(ThreadLock);
183
184 public:
ThreadLock()185 inline ThreadLock() : ucommon::RWLock() {}
186
readLock(void)187 inline void readLock(void) {
188 ucommon::RWLock::access();
189 }
190
writeLock(void)191 inline void writeLock(void) {
192 ucommon::RWLock::modify();
193 }
194
tryReadLock(void)195 inline void tryReadLock(void) {
196 ucommon::RWLock::access(0);
197 }
198
tryWriteLock(void)199 inline void tryWriteLock(void) {
200 ucommon::RWLock::modify(0);
201 }
202
unlock(void)203 inline void unlock(void) {
204 ucommon::RWLock::release();
205 }
206 };
207
208 /**
209 * The ReadLock class is used to protect a section of code through
210 * a ThreadLock for "read" access to the member function. The
211 * ThreadLock is automatically released when the object falls out of
212 * scope.
213 *
214 * A common use is
215 *
216 * void func_to_protect()
217 * {
218 * ReadLock lock(threadlock);
219 * ... operation ...
220 * }
221 *
222 * NOTE: do not declare variable as "ReadLock (threadlock)", the
223 * mutex will be released at statement end.
224 *
225 * @author David Sugar <dyfet@gnu.org>
226 * @short Read mode automatic locker for protected access.
227 */
228 class __EXPORT ReadLock
229 {
230 private:
231 ThreadLock& tl;
232
233 __DELETE_COPY(ReadLock);
234
235 public:
236 /**
237 * Wait for read access
238 *
239 * @param _tl reference to lock to aquire.
240 */
ReadLock(ThreadLock & _tl)241 inline ReadLock( ThreadLock& _tl ) : tl( _tl ) {
242 tl.readLock();
243 }
244 /**
245 * Post the semaphore automatically
246 */
247 // this should be not-virtual
~ReadLock()248 inline ~ReadLock() {
249 tl.unlock();
250 }
251 };
252
253 /**
254 * The WriteLock class is used to protect a section of code through
255 * a ThreadLock for "write" access to the member function. The
256 * ThreadLock is automatically released when the object falls out of
257 * scope.
258 *
259 * A common use is
260 *
261 * void func_to_protect()
262 * {
263 * WriteLock lock(threadlock);
264 * ... operation ...
265 * }
266 *
267 * NOTE: do not declare variable as "WriteLock (threadlock)", the
268 * mutex will be released at statement end.
269 *
270 * @author David Sugar <dyfet@gnu.org>
271 * @short Read mode automatic locker for protected access.
272 */
273 class __EXPORT WriteLock
274 {
275 private:
276 ThreadLock& tl;
277
278 __DELETE_COPY(WriteLock);
279
280 public:
281 /**
282 * Wait for write access
283 *
284 * @param _tl reference to threadlock to aquire.
285 */
WriteLock(ThreadLock & _tl)286 inline WriteLock( ThreadLock& _tl ) : tl( _tl ) {
287 tl.writeLock();
288 }
289 /**
290 * Post the semaphore automatically
291 */
292 // this should be not-virtual
~WriteLock()293 inline ~WriteLock() {
294 tl.unlock();
295 }
296 };
297
298 class __EXPORT Conditional : private ucommon::Conditional
299 {
300 private:
301 __DELETE_COPY(Conditional);
302
303 public:
Conditional()304 inline Conditional() : ucommon::Conditional() {}
305
306 bool wait(timeout_t timeout, bool locked = false);
307
308 void signal(bool broadcast);
309
enterMutex(void)310 inline void enterMutex(void) {
311 ucommon::Conditional::lock();
312 }
313
leaveMutex(void)314 inline void leaveMutex(void) {
315 ucommon::Conditional::unlock();
316 }
317 };
318
319 class __EXPORT Semaphore : protected ucommon::Semaphore
320 {
321 public:
Semaphore(size)322 inline Semaphore(unsigned size = 0) : ucommon::Semaphore(size) {}
323
wait(timeout_t timeout)324 inline bool wait(timeout_t timeout) {
325 return ucommon::Semaphore::wait(timeout);
326 }
327
wait(void)328 inline void wait(void) {
329 ucommon::Semaphore::wait();
330 }
331
post(void)332 inline void post(void) {
333 ucommon::Semaphore::release();
334 }
335 };
336
337 /**
338 * The SemaphoreLock class is used to protect a section of code through
339 * a semaphore so that only x instances of the member function may
340 * execute concurrently.
341 *
342 * A common use is
343 *
344 * void func_to_protect()
345 * {
346 * SemaphoreLock lock(semaphore);
347 * ... operation ...
348 * }
349 *
350 * NOTE: do not declare variable as "SemaohoreLock (semaphore)", the
351 * mutex will be released at statement end.
352 *
353 * @author David Sugar <dyfet@gnu.org>
354 * @short Semaphore automatic locker for protected access.
355 */
356 class __EXPORT SemaphoreLock
357 {
358 private:
359 Semaphore& sem;
360
361 public:
362 /**
363 * Wait for the semaphore
364 */
SemaphoreLock(Semaphore & _sem)365 inline SemaphoreLock( Semaphore& _sem ) : sem( _sem ) {
366 sem.wait();
367 }
368 /**
369 * Post the semaphore automatically
370 */
371 // this should be not-virtual
~SemaphoreLock()372 inline ~SemaphoreLock() {
373 sem.post();
374 }
375 };
376
377 class __EXPORT Event : private ucommon::TimedEvent
378 {
379 private:
380 __DELETE_COPY(Event);
381
382 public:
Event()383 inline Event() : ucommon::TimedEvent() {}
384
wait(void)385 inline void wait(void) {
386 ucommon::TimedEvent::wait();
387 }
388
wait(timeout_t timeout)389 inline bool wait(timeout_t timeout) {
390 return ucommon::TimedEvent::wait(timeout);
391 }
392
signal(void)393 inline void signal(void) {
394 ucommon::TimedEvent::signal();
395 }
396
reset(void)397 inline void reset(void) {
398 ucommon::TimedEvent::reset();
399 }
400
401 inline void set(timeout_t timeout = 0) {
402 ucommon::TimedEvent::set(timeout);
403 }
404 };
405
406 class __EXPORT Thread : protected ucommon::JoinableThread
407 {
408 public:
409 /**
410 * How to raise error
411 */
412 typedef enum Throw {
413 throwNothing, /**< continue without throwing error */
414 throwObject, /**< throw object that cause error (throw this) */
415 throwException /**< throw an object relative to error */
416 } Throw;
417
418 private:
419 friend class Slog;
420
421 Throw exceptions;
422 bool detached, terminated;
423 Thread *parent;
424 size_t msgpos;
425 char msgbuf[128];
426
427 __DELETE_COPY(Thread);
428
429 public:
430 Thread(int pri = 0, size_t stack = 0);
431
432 virtual ~Thread();
433
map(void)434 inline void map(void) {
435 JoinableThread::map();
436 }
437
438 virtual void initial(void);
439 virtual void notify(Thread *thread);
440 virtual void final(void);
441 virtual void run(void) __OVERRIDE = 0;
442
443 void terminate(void);
444 void finalize(void);
445
446 void detach(void);
447 void start(void);
448 void exit(void);
449
join(void)450 inline void join(void) {
451 JoinableThread::join();
452 }
453
sync(void)454 inline void sync(void) {
455 Thread::exit();
456 }
457
get(void)458 static inline Thread *get(void) {
459 return (Thread *)JoinableThread::get();
460 }
461
yield(void)462 inline static void yield(void) {
463 ucommon::Thread::yield();
464 }
465
466 inline static void sleep(timeout_t msec = TIMEOUT_INF) {
467 ucommon::Thread::sleep(msec);
468 }
469
470 bool isRunning(void);
471
472 bool isThread(void);
473
474 /**
475 * Get exception mode of the current thread.
476 *
477 * @return exception mode.
478 */
479 static Throw getException(void);
480
481 /**
482 * Set exception mode of the current thread.
483 *
484 * @return exception mode.
485 */
486 static void setException(Throw mode);
487
488 /**
489 * Get the thread id.
490 */
getId(void)491 inline pthread_t getId(void) const {
492 return tid;
493 }
494 };
495
496 /**
497 * This class is used to access non-reentrant date and time functions in the
498 * standard C library.
499 *
500 * The class has two purposes:
501 * - 1 To be used internaly in CommonCpp's date and time classes to make them
502 * thread safe.
503 * - 2 To be used by clients as thread safe replacements to the standard C
504 * functions, much like Thread::sleep() represents a thread safe version
505 * of the standard sleep() function.
506 *
507 * @note The class provides one function with the same name as its equivalent
508 * standard function and one with another, unique name. For new clients,
509 * the version with the unique name is recommended to make it easy to
510 * grep for accidental usage of the standard functions. The version with
511 * the standard name is provided for existing clients to sed replace their
512 * original version.
513 *
514 * @note Also note that some functions that returned pointers have been redone
515 * to take that pointer as an argument instead, making the caller
516 * responsible for memory allocation/deallocation. This is almost
517 * how POSIX specifies *_r functions (reentrant versions of the
518 * standard time functions), except the POSIX functions also return the
519 * given pointer while we do not. We don't use the *_r functions as they
520 * aren't all generally available on all platforms yet.
521 *
522 * @author Idar Tollefsen <idar@cognita.no>
523 * @short Thread safe date and time functions.
524 */
525 class __EXPORT SysTime
526 {
527 private:
528 __DELETE_DEFAULTS(SysTime);
529
530 public:
531 static time_t getTime(time_t *tloc = NULL);
time(time_t * tloc)532 static time_t time(time_t *tloc) {
533 return getTime(tloc);
534 }
535
536 static int getTimeOfDay(struct timeval *tp);
gettimeofday(struct timeval * tp,struct timezone *)537 static int gettimeofday(struct timeval *tp, struct timezone *) {
538 return getTimeOfDay(tp);
539 }
540
541 static struct tm *getLocalTime(const time_t *clock, struct tm *result);
locatime(const time_t * clock,struct tm * result)542 static struct tm *locatime(const time_t *clock, struct tm *result) {
543 return getLocalTime(clock, result);
544 }
545
546 static struct tm *getGMTTime(const time_t *clock, struct tm *result);
gmtime(const time_t * clock,struct tm * result)547 static struct tm *gmtime(const time_t *clock, struct tm *result) {
548 return getGMTTime(clock, result);
549 }
550 };
551
552 /**
553 * Timer ports are used to provide synchronized timing events when managed
554 * under a "service thread" such as SocketService. This is made into a
555 * stand-alone base class since other derived libraries (such as the
556 * serial handlers) may also use the pooled "service thread" model
557 * and hence also require this code for managing timing.
558 *
559 * @author David Sugar <dyfet@ostel.com>
560 * @short synchronized millisecond timing for service threads.
561 */
562 class __EXPORT TimerPort
563 {
564 private:
565 #ifndef _MSWINDOWS_
566 struct timeval timer;
567 #else
568 DWORD timer;
569 #endif
570 bool active;
571
572 __DELETE_COPY(TimerPort);
573
574 public:
575 /**
576 * Create a timer, mark it as inactive, and set the initial
577 * "start" time to the creation time of the timer object. This
578 * allows "incTimer" to initially refer to time delays relative
579 * to the original start time of the object.
580 */
581 TimerPort();
582
583 /**
584 * Set a new start time for the object based on when this call is
585 * made and optionally activate the timer for a specified number
586 * of milliseconds. This can be used to set the starting time
587 * of a realtime session.
588 *
589 * @param timeout delay in milliseconds from "now"
590 */
591 void setTimer(timeout_t timeout = 0);
592
593 /**
594 * Set a timeout based on the current time reference value either
595 * from object creation or the last setTimer(). This reference
596 * can be used to time synchronize realtime data over specified
597 * intervals and force expiration when a new frame should be
598 * released in a synchronized manner.
599 *
600 * @param timeout delay in milliseconds from reference.
601 */
602 void incTimer(timeout_t timeout);
603
604 /**
605 * Adjust a timeout based on the current time reference value either
606 * from object creation or the last setTimer(). This reference
607 * can be used to time synchronize realtime data over specified
608 * intervals and force expiration when a new frame should be
609 * released in a synchronized manner.
610 *
611 * @param timeout delay in milliseconds from reference.
612 */
613 void decTimer(timeout_t timeout);
614
615 /**
616 * Sleep until the current timer expires. This is useful in time
617 * syncing realtime periodic tasks.
618 */
619 void sleepTimer(void);
620
621 /**
622 * This is used to "disable" the service thread from expiring
623 * the timer object. It does not effect the reference time from
624 * either creation or a setTimer().
625 */
626 void endTimer(void);
627
628 /**
629 * This is used by service threads to determine how much time
630 * remains before the timer expires based on a timeout specified
631 * in setTimer() or incTimer(). It can also be called after
632 * setting a timeout with incTimer() to see if the current timeout
633 * has already expired and hence that the application is already
634 * delayed and should skip frame(s).
635 *
636 * return time remaining in milliseconds, or TIMEOUT_INF if
637 * inactive.
638 */
639 timeout_t getTimer(void) const;
640
641 /**
642 * This is used to determine how much time has elapsed since a
643 * timer port setTimer benchmark time was initially set. This
644 * allows one to use setTimer() to set the timer to the current
645 * time and then measure elapsed time from that point forward.
646 *
647 * return time elapsed in milliseconds, or TIMEOUT_INF if
648 * inactive.
649 */
650 timeout_t getElapsed(void) const;
651 };
652
653 #ifndef _MSWINDOWS_
654 struct timespec *getTimeout(struct timespec *spec, timeout_t timeout);
655 #endif
656
657 #if !defined(_MSWINDOWS_) || defined(_MSTHREADS_)
localtime_r(const time_t * t,struct tm * b)658 inline struct tm *localtime_r(const time_t *t, struct tm *b) {
659 return SysTime::getLocalTime(t, b);
660 }
661
ctime_r(const time_t * t,char * buf)662 inline char *ctime_r(const time_t *t, char *buf) {
663 return ctime(t);
664 }
665
gmtime_r(const time_t * t,struct tm * b)666 inline struct tm *gmtime_r(const time_t *t, struct tm *b) {
667 return SysTime::getGMTTime(t, b);
668 }
669
asctime_r(const struct tm * tm,char * b)670 inline char *asctime_r(const struct tm *tm, char *b) {
671 return asctime(tm);
672 }
673 #endif
674
getThread(void)675 inline Thread *getThread(void) {
676 return Thread::get();
677 }
678
679 /**
680 * The buffer class represents an IPC service that is built upon a buffer
681 * of fixed capacity that can be used to transfer objects between one or
682 * more producer and consumer threads. Producer threads post objects
683 * into the buffer, and consumer threads wait for and receive objects from
684 * the buffer. Semaphores are used to to block the buffer from overflowing
685 * and indicate when there is data available, and mutexes are used to protect
686 * multiple consumers and producer threads from stepping over each other.
687 *
688 * The buffer class is an abstract class in that the actual data being
689 * buffered is not directly specified within the buffer class itself. The
690 * buffer class should be used as a base class for a class that actually
691 * impliments buffering and which may be aware of the data types actually
692 * are being buffered. A template class could be created based on buffer
693 * for this purpose. Another possibility is to create a class derived
694 * from both Thread and Buffer which can be used to implement message passing
695 * threads.
696 *
697 * @author David Sugar <dyfet@ostel.com>
698 * @short Producer/Consumer buffer for use between threads.
699 */
700 #ifdef _MSWINDOWS_
701 class __EXPORT Buffer : public Mutex
702 #else
703 class __EXPORT Buffer : public Conditional
704 #endif
705 {
706 private:
707 #ifdef _MSWINDOWS_
708 HANDLE sem_head, sem_tail;
709 #endif
710 size_t _size;
711 size_t _used;
712
713 protected:
714 /**
715 * Invoke derived class buffer peeking method.
716 * @return size of object found.
717 * @param buf pointer to copy contents of head of buffer to.
718 */
719 virtual size_t onPeek(void *buf) = 0;
720
721 /**
722 * Invoke derived class object request from buffer.
723 * @return size of object returned.
724 * @param buf pointer to hold object returned from the buffer.
725 */
726 virtual size_t onWait(void *buf) = 0;
727
728 /**
729 * Invoke derived class posting of object to buffer.
730 * @return size of object posted.
731 * @param buf pointer to object being posted to the buffer.
732 */
733 virtual size_t onPost(void *buf) = 0;
734
735 public:
736 /**
737 * value to return when a timed operation returned with a
738 * timeout.
739 */
740 static const size_t timeout;
741
742 /**
743 * Create a buffer object of known capacity.
744 * @param capacity is the integer capacity of the buffer.
745 */
746 Buffer(size_t capacity);
747 /**
748 * In derived functions, may be used to free the actual memory
749 * used to hold buffered data.
750 */
751 virtual ~Buffer();
752
753 /**
754 * Return the capacity of the buffer as specified at creation.
755 * @return size of buffer.
756 */
getSize(void)757 inline size_t getSize(void) const {
758 return _size;
759 }
760
761 /**
762 * Return the current capacity in use for the buffer. Free space
763 * is technically getSize() - getUsed().
764 * @return integer used capacity of the buffer.
765 * @see #getSize
766 */
getUsed(void)767 inline size_t getUsed(void) const {
768 return _used;
769 }
770
771 /**
772 * Let one or more threads wait for an object to become available
773 * in the buffer. The waiting thread(s) will wait forever if no
774 * object is ever placed into the buffer.
775 *
776 * @return size of object passed by buffer in bytes.
777 * @param buf pointer to store object retrieved from the buffer.
778 * @param timeout time to wait.
779 */
780 size_t wait(void *buf, timeout_t timeout = 0);
781
782 /**
783 * Post an object into the buffer and enable a waiting thread to
784 * receive it.
785 *
786 * @return size of object posted in bytes.
787 * @param buf pointer to object to store in the buffer.
788 * @param timeout time to wait.
789 */
790 size_t post(void *buf, timeout_t timeout = 0);
791
792 /**
793 * Peek at the current content (first object) in the buffer.
794 *
795 * @return size of object in the buffer.
796 * @param buf pointer to store object found in the buffer.
797 */
798 size_t peek(void *buf);
799
800 /**
801 * New virtual to test if buffer is a valid object.
802 * @return true if object is valid.
803 */
804 virtual bool isValid(void);
805 };
806
807 /**
808 * A buffer class that holds a known capacity of fixed sized objects defined
809 * during creation.
810 *
811 * @author David Sugar <dyfet@ostel.com>
812 * @short producer/consumer buffer for fixed size objects.
813 */
814 class __EXPORT FixedBuffer : public Buffer
815 {
816 private:
817 char *buf, *head, *tail;
818 size_t objsize;
819
820 protected:
821 /**
822 * Return the first object in the buffer.
823 * @return predefined size of this buffers objects.
824 * @param buf pointer to copy contents of head of buffer to.
825 */
826 size_t onPeek(void *buf) __OVERRIDE;
827
828 /**
829 * Wait for and return a fixed object in the buffer.
830 * @return predefined size of this buffers objects.
831 * @param buf pointer to hold object returned from the buffer.
832 */
833 size_t onWait(void *buf) __OVERRIDE;
834
835 /**
836 * Post an object of the appropriate size into the buffer.
837 * @return predefined size of this buffers objects.
838 * @param buf pointer to data to copy into the buffer.
839 */
840 size_t onPost(void *buf) __OVERRIDE;
841
842 public:
843 /**
844 * Create a buffer of known capacity for objects of a specified
845 * size.
846 *
847 * @param capacity of the buffer.
848 * @param objsize for each object held in the buffer.
849 */
850 FixedBuffer(size_t capacity, size_t objsize);
851
852 /**
853 * Create a copy of an existing fixed size buffer and duplicate
854 * it's contents.
855 *
856 * @param fb existing FixedBuffer object.
857 */
858 FixedBuffer(const FixedBuffer &fb);
859
860 /**
861 * Destroy the fixed buffer and free the memory used to store objects.
862 */
863 virtual ~FixedBuffer();
864
865 FixedBuffer &operator=(const FixedBuffer &fb);
866
867 bool isValid(void) __OVERRIDE;
868 };
869
870 /**
871 * Somewhat generic queue processing class to establish a producer
872 * consumer queue. This may be used to buffer cdr records, or for
873 * other purposes where an in-memory queue is needed for rapid
874 * posting. This class is derived from Mutex and maintains a linked
875 * list. A thread is used to dequeue data and pass it to a callback
876 * method that is used in place of "run" for each item present on the
877 * queue. The conditional is used to signal the run thread when new
878 * data is posted.
879 *
880 * This class was changed by Angelo Naselli to have a timeout on the queue
881 *
882 * @short in memory data queue interface.
883 * @author David Sugar <dyfet@ostel.com>
884 */
885 class __EXPORT ThreadQueue : public Mutex, public Thread, public Semaphore
886 {
887 private:
888 void run(void) __FINAL; // private run method
889
890 __DELETE_COPY(ThreadQueue);
891
892 protected:
893 typedef struct _data {
894 struct _data *next;
895 unsigned len;
896 char data[1];
897 } data_t;
898
899 timeout_t timeout;
900 bool started;
901
902 data_t *first, *last; // head/tail of list
903
904 String name;
905
906 /*
907 * Overloading of final(). It demarks Semaphore to avoid deadlock.
908 */
909 virtual void final() __OVERRIDE;
910
911 /**
912 * Start of dequeing. Maybe we need to connect a database
913 * or something, so we have a virtual...
914 */
915 virtual void startQueue(void);
916
917 /**
918 * End of dequeing, we expect the queue is empty for now. Maybe
919 * we need to disconnect a database or something, so we have
920 * another virtual.
921 */
922 virtual void stopQueue(void);
923
924 /**
925 * A derivable method to call when the timout is expired.
926 */
927 virtual void onTimer(void);
928
929 /**
930 * Virtual callback method to handle processing of a queued
931 * data items. After the item is processed, it is deleted from
932 * memory. We can call multiple instances of runQueue in order
933 * if multiple items are waiting.
934 *
935 * @param data item being dequed.
936 */
937 virtual void runQueue(void *data) = 0;
938
939 public:
940 /**
941 * Create instance of our queue and give it a process priority.
942 *
943 * @param id queue ID.
944 * @param pri process priority.
945 * @param stack stack size.
946 */
947 ThreadQueue(const char *id, int pri, size_t stack = 0);
948
949 /**
950 * Destroy the queue.
951 */
952 virtual ~ThreadQueue();
953
954 /**
955 * Set the queue timeout.
956 * When the timer expires, the onTimer() method is called
957 * for the thread
958 *
959 * @param timeout timeout in milliseconds.
960 */
961 void setTimer(timeout_t timeout);
962
963 /**
964 * Put some unspecified data into this queue. A new qd
965 * structure is created and sized to contain a copy of
966 * the actual content.
967 *
968 * @param data pointer to data.
969 * @param len size of data.
970 */
971 void post(const void *data, unsigned len);
972 };
973
974
975 /** @relates Buffer */
976 inline size_t get(Buffer &b, void *o, timeout_t t = 0) {
977 return b.wait(o, t);
978 }
979
980 /** @relates Buffer */
981 inline size_t put(Buffer &b, void *o, timeout_t t = 0) {
982 return b.post(o, t);
983 }
984
985 /** @relates Buffer */
peek(Buffer & b,void * o)986 inline size_t peek(Buffer &b, void *o) {
987 return b.peek(o);
988 }
989
990 } // namespace ost
991
992 #endif
993