1 #ifndef __BLOCKING_QUEUE__ 2 #define __BLOCKING_QUEUE__ 3 4 #include "boost/thread/thread.hpp" 5 #include "boost/thread/condition.hpp" 6 #include <deque> 7 8 namespace Base 9 { 10 11 template <typename T> class CBlockingQueue 12 { 13 public: 14 typedef boost::shared_lock<boost::shared_mutex> reader_lock; 15 typedef boost::unique_lock<boost::shared_mutex> writer_lock; 16 typedef boost::upgrade_lock<boost::shared_mutex> upg_reader_lock; 17 typedef boost::upgrade_to_unique_lock< boost::shared_mutex > upgrade_to_writer; 18 19 20 //typedef boost::mutex::scoped_lock scoped_lock; 21 CBlockingQueue()22 CBlockingQueue() { m_maxQueueElements = 0xFFFFFFFF; } 23 24 bool push( const T& el, bool pushBack = true, bool checkMax = true ) 25 { 26 writer_lock lock( m_mutex ); 27 28 if ( checkMax && m_queue.size() == m_maxQueueElements ) 29 { 30 while ( m_queue.size() == m_maxQueueElements ) 31 m_fullCond.wait(lock); 32 } 33 34 if ( pushBack ) 35 m_queue.push_back( el ); 36 else 37 m_queue.push_front( el ); 38 39 m_emptyCond.notify_all(); 40 return true; 41 } 42 43 bool peek( T& el, bool wait = false, bool popFront = true ) 44 { 45 upg_reader_lock lock(m_mutex); 46 47 if ( wait ) 48 { 49 while (m_queue.empty()) 50 { 51 upgrade_to_writer wlock(lock); 52 m_emptyCond.wait(m_mutex); 53 } 54 } 55 else 56 { 57 if ( m_queue.empty() ) 58 return false; 59 } 60 61 if ( popFront ) 62 { 63 el = m_queue.front(); 64 //m_queue.pop_front(); 65 } 66 else 67 { 68 el = m_queue.back(); 69 //m_queue.pop_back(); 70 } 71 72 if ( m_queue.size() < m_maxQueueElements ) 73 m_fullCond.notify_all(); 74 75 if (m_queue.empty()) 76 m_nonEmptyCond.notify_all(); 77 78 return true; 79 } 80 peekn(T & el,size_t n)81 void peekn( T& el, size_t n ) 82 { 83 reader_lock lock(m_mutex); 84 85 el = m_queue[n]; 86 } 87 88 bool pop( T& el, bool wait = false, bool popFront = true ) 89 { 90 upg_reader_lock lock(m_mutex); 91 92 if ( wait ) 93 { 94 while (m_queue.empty()) 95 { 96 upgrade_to_writer wlock(lock); 97 m_emptyCond.wait(m_mutex); 98 } 99 } 100 else 101 { 102 if (m_queue.empty()) 103 { 104 m_nonEmptyCond.notify_all(); 105 return false; 106 } 107 } 108 109 upgrade_to_writer wlock(lock); 110 111 if ( popFront ) 112 { 113 el = m_queue.front(); 114 m_queue.pop_front(); 115 } 116 else 117 { 118 el = m_queue.back(); 119 m_queue.pop_back(); 120 } 121 122 if ( m_queue.size() < m_maxQueueElements ) 123 m_fullCond.notify_all(); 124 125 if (m_queue.empty()) 126 m_nonEmptyCond.notify_all(); 127 128 return true; 129 } 130 empty()131 bool empty() 132 { 133 reader_lock lock( m_mutex ); 134 return m_queue.empty(); 135 } 136 size()137 size_t size() 138 { 139 reader_lock lock( m_mutex ); 140 return m_queue.size(); 141 } 142 clear(size_t leave)143 void clear( size_t leave ) 144 { 145 writer_lock lock( m_mutex ); 146 147 if ( leave == 0 ) 148 { 149 m_queue.clear(); 150 m_nonEmptyCond.notify_all(); 151 } 152 else 153 { 154 size_t sz = m_queue.size(); 155 156 while ( sz > leave ) 157 { 158 m_queue.pop_back(); 159 160 sz--; 161 } 162 } 163 164 if ( leave < m_maxQueueElements ) 165 m_fullCond.notify_all(); 166 } 167 setMaxQueueElements(size_t max)168 void setMaxQueueElements( size_t max ) 169 { 170 writer_lock lock( m_mutex ); 171 172 m_maxQueueElements = max; 173 } 174 waitForEmpty()175 bool waitForEmpty() 176 { 177 upg_reader_lock lock( m_mutex ); 178 179 if (m_queue.empty()) 180 return true; 181 182 upgrade_to_writer wlock(lock); 183 184 m_nonEmptyCond.wait( m_mutex ); 185 186 return true; 187 } 188 189 private: 190 boost::shared_mutex m_mutex; 191 boost::condition m_fullCond; 192 boost::condition m_emptyCond; 193 boost::condition m_nonEmptyCond; 194 size_t m_maxQueueElements; 195 std::deque<T> m_queue; 196 }; 197 198 }; 199 200 #endif //__BLOCKING_QUEUE__ 201