1 #ifndef __BLOCKING_QUEUE__
2 #define __BLOCKING_QUEUE__
3 
4 #include "boost/thread/thread.hpp"
5 #include "boost/thread/condition.hpp"
6 #include <deque>
7 
8 namespace Base
9 {
10 
11 template <typename T> class CBlockingQueue
12 {
13 public:
14     typedef boost::shared_lock<boost::shared_mutex> reader_lock;
15     typedef boost::unique_lock<boost::shared_mutex> writer_lock;
16 	typedef boost::upgrade_lock<boost::shared_mutex> upg_reader_lock;
17 	typedef boost::upgrade_to_unique_lock< boost::shared_mutex > upgrade_to_writer;
18 
19 
20 	//typedef boost::mutex::scoped_lock   scoped_lock;
21 
CBlockingQueue()22 	CBlockingQueue() { m_maxQueueElements = 0xFFFFFFFF; }
23 
24 	bool push( const T& el, bool pushBack = true,  bool checkMax = true )
25 	{
26 		writer_lock lock( m_mutex );
27 
28 		if ( checkMax && m_queue.size() == m_maxQueueElements )
29 		{
30 			while ( m_queue.size() == m_maxQueueElements )
31 				m_fullCond.wait(lock);
32 		}
33 
34 		if ( pushBack )
35 			m_queue.push_back( el );
36 		else
37 			m_queue.push_front( el );
38 
39 		m_emptyCond.notify_all();
40 		return true;
41 	}
42 
43 	bool peek( T& el, bool wait = false, bool popFront = true )
44 	{
45 		upg_reader_lock lock(m_mutex);
46 
47 		if ( wait )
48 		{
49 			while (m_queue.empty())
50 			{
51 				upgrade_to_writer wlock(lock);
52 				m_emptyCond.wait(m_mutex);
53 			}
54 		}
55 		else
56 		{
57 			if ( m_queue.empty() )
58 				return false;
59 		}
60 
61 		if ( popFront )
62 		{
63 			el = m_queue.front();
64 			//m_queue.pop_front();
65 		}
66 		else
67 		{
68 			el = m_queue.back();
69 			//m_queue.pop_back();
70 		}
71 
72 		if ( m_queue.size() < m_maxQueueElements )
73 			m_fullCond.notify_all();
74 
75 		if (m_queue.empty())
76 			m_nonEmptyCond.notify_all();
77 
78 		return true;
79 	}
80 
peekn(T & el,size_t n)81     void peekn( T& el, size_t n )
82     {
83         reader_lock lock(m_mutex);
84 
85         el = m_queue[n];
86     }
87 
88 	bool pop( T& el, bool wait = false, bool popFront = true )
89 	{
90 		upg_reader_lock lock(m_mutex);
91 
92 		if ( wait )
93 		{
94 			while (m_queue.empty())
95 			{
96 				upgrade_to_writer wlock(lock);
97 				m_emptyCond.wait(m_mutex);
98 			}
99 		}
100 		else
101 		{
102 			if (m_queue.empty())
103 			{
104 				m_nonEmptyCond.notify_all();
105 				return false;
106 			}
107 		}
108 
109 		upgrade_to_writer wlock(lock);
110 
111 		if ( popFront )
112 		{
113 			el = m_queue.front();
114 			m_queue.pop_front();
115 		}
116 		else
117 		{
118 			el = m_queue.back();
119 			m_queue.pop_back();
120 		}
121 
122 		if ( m_queue.size() < m_maxQueueElements )
123 			m_fullCond.notify_all();
124 
125 		if (m_queue.empty())
126 			m_nonEmptyCond.notify_all();
127 
128 		return true;
129 	}
130 
empty()131 	bool empty()
132 	{
133 		reader_lock lock( m_mutex );
134 		return m_queue.empty();
135 	}
136 
size()137 	size_t size()
138 	{
139 		reader_lock lock( m_mutex );
140 		return m_queue.size();
141 	}
142 
clear(size_t leave)143 	void clear( size_t leave )
144 	{
145 		writer_lock lock( m_mutex );
146 
147 		if ( leave == 0 )
148 		{
149 			m_queue.clear();
150 			m_nonEmptyCond.notify_all();
151 		}
152 		else
153 		{
154 			size_t sz = m_queue.size();
155 
156 			while (	sz > leave )
157 			{
158 				m_queue.pop_back();
159 
160 				sz--;
161 			}
162 		}
163 
164 		if ( leave < m_maxQueueElements )
165 			m_fullCond.notify_all();
166 	}
167 
setMaxQueueElements(size_t max)168 	void setMaxQueueElements( size_t max )
169 	{
170 		writer_lock lock( m_mutex );
171 
172 		m_maxQueueElements = max;
173 	}
174 
waitForEmpty()175 	bool waitForEmpty()
176 	{
177 		upg_reader_lock lock( m_mutex );
178 
179 		if (m_queue.empty())
180 			return true;
181 
182 		upgrade_to_writer wlock(lock);
183 
184 		m_nonEmptyCond.wait( m_mutex );
185 
186 		return true;
187 	}
188 
189 private:
190 	boost::shared_mutex m_mutex;
191 	boost::condition m_fullCond;
192 	boost::condition m_emptyCond;
193 	boost::condition m_nonEmptyCond;
194 	size_t m_maxQueueElements;
195 	std::deque<T> m_queue;
196 };
197 
198 };
199 
200 #endif //__BLOCKING_QUEUE__
201