1 #ifndef __SDR_QUEUE_HH__ 2 #define __SDR_QUEUE_HH__ 3 4 #include <list> 5 #include <map> 6 #include "buffer.hh" 7 #include <pthread.h> 8 #include <iostream> 9 10 11 namespace sdr { 12 13 // Forward decl. 14 class SinkBase; 15 16 /** Interface of a delegate. */ 17 class DelegateInterface { 18 public: 19 /** Call back interface. */ 20 virtual void operator() () = 0; 21 /** Returns the instance of the delegate. */ 22 virtual void *instance() = 0; 23 }; 24 25 /** Specific delegate to a method of an object . */ 26 template <class T> 27 class Delegate: public DelegateInterface 28 { 29 public: 30 /** Constructs a delegate to the method @c func of the instance @c instance.*/ Delegate(T * instance,void (T::* func)(void))31 Delegate(T *instance, void (T::*func)(void)) : _instance(instance), _function(func) { } 32 /** Destructor. */ ~Delegate()33 virtual ~Delegate() {} 34 /** Callback, simply calls the method of the instance given to the constructor. */ operator ()()35 virtual void operator() () { (_instance->*_function)(); } 36 /** Returns the instance of the delegate. */ instance()37 virtual void *instance() { return _instance; } 38 39 protected: 40 /** The instance. */ 41 T *_instance; 42 /** The method. */ 43 void (T::*_function)(void); 44 }; 45 46 47 /** Central message queue (singleton). Must be created before any other SDR object is constructed. 48 * The queue collects all buffers for processing and routes them to their destination. The queue 49 * loop can either be run in a separate thread by passing @c parallel=true to the factory method 50 * @c get. In this case, the @c exec method will return immediately. Otherwise, the queue loop 51 * will be executed in the thread calling @c exec which blocks until the queue is stopped by 52 * a call to @c stop. */ 53 class Queue 54 { 55 public: 56 /** The internal used message type. */ 57 class Message { 58 public: 59 /** Constructor. */ Message(const RawBuffer & buffer,SinkBase * sink,bool allow_overwrite)60 Message(const RawBuffer &buffer, SinkBase *sink, bool allow_overwrite) 61 : _buffer(buffer), _sink(sink), _allow_overwrite(allow_overwrite) { } 62 /** Copy constructor. */ Message(const Message & other)63 Message(const Message &other) 64 : _buffer(other._buffer), _sink(other._sink), _allow_overwrite(other._allow_overwrite) { } 65 /** Assignment operator. */ operator =(const Message & other)66 const Message &operator= (const Message &other) { 67 _buffer = other._buffer; 68 _sink = other._sink; 69 _allow_overwrite = other._allow_overwrite; 70 return *this; 71 } 72 /** Returns the buffer of the message. **/ buffer() const73 inline const RawBuffer &buffer() const { return _buffer; } 74 /** Returns the buffer of the message. **/ buffer()75 inline RawBuffer &buffer() { return _buffer; } 76 /** Returns the destination of the message. **/ sink() const77 inline SinkBase *sink() const { return _sink; } 78 /** If true, the sender allows to overwrite the content of the buffer. **/ allowOverwrite() const79 inline bool allowOverwrite() const { return _allow_overwrite; } 80 81 protected: 82 /** The buffer being send. */ 83 RawBuffer _buffer; 84 /** The destination. */ 85 SinkBase *_sink; 86 /** If true, the sender allows to overwrite the buffer. */ 87 bool _allow_overwrite; 88 }; 89 90 protected: 91 /** Hidden constructor, use @c get to get the singleton instance. */ 92 Queue(); 93 94 public: 95 /** Destructor. */ 96 virtual ~Queue(); 97 98 /** Get a reference to the global instance of the queue. If @c parallel is @c true, the 99 * queue will be constructed in parallel mode, means the queue loop will be executed in a 100 * separate thread. Please note that this option is only used in the first call, when the 101 * singleton instance of the queue is created. */ 102 static Queue &get(); 103 104 /** Adds a buffer and its receiver to the queue. If @c allow_overwrite is @c true, the 105 * the receiver is allowed to overwrite the content of the buffer. */ 106 void send(const RawBuffer &buffer, SinkBase *sink, bool allow_overwrite=false); 107 108 /** Enters the queue loop, if @c parallel=true was passed to @c get, @c exec will execute the 109 * queue loop in a separate thread and returns immediately. Otherwise, @c exec will block until 110 * the queue is stopped. */ 111 void start(); 112 113 /** Signals the queue to stop processing. */ 114 void stop(); 115 /** Wait for the queue to exit the queue loop. */ 116 void wait(); 117 118 /** Returns true if the queue loop is stopped. */ 119 bool isStopped() const; 120 /** Returns true if the queue loop is running. */ 121 bool isRunning() const; 122 123 /** Adds a callback to the idle event. The method gets called repeatedly while the queue looop 124 * is idle, means that there are no messages to be processed. This can be used to trigger an 125 * input source to read more data. */ 126 template <class T> addIdle(T * instance,void (T::* function)(void))127 void addIdle(T *instance, void (T::*function)(void)) { 128 _idle.push_back(new Delegate<T>(instance, function)); 129 } 130 131 /** Removes all callbacks of the given instance from the idle signal. */ 132 template <class T> remIdle(T * instance)133 void remIdle(T *instance) { 134 std::list<DelegateInterface *>::iterator item = _idle.begin(); 135 while (item != _idle.end()) { 136 if ( (*item)->instance() == ((void *)instance)) { 137 item = _idle.erase(item); 138 } else { 139 item++; 140 } 141 } 142 } 143 144 /** Adds a callback to the start event. The method gets called once the queue loop is started. */ 145 template <class T> addStart(T * instance,void (T::* function)(void))146 void addStart(T *instance, void (T::*function)(void)) { 147 _onStart.push_back(new Delegate<T>(instance, function)); 148 } 149 150 /** Removes all callbacks of the given instance from the start signal. */ 151 template <class T> remStart(T * instance)152 void remStart(T *instance) { 153 std::list<DelegateInterface *>::iterator item = _onStart.begin(); 154 while (item != _onStart.end()) { 155 if ( (*item)->instance() == ((void *)instance)) { 156 item = _onStart.erase(item); 157 } else { 158 item++; 159 } 160 } 161 } 162 163 /** Adds a callback to the stop event. The method gets called once the queue loop is stopped. */ 164 template <class T> addStop(T * instance,void (T::* function)(void))165 void addStop(T *instance, void (T::*function)(void)) { 166 _onStop.push_back(new Delegate<T>(instance, function)); 167 } 168 169 /** Removes all callbacks of the given instance from the stop signal. */ 170 template <class T> remStop(T * instance)171 void remStop(T *instance) { 172 std::list<DelegateInterface *>::iterator item = _onStop.begin(); 173 while (item != _onStop.end()) { 174 if ( (*item)->instance() == ((void *)instance)) { 175 item = _onStop.erase(item); 176 } else { 177 item++; 178 } 179 } 180 } 181 182 protected: 183 /** The actual queue loop. */ 184 void _main(); 185 /** Emits the idle signal. */ 186 void _signalIdle(); 187 /** Emits the start signal. */ 188 void _signalStart(); 189 /** Emits the stop signal. */ 190 void _signalStop(); 191 192 protected: 193 /** While this is true, the queue loop is executed. */ 194 bool _running; 195 /** If @c _parallel is true, the thread of the queue loop. */ 196 pthread_t _thread; 197 /** The queue mutex. */ 198 pthread_mutex_t _queue_lock; 199 /** The queue condition. */ 200 pthread_cond_t _queue_cond; 201 202 /** The message queue. */ 203 std::list<Message> _queue; 204 /** Idle event callbacks. */ 205 std::list<DelegateInterface *> _idle; 206 /** Start event callbacks. */ 207 std::list<DelegateInterface *> _onStart; 208 /** Stop event callbacks. */ 209 std::list<DelegateInterface *> _onStop; 210 211 private: 212 /** The singleton instance. */ 213 static Queue *_instance; 214 /** The pthread function. */ 215 static void *__thread_start(void *ptr); 216 }; 217 218 219 } 220 #endif // __SDR_QUEUE_HH__ 221