1 #ifndef __SDR_QUEUE_HH__
2 #define __SDR_QUEUE_HH__
3 
4 #include <list>
5 #include <map>
6 #include "buffer.hh"
7 #include <pthread.h>
8 #include <iostream>
9 
10 
11 namespace sdr {
12 
13 // Forward decl.
14 class SinkBase;
15 
16 /** Interface of a delegate. */
17 class DelegateInterface {
18 public:
19   /** Call back interface. */
20   virtual void operator() () = 0;
21   /** Returns the instance of the delegate. */
22   virtual void *instance() = 0;
23 };
24 
25 /** Specific delegate to a method of an object . */
26 template <class T>
27 class Delegate: public DelegateInterface
28 {
29 public:
30   /** Constructs a delegate to the method @c func of the instance @c instance.*/
Delegate(T * instance,void (T::* func)(void))31   Delegate(T *instance, void (T::*func)(void)) : _instance(instance), _function(func) { }
32   /** Destructor. */
~Delegate()33   virtual ~Delegate() {}
34   /** Callback, simply calls the method of the instance given to the constructor. */
operator ()()35   virtual void operator() () { (_instance->*_function)(); }
36   /** Returns the instance of the delegate. */
instance()37   virtual void *instance() { return _instance; }
38 
39 protected:
40   /** The instance. */
41   T *_instance;
42   /** The method. */
43   void (T::*_function)(void);
44 };
45 
46 
47 /** Central message queue (singleton). Must be created before any other SDR object is constructed.
48  * The queue collects all buffers for processing and routes them to their destination. The queue
49  * loop can either be run in a separate thread by passing @c parallel=true to the factory method
50  * @c get. In this case, the @c exec method will return immediately. Otherwise, the queue loop
51  * will be executed in the thread calling @c exec which blocks until the queue is stopped by
52  * a call to @c stop. */
53 class Queue
54 {
55 public:
56   /** The internal used message type. */
57   class Message {
58   public:
59     /** Constructor. */
Message(const RawBuffer & buffer,SinkBase * sink,bool allow_overwrite)60     Message(const RawBuffer &buffer, SinkBase *sink, bool allow_overwrite)
61       : _buffer(buffer), _sink(sink), _allow_overwrite(allow_overwrite) { }
62     /** Copy constructor. */
Message(const Message & other)63     Message(const Message &other)
64       : _buffer(other._buffer), _sink(other._sink), _allow_overwrite(other._allow_overwrite) { }
65     /** Assignment operator. */
operator =(const Message & other)66     const Message &operator= (const Message &other) {
67       _buffer = other._buffer;
68       _sink   = other._sink;
69       _allow_overwrite = other._allow_overwrite;
70       return *this;
71     }
72     /** Returns the buffer of the message. **/
buffer() const73     inline const RawBuffer &buffer() const { return _buffer; }
74     /** Returns the buffer of the message. **/
buffer()75     inline RawBuffer &buffer() { return _buffer; }
76     /** Returns the destination of the message. **/
sink() const77     inline SinkBase *sink() const { return _sink; }
78     /** If true, the sender allows to overwrite the content of the buffer. **/
allowOverwrite() const79     inline bool allowOverwrite() const { return _allow_overwrite; }
80 
81   protected:
82     /** The buffer being send. */
83     RawBuffer _buffer;
84     /** The destination. */
85     SinkBase *_sink;
86     /** If true, the sender allows to overwrite the buffer. */
87     bool _allow_overwrite;
88   };
89 
90 protected:
91   /** Hidden constructor, use @c get to get the singleton instance. */
92   Queue();
93 
94 public:
95   /** Destructor. */
96   virtual ~Queue();
97 
98   /** Get a reference to the global instance of the queue. If @c parallel is @c true, the
99    * queue will be constructed in parallel mode, means the queue loop will be executed in a
100    * separate thread. Please note that this option is only used in the first call, when the
101    * singleton instance of the queue is created. */
102   static Queue &get();
103 
104   /** Adds a buffer and its receiver to the queue. If @c allow_overwrite is @c true, the
105    * the receiver is allowed to overwrite the content of the buffer. */
106   void send(const RawBuffer &buffer, SinkBase *sink, bool allow_overwrite=false);
107 
108   /** Enters the queue loop, if @c parallel=true was passed to @c get, @c exec will execute the
109    * queue loop in a separate thread and returns immediately. Otherwise, @c exec will block until
110    * the queue is stopped. */
111   void start();
112 
113   /** Signals the queue to stop processing. */
114   void stop();
115   /** Wait for the queue to exit the queue loop. */
116   void wait();
117 
118   /** Returns true if the queue loop is stopped. */
119   bool isStopped() const;
120   /** Returns true if the queue loop is running. */
121   bool isRunning() const;
122 
123   /** Adds a callback to the idle event. The method gets called repeatedly while the queue looop
124    * is idle, means that there are no messages to be processed. This can be used to trigger an
125    * input source to read more data. */
126   template <class T>
addIdle(T * instance,void (T::* function)(void))127   void addIdle(T *instance, void (T::*function)(void)) {
128     _idle.push_back(new Delegate<T>(instance, function));
129   }
130 
131   /** Removes all callbacks of the given instance from the idle signal. */
132   template <class T>
remIdle(T * instance)133   void remIdle(T *instance) {
134     std::list<DelegateInterface *>::iterator item = _idle.begin();
135     while (item != _idle.end()) {
136       if ( (*item)->instance() == ((void *)instance)) {
137         item = _idle.erase(item);
138       } else {
139         item++;
140       }
141     }
142   }
143 
144   /** Adds a callback to the start event. The method gets called once the queue loop is started. */
145   template <class T>
addStart(T * instance,void (T::* function)(void))146   void addStart(T *instance, void (T::*function)(void)) {
147     _onStart.push_back(new Delegate<T>(instance, function));
148   }
149 
150   /** Removes all callbacks of the given instance from the start signal. */
151   template <class T>
remStart(T * instance)152   void remStart(T *instance) {
153     std::list<DelegateInterface *>::iterator item = _onStart.begin();
154     while (item != _onStart.end()) {
155       if ( (*item)->instance() == ((void *)instance)) {
156         item = _onStart.erase(item);
157       } else {
158         item++;
159       }
160     }
161   }
162 
163   /** Adds a callback to the stop event. The method gets called once the queue loop is stopped. */
164   template <class T>
addStop(T * instance,void (T::* function)(void))165   void addStop(T *instance, void (T::*function)(void)) {
166     _onStop.push_back(new Delegate<T>(instance, function));
167   }
168 
169   /** Removes all callbacks of the given instance from the stop signal. */
170   template <class T>
remStop(T * instance)171   void remStop(T *instance) {
172     std::list<DelegateInterface *>::iterator item = _onStop.begin();
173     while (item != _onStop.end()) {
174       if ( (*item)->instance() == ((void *)instance)) {
175         item = _onStop.erase(item);
176       } else {
177         item++;
178       }
179     }
180   }
181 
182 protected:
183   /** The actual queue loop. */
184   void _main();
185   /** Emits the idle signal. */
186   void _signalIdle();
187   /** Emits the start signal. */
188   void _signalStart();
189   /** Emits the stop signal. */
190   void _signalStop();
191 
192 protected:
193   /** While this is true, the queue loop is executed. */
194   bool _running;
195   /** If @c _parallel is true, the thread of the queue loop. */
196   pthread_t _thread;
197   /** The queue mutex. */
198   pthread_mutex_t _queue_lock;
199   /** The queue condition. */
200   pthread_cond_t  _queue_cond;
201 
202   /** The message queue. */
203   std::list<Message> _queue;
204   /** Idle event callbacks. */
205   std::list<DelegateInterface *> _idle;
206   /** Start event callbacks. */
207   std::list<DelegateInterface *> _onStart;
208   /** Stop event callbacks. */
209   std::list<DelegateInterface *> _onStop;
210 
211 private:
212   /** The singleton instance. */
213   static Queue *_instance;
214   /** The pthread function. */
215   static void *__thread_start(void *ptr);
216 };
217 
218 
219 }
220 #endif // __SDR_QUEUE_HH__
221