1 // Copyright (C) 1999-2005 Open Source Telecom Corporation.
2 // Copyright (C) 2006-2014 David Sugar, Tycho Softworks.
3 // Copyright (C) 2015 Cherokees of Idaho.
4 //
5 // This program is free software; you can redistribute it and/or modify
6 // it under the terms of the GNU General Public License as published by
7 // the Free Software Foundation; either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // This program is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 //
18 // As a special exception, you may use this file as part of a free software
19 // library without restriction.  Specifically, if other files instantiate
20 // templates or use macros or inline functions from this file, or you compile
21 // this file and link it with other files to produce an executable, this
22 // file does not by itself cause the resulting executable to be covered by
23 // the GNU General Public License.  This exception does not however
24 // invalidate any other reasons why the executable file might be covered by
25 // the GNU General Public License.
26 //
27 // This exception applies only to the code released under the name GNU
28 // Common C++.  If you copy code from other releases into a copy of GNU
29 // Common C++, as the General Public License permits, the exception does
30 // not apply to the code that you add in this way.  To avoid misleading
31 // anyone as to the status of such modified files, you must delete
32 // this exception notice from them.
33 //
34 // If you write modifications of your own for GNU Common C++, it is your choice
35 // whether to permit this exception to apply to your modifications.
36 // If you do not wish that, delete this exception notice.
37 //
38 
39 /**
40  * @file commoncpp/thread.h
41  * @short Common C++ thread class and sychronization objects
42  **/
43 
44 #ifndef COMMONCPP_THREAD_H_
45 #define COMMONCPP_THREAD_H_
46 
47 #ifndef COMMONCPP_CONFIG_H_
48 #include <commoncpp/config.h>
49 #endif
50 
51 #ifndef COMMONCPP_STRING_H_
52 #include <commoncpp/string.h>
53 #endif
54 
55 #define ENTER_CRITICAL  enterMutex();
56 #define LEAVE_CRITICAL  leaveMutex();
57 
58 #include <time.h>
59 
60 namespace ost {
61 
62 class __EXPORT Mutex : protected ucommon::RecursiveMutex
63 {
64 private:
65     __DELETE_COPY(Mutex);
66 
67 public:
Mutex()68     inline Mutex() : RecursiveMutex() {}
69 
enterMutex(void)70     inline void enterMutex(void) {
71         RecursiveMutex::lock();
72     }
73 
leaveMutex(void)74     inline void leaveMutex(void) {
75         RecursiveMutex::release();
76     }
77 
tryEnterMutex(void)78     inline bool tryEnterMutex(void) {
79         return RecursiveMutex::lock(0l);
80     }
81 
enter(void)82     inline void enter(void) {
83         RecursiveMutex::lock();
84     }
85 
leave(void)86     inline void leave(void) {
87         RecursiveMutex::release();
88     }
89 
test(void)90     inline bool test(void) {
91         return RecursiveMutex::lock(0l);
92     }
93 };
94 
95 /**
96  * The Mutex Counter is a counter variable which can safely be incremented
97  * or decremented by multiple threads.  A Mutex is used to protect access
98  * to the counter variable (an integer).  An initial value can be specified
99  * for the counter, and it can be manipulated with the ++ and -- operators.
100  *
101  * @author David Sugar <dyfet@ostel.com>
102  * @short Thread protected integer counter.
103  */
104 class __EXPORT MutexCounter : public Mutex
105 {
106 private:
107     __DELETE_COPY(MutexCounter);
108 
109 protected:
110     volatile int    counter;
111 
112 public:
113     /**
114      * Create and optionally name a mutex protected counter.
115      */
116     MutexCounter();
117 
118     /**
119      * Create and optionally name a mutex protected counter with
120      * an initial value.
121      *
122      * @param initial value of counter.
123      */
124     MutexCounter(int initial);
125 
126     int operator++();
127     int operator--();
128 };
129 
130 /**
131  * The MutexLock class is used to protect a section of code so that at any
132  * given time only a single thread can perform the protected operation.
133  *
134  * It use Mutex to protect operation. Using this class is usefull and
135  * exception safe.  The mutex that has been locked is automatically
136  * released when the function call stack falls out of scope, so one doesnt
137  * have to remember to unlock the mutex at each function return.
138  *
139  * A common use is
140  *
141  * void func_to_protect()
142  * {
143  *   MutexLock lock(mutex);
144  *   ... operation ...
145  * }
146  *
147  * NOTE: do not declare variable as "MutexLock (mutex)", the mutex will be
148  * released at statement end.
149  *
150  * @author Frediano Ziglio <freddy77@angelfire.com>
151  * @short Mutex automatic locker for protected access.
152  */
153 class __EXPORT MutexLock
154 {
155 private:
156     Mutex& mutex;
157 
158     __DELETE_COPY(MutexLock);
159 
160 public:
161     /**
162      * Acquire the mutex
163      *
164      * @param _mutex reference to mutex to aquire.
165      */
MutexLock(Mutex & _mutex)166     inline MutexLock( Mutex& _mutex ) : mutex( _mutex ) {
167         mutex.enterMutex();
168     }
169 
170     /**
171      * Release the mutex automatically
172      */
173     // this should be not-virtual
~MutexLock()174     inline ~MutexLock() {
175         mutex.leaveMutex();
176     }
177 };
178 
179 class __EXPORT ThreadLock : protected ucommon::RWLock
180 {
181 private:
182     __DELETE_COPY(ThreadLock);
183 
184 public:
ThreadLock()185     inline ThreadLock() : ucommon::RWLock() {}
186 
readLock(void)187     inline void readLock(void) {
188         ucommon::RWLock::access();
189     }
190 
writeLock(void)191     inline void writeLock(void) {
192         ucommon::RWLock::modify();
193     }
194 
tryReadLock(void)195     inline void tryReadLock(void) {
196         ucommon::RWLock::access(0);
197     }
198 
tryWriteLock(void)199     inline void tryWriteLock(void) {
200         ucommon::RWLock::modify(0);
201     }
202 
unlock(void)203     inline void unlock(void) {
204         ucommon::RWLock::release();
205     }
206 };
207 
208 /**
209  * The ReadLock class is used to protect a section of code through
210  * a ThreadLock for "read" access to the member function.  The
211  * ThreadLock is automatically released when the object falls out of
212  * scope.
213  *
214  * A common use is
215  *
216  * void func_to_protect()
217  * {
218  *   ReadLock lock(threadlock);
219  *   ... operation ...
220  * }
221  *
222  * NOTE: do not declare variable as "ReadLock (threadlock)", the
223  * mutex will be released at statement end.
224  *
225  * @author David Sugar <dyfet@gnu.org>
226  * @short Read mode automatic locker for protected access.
227  */
228 class __EXPORT ReadLock
229 {
230 private:
231     ThreadLock& tl;
232 
233     __DELETE_COPY(ReadLock);
234 
235 public:
236     /**
237      * Wait for read access
238      *
239      * @param _tl reference to lock to aquire.
240      */
ReadLock(ThreadLock & _tl)241     inline ReadLock( ThreadLock& _tl ) : tl( _tl ) {
242         tl.readLock();
243     }
244     /**
245      * Post the semaphore automatically
246      */
247     // this should be not-virtual
~ReadLock()248     inline ~ReadLock() {
249         tl.unlock();
250     }
251 };
252 
253 /**
254  * The WriteLock class is used to protect a section of code through
255  * a ThreadLock for "write" access to the member function.  The
256  * ThreadLock is automatically released when the object falls out of
257  * scope.
258  *
259  * A common use is
260  *
261  * void func_to_protect()
262  * {
263  *   WriteLock lock(threadlock);
264  *   ... operation ...
265  * }
266  *
267  * NOTE: do not declare variable as "WriteLock (threadlock)", the
268  * mutex will be released at statement end.
269  *
270  * @author David Sugar <dyfet@gnu.org>
271  * @short Read mode automatic locker for protected access.
272  */
273 class __EXPORT WriteLock
274 {
275 private:
276     ThreadLock& tl;
277 
278     __DELETE_COPY(WriteLock);
279 
280 public:
281     /**
282      * Wait for write access
283      *
284      * @param _tl reference to threadlock to aquire.
285      */
WriteLock(ThreadLock & _tl)286     inline WriteLock( ThreadLock& _tl ) : tl( _tl ) {
287         tl.writeLock();
288     }
289     /**
290      * Post the semaphore automatically
291      */
292     // this should be not-virtual
~WriteLock()293     inline ~WriteLock() {
294         tl.unlock();
295     }
296 };
297 
298 class __EXPORT Conditional : private ucommon::Conditional
299 {
300 private:
301     __DELETE_COPY(Conditional);
302 
303 public:
Conditional()304     inline Conditional() : ucommon::Conditional() {}
305 
306     bool wait(timeout_t timeout, bool locked = false);
307 
308     void signal(bool broadcast);
309 
enterMutex(void)310     inline void enterMutex(void) {
311         ucommon::Conditional::lock();
312     }
313 
leaveMutex(void)314     inline void leaveMutex(void) {
315         ucommon::Conditional::unlock();
316     }
317 };
318 
319 class __EXPORT Semaphore : protected ucommon::Semaphore
320 {
321 public:
Semaphore(size)322     inline Semaphore(unsigned size = 0) : ucommon::Semaphore(size) {}
323 
wait(timeout_t timeout)324     inline bool wait(timeout_t timeout) {
325         return ucommon::Semaphore::wait(timeout);
326     }
327 
wait(void)328     inline void wait(void) {
329         ucommon::Semaphore::wait();
330     }
331 
post(void)332     inline void post(void) {
333         ucommon::Semaphore::release();
334     }
335 };
336 
337 /**
338  * The SemaphoreLock class is used to protect a section of code through
339  * a semaphore so that only x instances of the member function may
340  * execute concurrently.
341  *
342  * A common use is
343  *
344  * void func_to_protect()
345  * {
346  *   SemaphoreLock lock(semaphore);
347  *   ... operation ...
348  * }
349  *
350  * NOTE: do not declare variable as "SemaohoreLock (semaphore)", the
351  * mutex will be released at statement end.
352  *
353  * @author David Sugar <dyfet@gnu.org>
354  * @short Semaphore automatic locker for protected access.
355  */
356 class __EXPORT SemaphoreLock
357 {
358 private:
359     Semaphore& sem;
360 
361 public:
362     /**
363      * Wait for the semaphore
364      */
SemaphoreLock(Semaphore & _sem)365     inline SemaphoreLock( Semaphore& _sem ) : sem( _sem ) {
366         sem.wait();
367     }
368     /**
369      * Post the semaphore automatically
370      */
371     // this should be not-virtual
~SemaphoreLock()372     inline ~SemaphoreLock() {
373         sem.post();
374     }
375 };
376 
377 class __EXPORT Event : private ucommon::TimedEvent
378 {
379 private:
380     __DELETE_COPY(Event);
381 
382 public:
Event()383     inline Event() : ucommon::TimedEvent() {}
384 
wait(void)385     inline void wait(void) {
386         ucommon::TimedEvent::wait();
387     }
388 
wait(timeout_t timeout)389     inline bool wait(timeout_t timeout) {
390         return ucommon::TimedEvent::wait(timeout);
391     }
392 
signal(void)393     inline void signal(void) {
394         ucommon::TimedEvent::signal();
395     }
396 
reset(void)397     inline void reset(void) {
398         ucommon::TimedEvent::reset();
399     }
400 
401     inline void set(timeout_t timeout = 0) {
402         ucommon::TimedEvent::set(timeout);
403     }
404 };
405 
406 class __EXPORT Thread : protected ucommon::JoinableThread
407 {
408 public:
409     /**
410      * How to raise error
411      */
412     typedef enum Throw {
413         throwNothing,  /**< continue without throwing error */
414         throwObject,   /**< throw object that cause error (throw this) */
415         throwException /**< throw an object relative to error */
416     } Throw;
417 
418 private:
419     friend class Slog;
420 
421     Throw exceptions;
422     bool detached, terminated;
423     Thread *parent;
424     size_t msgpos;
425     char msgbuf[128];
426 
427     __DELETE_COPY(Thread);
428 
429 public:
430     Thread(int pri = 0, size_t stack = 0);
431 
432     virtual ~Thread();
433 
map(void)434     inline void map(void) {
435         JoinableThread::map();
436     }
437 
438     virtual void initial(void);
439     virtual void notify(Thread *thread);
440     virtual void final(void);
441     virtual void run(void) __OVERRIDE = 0;
442 
443     void terminate(void);
444     void finalize(void);
445 
446     void detach(void);
447     void start(void);
448     void exit(void);
449 
join(void)450     inline void join(void) {
451         JoinableThread::join();
452     }
453 
sync(void)454     inline void sync(void) {
455         Thread::exit();
456     }
457 
get(void)458     static inline Thread *get(void) {
459         return (Thread *)JoinableThread::get();
460     }
461 
yield(void)462     inline static void yield(void) {
463         ucommon::Thread::yield();
464     }
465 
466     inline static void sleep(timeout_t msec = TIMEOUT_INF) {
467         ucommon::Thread::sleep(msec);
468     }
469 
470     bool isRunning(void);
471 
472     bool isThread(void);
473 
474     /**
475      * Get exception mode of the current thread.
476      *
477      * @return exception mode.
478      */
479     static Throw getException(void);
480 
481     /**
482      * Set exception mode of the current thread.
483      *
484      * @return exception mode.
485      */
486     static void setException(Throw mode);
487 
488     /**
489      * Get the thread id.
490      */
getId(void)491     inline pthread_t getId(void) const {
492         return tid;
493     }
494 };
495 
496 /**
497  * This class is used to access non-reentrant date and time functions in the
498  * standard C library.
499  *
500  * The class has two purposes:
501  * - 1 To be used internaly in CommonCpp's date and time classes to make them
502  *     thread safe.
503  * - 2 To be used by clients as thread safe replacements to the standard C
504  *     functions, much like Thread::sleep() represents a thread safe version
505  *     of the standard sleep() function.
506  *
507  * @note The class provides one function with the same name as its equivalent
508  *       standard function and one with another, unique name. For new clients,
509  *       the version with the unique name is recommended to make it easy to
510  *       grep for accidental usage of the standard functions. The version with
511  *       the standard name is provided for existing clients to sed replace their
512  *       original version.
513  *
514  * @note Also note that some functions that returned pointers have been redone
515  *       to take that pointer as an argument instead, making the caller
516  *       responsible for memory allocation/deallocation. This is almost
517  *       how POSIX specifies *_r functions (reentrant versions of the
518  *       standard time functions), except the POSIX functions also return the
519  *       given pointer while we do not. We don't use the *_r functions as they
520  *       aren't all generally available on all platforms yet.
521  *
522  * @author Idar Tollefsen <idar@cognita.no>
523  * @short Thread safe date and time functions.
524  */
525 class __EXPORT SysTime
526 {
527 private:
528     __DELETE_DEFAULTS(SysTime);
529 
530 public:
531     static time_t getTime(time_t *tloc = NULL);
time(time_t * tloc)532     static time_t time(time_t *tloc) {
533         return getTime(tloc);
534     }
535 
536     static int getTimeOfDay(struct timeval *tp);
gettimeofday(struct timeval * tp,struct timezone *)537     static int gettimeofday(struct timeval *tp, struct timezone *) {
538         return getTimeOfDay(tp);
539     }
540 
541     static struct tm *getLocalTime(const time_t *clock, struct tm *result);
locatime(const time_t * clock,struct tm * result)542     static struct tm *locatime(const time_t *clock, struct tm *result) {
543         return getLocalTime(clock, result);
544     }
545 
546     static struct tm *getGMTTime(const time_t *clock, struct tm *result);
gmtime(const time_t * clock,struct tm * result)547     static struct tm *gmtime(const time_t *clock, struct tm *result) {
548         return getGMTTime(clock, result);
549     }
550 };
551 
552 /**
553  * Timer ports are used to provide synchronized timing events when managed
554  * under a "service thread" such as SocketService.  This is made into a
555  * stand-alone base class since other derived libraries (such as the
556  * serial handlers) may also use the pooled "service thread" model
557  * and hence also require this code for managing timing.
558  *
559  * @author David Sugar <dyfet@ostel.com>
560  * @short synchronized millisecond timing for service threads.
561  */
562 class __EXPORT TimerPort
563 {
564 private:
565 #ifndef _MSWINDOWS_
566     struct timeval timer;
567 #else
568     DWORD timer;
569 #endif
570     bool active;
571 
572     __DELETE_COPY(TimerPort);
573 
574 public:
575     /**
576      * Create a timer, mark it as inactive, and set the initial
577      * "start" time to the creation time of the timer object.  This
578      * allows "incTimer" to initially refer to time delays relative
579      * to the original start time of the object.
580      */
581     TimerPort();
582 
583     /**
584      * Set a new start time for the object based on when this call is
585      * made and optionally activate the timer for a specified number
586      * of milliseconds.  This can be used to set the starting time
587      * of a realtime session.
588      *
589      * @param timeout delay in milliseconds from "now"
590      */
591     void setTimer(timeout_t timeout = 0);
592 
593     /**
594      * Set a timeout based on the current time reference value either
595      * from object creation or the last setTimer().  This reference
596      * can be used to time synchronize realtime data over specified
597      * intervals and force expiration when a new frame should be
598      * released in a synchronized manner.
599      *
600      * @param timeout delay in milliseconds from reference.
601      */
602     void incTimer(timeout_t timeout);
603 
604     /**
605      * Adjust a timeout based on the current time reference value either
606      * from object creation or the last setTimer().  This reference
607      * can be used to time synchronize realtime data over specified
608      * intervals and force expiration when a new frame should be
609      * released in a synchronized manner.
610      *
611      * @param timeout delay in milliseconds from reference.
612      */
613     void decTimer(timeout_t timeout);
614 
615     /**
616      * Sleep until the current timer expires.  This is useful in time
617      * syncing realtime periodic tasks.
618      */
619     void sleepTimer(void);
620 
621     /**
622      * This is used to "disable" the service thread from expiring
623      * the timer object.  It does not effect the reference time from
624      * either creation or a setTimer().
625      */
626     void endTimer(void);
627 
628     /**
629      * This is used by service threads to determine how much time
630      * remains before the timer expires based on a timeout specified
631      * in setTimer() or incTimer().  It can also be called after
632      * setting a timeout with incTimer() to see if the current timeout
633      * has already expired and hence that the application is already
634      * delayed and should skip frame(s).
635      *
636      * return time remaining in milliseconds, or TIMEOUT_INF if
637      * inactive.
638      */
639     timeout_t getTimer(void) const;
640 
641     /**
642      * This is used to determine how much time has elapsed since a
643      * timer port setTimer benchmark time was initially set.  This
644      * allows one to use setTimer() to set the timer to the current
645      * time and then measure elapsed time from that point forward.
646      *
647      * return time elapsed in milliseconds, or TIMEOUT_INF if
648      * inactive.
649      */
650     timeout_t getElapsed(void) const;
651 };
652 
653 #ifndef _MSWINDOWS_
654 struct  timespec *getTimeout(struct timespec *spec, timeout_t timeout);
655 #endif
656 
657 #if !defined(_MSWINDOWS_) || defined(_MSTHREADS_)
localtime_r(const time_t * t,struct tm * b)658 inline struct tm *localtime_r(const time_t *t, struct tm *b) {
659     return SysTime::getLocalTime(t, b);
660 }
661 
ctime_r(const time_t * t,char * buf)662 inline char *ctime_r(const time_t *t, char *buf) {
663     return ctime(t);
664 }
665 
gmtime_r(const time_t * t,struct tm * b)666 inline struct tm *gmtime_r(const time_t *t, struct tm *b) {
667     return SysTime::getGMTTime(t, b);
668 }
669 
asctime_r(const struct tm * tm,char * b)670 inline char *asctime_r(const struct tm *tm, char *b) {
671     return asctime(tm);
672 }
673 #endif
674 
getThread(void)675 inline Thread *getThread(void) {
676     return Thread::get();
677 }
678 
679 /**
680  * The buffer class represents an IPC service that is built upon a buffer
681  * of fixed capacity that can be used to transfer objects between one or
682  * more producer and consumer threads.  Producer threads post objects
683  * into the buffer, and consumer threads wait for and receive objects from
684  * the buffer.  Semaphores are used to to block the buffer from overflowing
685  * and indicate when there is data available, and mutexes are used to protect
686  * multiple consumers and producer threads from stepping over each other.
687  *
688  * The buffer class is an abstract class in that the actual data being
689  * buffered is not directly specified within the buffer class itself.  The
690  * buffer class should be used as a base class for a class that actually
691  * impliments buffering and which may be aware of the data types actually
692  * are being buffered.  A template class could be created based on buffer
693  * for this purpose.  Another possibility is to create a class derived
694  * from both Thread and Buffer which can be used to implement message passing
695  * threads.
696  *
697  * @author David Sugar <dyfet@ostel.com>
698  * @short Producer/Consumer buffer for use between threads.
699  */
700 #ifdef  _MSWINDOWS_
701 class __EXPORT Buffer : public Mutex
702 #else
703 class __EXPORT Buffer : public Conditional
704 #endif
705 {
706 private:
707 #ifdef  _MSWINDOWS_
708     HANDLE  sem_head, sem_tail;
709 #endif
710     size_t _size;
711     size_t _used;
712 
713 protected:
714     /**
715      * Invoke derived class buffer peeking method.
716      * @return size of object found.
717      * @param buf pointer to copy contents of head of buffer to.
718      */
719     virtual size_t onPeek(void *buf) = 0;
720 
721     /**
722      * Invoke derived class object request from buffer.
723      * @return size of object returned.
724      * @param buf pointer to hold object returned from the buffer.
725      */
726     virtual size_t onWait(void *buf) = 0;
727 
728     /**
729      * Invoke derived class posting of object to buffer.
730      * @return size of object posted.
731      * @param buf pointer to object being posted to the buffer.
732      */
733     virtual size_t onPost(void *buf) = 0;
734 
735 public:
736     /**
737      * value to return when a timed operation returned with a
738      * timeout.
739      */
740     static const size_t timeout;
741 
742     /**
743      * Create a buffer object of known capacity.
744      * @param capacity is the integer capacity of the buffer.
745      */
746     Buffer(size_t capacity);
747     /**
748      * In derived functions, may be used to free the actual memory
749      * used to hold buffered data.
750      */
751     virtual ~Buffer();
752 
753     /**
754      * Return the capacity of the buffer as specified at creation.
755      * @return size of buffer.
756      */
getSize(void)757     inline size_t getSize(void) const {
758         return _size;
759     }
760 
761     /**
762      * Return the current capacity in use for the buffer.  Free space
763      * is technically getSize() - getUsed().
764      * @return integer used capacity of the buffer.
765      * @see #getSize
766      */
getUsed(void)767     inline size_t getUsed(void) const {
768         return _used;
769     }
770 
771     /**
772      * Let one or more threads wait for an object to become available
773      * in the buffer.  The waiting thread(s) will wait forever if no
774      * object is ever placed into the buffer.
775      *
776      * @return size of object passed by buffer in bytes.
777      * @param buf pointer to store object retrieved from the buffer.
778      * @param timeout time to wait.
779      */
780     size_t wait(void *buf, timeout_t timeout = 0);
781 
782     /**
783      * Post an object into the buffer and enable a waiting thread to
784      * receive it.
785      *
786      * @return size of object posted in bytes.
787      * @param buf pointer to object to store in the buffer.
788      * @param timeout time to wait.
789      */
790     size_t post(void *buf, timeout_t timeout = 0);
791 
792     /**
793      * Peek at the current content (first object) in the buffer.
794      *
795      * @return size of object in the buffer.
796      * @param buf pointer to store object found in the buffer.
797      */
798     size_t peek(void *buf);
799 
800     /**
801      * New virtual to test if buffer is a valid object.
802      * @return true if object is valid.
803      */
804     virtual bool isValid(void);
805 };
806 
807 /**
808  * A buffer class that holds a known capacity of fixed sized objects defined
809  * during creation.
810  *
811  * @author David Sugar <dyfet@ostel.com>
812  * @short producer/consumer buffer for fixed size objects.
813  */
814 class __EXPORT FixedBuffer : public Buffer
815 {
816 private:
817     char *buf, *head, *tail;
818     size_t objsize;
819 
820 protected:
821     /**
822      * Return the first object in the buffer.
823      * @return predefined size of this buffers objects.
824      * @param buf pointer to copy contents of head of buffer to.
825      */
826     size_t onPeek(void *buf) __OVERRIDE;
827 
828     /**
829      * Wait for and return a fixed object in the buffer.
830      * @return predefined size of this buffers objects.
831      * @param buf pointer to hold object returned from the buffer.
832      */
833     size_t onWait(void *buf) __OVERRIDE;
834 
835     /**
836      * Post an object of the appropriate size into the buffer.
837      * @return predefined size of this buffers objects.
838      * @param buf pointer to data to copy into the buffer.
839      */
840     size_t onPost(void *buf) __OVERRIDE;
841 
842 public:
843     /**
844      * Create a buffer of known capacity for objects of a specified
845      * size.
846      *
847      * @param capacity of the buffer.
848      * @param objsize for each object held in the buffer.
849      */
850     FixedBuffer(size_t capacity, size_t objsize);
851 
852     /**
853      * Create a copy of an existing fixed size buffer and duplicate
854      * it's contents.
855      *
856      * @param fb existing FixedBuffer object.
857      */
858     FixedBuffer(const FixedBuffer &fb);
859 
860     /**
861      * Destroy the fixed buffer and free the memory used to store objects.
862      */
863     virtual ~FixedBuffer();
864 
865     FixedBuffer &operator=(const FixedBuffer &fb);
866 
867     bool isValid(void) __OVERRIDE;
868 };
869 
870 /**
871  * Somewhat generic queue processing class to establish a producer
872  * consumer queue.  This may be used to buffer cdr records, or for
873  * other purposes where an in-memory queue is needed for rapid
874  * posting.  This class is derived from Mutex and maintains a linked
875  * list.  A thread is used to dequeue data and pass it to a callback
876  * method that is used in place of "run" for each item present on the
877  * queue.  The conditional is used to signal the run thread when new
878  * data is posted.
879  *
880  * This class was changed by Angelo Naselli to have a timeout on the queue
881  *
882  * @short in memory data queue interface.
883  * @author David Sugar <dyfet@ostel.com>
884  */
885 class __EXPORT ThreadQueue : public Mutex, public Thread, public Semaphore
886 {
887 private:
888     void run(void) __FINAL;         // private run method
889 
890     __DELETE_COPY(ThreadQueue);
891 
892 protected:
893     typedef struct _data {
894         struct _data *next;
895         unsigned len;
896         char data[1];
897     }   data_t;
898 
899     timeout_t timeout;
900     bool started;
901 
902     data_t *first, *last;       // head/tail of list
903 
904     String name;
905 
906     /*
907      * Overloading of final(). It demarks Semaphore to avoid deadlock.
908      */
909     virtual void final() __OVERRIDE;
910 
911     /**
912      * Start of dequeing.  Maybe we need to connect a database
913      * or something, so we have a virtual...
914      */
915     virtual void startQueue(void);
916 
917     /**
918      * End of dequeing, we expect the queue is empty for now.  Maybe
919      * we need to disconnect a database or something, so we have
920      * another virtual.
921      */
922     virtual void stopQueue(void);
923 
924     /**
925      * A derivable method to call when the timout is expired.
926     */
927     virtual void onTimer(void);
928 
929     /**
930      * Virtual callback method to handle processing of a queued
931      * data items.  After the item is processed, it is deleted from
932      * memory.  We can call multiple instances of runQueue in order
933      * if multiple items are waiting.
934      *
935      * @param data item being dequed.
936      */
937     virtual void runQueue(void *data) = 0;
938 
939 public:
940     /**
941      * Create instance of our queue and give it a process priority.
942      *
943      * @param id queue ID.
944      * @param pri process priority.
945      * @param stack stack size.
946      */
947     ThreadQueue(const char *id, int pri, size_t stack = 0);
948 
949     /**
950      * Destroy the queue.
951      */
952     virtual ~ThreadQueue();
953 
954     /**
955      * Set the queue timeout.
956      * When the timer expires, the onTimer() method is called
957      * for the thread
958      *
959      * @param timeout timeout in milliseconds.
960      */
961     void setTimer(timeout_t timeout);
962 
963     /**
964      * Put some unspecified data into this queue.  A new qd
965      * structure is created and sized to contain a copy of
966      * the actual content.
967      *
968      * @param data pointer to data.
969      * @param len size of data.
970      */
971     void post(const void *data, unsigned len);
972 };
973 
974 
975 /** @relates Buffer */
976 inline size_t get(Buffer &b, void *o, timeout_t t = 0) {
977     return b.wait(o, t);
978 }
979 
980 /** @relates Buffer */
981 inline size_t put(Buffer &b, void *o, timeout_t t = 0) {
982     return b.post(o, t);
983 }
984 
985 /** @relates Buffer */
peek(Buffer & b,void * o)986 inline size_t peek(Buffer &b, void *o) {
987     return b.peek(o);
988 }
989 
990 } // namespace ost
991 
992 #endif
993