1 /*
2  * SObjectizer 5
3  */
4 
5 /*!
6  * \since
7  * v.5.5.11
8  *
9  * \file
10  * \brief Various traits for MPMC queues.
11  */
12 
13 #include <so_5/disp/mpmc_queue_traits/pub.hpp>
14 
15 #include <so_5/spinlocks.hpp>
16 
17 #include <mutex>
18 #include <condition_variable>
19 
20 namespace so_5 {
21 
22 namespace disp {
23 
24 namespace mpmc_queue_traits {
25 
26 namespace combined_lock
27 {
28 
29 using spinlock_t = so_5::default_spinlock_t;
30 
31 //
32 // actual_cond_t
33 //
34 /*!
35  * \since
36  * v.5.5.11
37  *
38  * \brief Impementation of condition object for the case of combined lock.
39  */
40 class actual_cond_t : public condition_t
41 	{
42 		//! Spinlock from parent lock object.
43 		spinlock_t & m_spinlock;
44 		//! Max waiting time for busy waiting stage.
45 		const std::chrono::high_resolution_clock::duration m_waiting_time;
46 
47 		//! An indicator of notification for condition object.
48 		bool m_signaled = { false };
49 
50 		//! Personal mutex to be used with condition variable.
51 		std::mutex m_mutex;
52 		//! Condition variable for long-time waiting.
53 		std::condition_variable m_condition;
54 
55 	public :
56 		//! Initializing constructor.
actual_cond_t(spinlock_t & spinlock,std::chrono::high_resolution_clock::duration waiting_time)57 		actual_cond_t(
58 			//! Spinlock from parent lock object.
59 			spinlock_t & spinlock,
60 			//! Max waiting time for busy waiting stage.
61 			std::chrono::high_resolution_clock::duration waiting_time )
62 			:	m_spinlock( spinlock )
63 			,	m_waiting_time( std::move(waiting_time) )
64 			{}
65 
66 		virtual void
wait()67 		wait() noexcept override
68 			{
69 				using hrc = std::chrono::high_resolution_clock;
70 
71 				/*
72 				 * NOTE: spinlock of the parent lock object is already
73 				 * acquired by the current thread.
74 				 */
75 				m_signaled = false;
76 
77 				//
78 				// Busy waiting stage.
79 				//
80 
81 				// Limitation for busy waiting stage.
82 				const auto stop_point = hrc::now() + m_waiting_time;
83 
84 				do
85 					{
86 						m_spinlock.unlock();
87 
88 						std::this_thread::yield();
89 
90 						m_spinlock.lock();
91 
92 						if( m_signaled )
93 							return;
94 					}
95 				while( stop_point > hrc::now() );
96 
97 				// If we are here then busy waiting stage failed (condition
98 				// is not signaled yet) and we must go to long-time waiting.
99 				//
100 				// NOTE: spinlock of the parent lock object is acquired by
101 				// the current thread.
102 
103 				//
104 				// Long-time waiting stage.
105 				//
106 
107 				// Personal mutex object must be acquired.
108 				std::unique_lock< std::mutex > mutex_lock{ m_mutex };
109 				// Spinlock of the parent lock can be released now.
110 				m_spinlock.unlock();
111 
112 				// Wait on condition_variable.
113 				m_condition.wait( mutex_lock, [this]{ return m_signaled; } );
114 
115 				// Spinlock must be reacquired to return the parent lock
116 				// in the state at the call to wait().
117 				m_spinlock.lock();
118 			}
119 
120 		virtual void
notify()121 		notify() noexcept override
122 			{
123 				std::lock_guard< std::mutex > mutex_lock{ m_mutex };
124 
125 				m_signaled = true;
126 
127 				m_condition.notify_one();
128 			}
129 	};
130 
131 //
132 // actual_lock_t
133 //
134 /*!
135  * \since
136  * v.5.5.11
137  *
138  * \brief Actual implementation of combined lock object.
139  */
140 class actual_lock_t : public lock_t
141 	{
142 		//! Common spinlock for locking of producers and consumers.
143 		spinlock_t m_spinlock;
144 		//! Max waiting time for busy waiting stage.
145 		const std::chrono::high_resolution_clock::duration m_waiting_time;
146 
147 	public :
148 		//! Initializing constructor.
actual_lock_t(std::chrono::high_resolution_clock::duration waiting_time)149 		actual_lock_t(
150 			//! Max waiting time for busy waiting stage.
151 			std::chrono::high_resolution_clock::duration waiting_time )
152 			:	m_waiting_time{ std::move(waiting_time) }
153 			{}
154 
155 		virtual void
lock()156 		lock() noexcept override
157 			{
158 				m_spinlock.lock();
159 			}
160 
161 		virtual void
unlock()162 		unlock() noexcept override
163 			{
164 				m_spinlock.unlock();
165 			}
166 
167 		virtual condition_unique_ptr_t
allocate_condition()168 		allocate_condition() override
169 			{
170 				return condition_unique_ptr_t{
171 					new actual_cond_t{ m_spinlock, m_waiting_time } };
172 			}
173 	};
174 
175 } /* namespace combined_lock */
176 
177 namespace simple_lock
178 {
179 
180 //
181 // actual_cond_t
182 //
183 /*!
184  * \since
185  * v.5.5.11
186  *
187  * \brief Actual implementation of condition object for the case
188  * of simple locking on mutex and condition_variable.
189  */
190 class actual_cond_t : public condition_t
191 	{
192 		//! An indicator of notification for condition object.
193 		bool m_signaled = { false };
194 
195 		//! Common mutex from the parent lock.
196 		std::mutex & m_mutex;
197 		//! Personal condition_variable object for condition object owner.
198 		std::condition_variable m_condition;
199 
200 	public :
201 		//! Initializing constructor.
actual_cond_t(std::mutex & mutex)202 		actual_cond_t(
203 			//! Common mutex from the parent lock.
204 			std::mutex & mutex )
205 			:	m_mutex( mutex )
206 			{}
207 
208 		virtual void
wait()209 		wait() noexcept override
210 			{
211 				m_signaled = false;
212 
213 				// Common mutex is already acquired. So we can't reacquire it.
214 				std::unique_lock< std::mutex > mutex_lock{ m_mutex, std::adopt_lock };
215 				m_condition.wait( mutex_lock, [this]{ return m_signaled; } );
216 				// Common mutex must remain acquired. So we disable unique_lock
217 				// to release mutex in the destructor.
218 				mutex_lock.release();
219 			}
220 
221 		virtual void
notify()222 		notify() noexcept override
223 			{
224 				m_signaled = true;
225 
226 				m_condition.notify_one();
227 			}
228 	};
229 
230 //
231 // actual_lock_t
232 //
233 /*!
234  * \since
235  * v.5.5.11
236  *
237  * \brief Actual implementation of lock object for simple locking
238  * on mutex and condition variables.
239  */
240 class actual_lock_t : public lock_t
241 	{
242 		//! Common mutex for all producers and consumers.
243 		std::mutex m_mutex;
244 
245 	public :
actual_lock_t()246 		actual_lock_t()
247 			{}
248 
249 		virtual void
lock()250 		lock() noexcept override
251 			{
252 				m_mutex.lock();
253 			}
254 
255 		virtual void
unlock()256 		unlock() noexcept override
257 			{
258 				m_mutex.unlock();
259 			}
260 
261 		virtual condition_unique_ptr_t
allocate_condition()262 		allocate_condition() override
263 			{
264 				return condition_unique_ptr_t{ new actual_cond_t{ m_mutex } };
265 			}
266 	};
267 
268 } /* namespace simple_lock */
269 
270 //
271 // combined_lock_factory
272 //
273 SO_5_FUNC lock_factory_t
combined_lock_factory(std::chrono::high_resolution_clock::duration waiting_time)274 combined_lock_factory(
275 	std::chrono::high_resolution_clock::duration waiting_time )
276 	{
277 		return [waiting_time] {
278 				return lock_unique_ptr_t{ new combined_lock::actual_lock_t{
279 					std::move(waiting_time) } };
280 			};
281 	}
282 
283 //
284 // simple_lock_factory
285 //
286 SO_5_FUNC lock_factory_t
simple_lock_factory()287 simple_lock_factory()
288 	{
289 		return [] {
290 				return lock_unique_ptr_t{ new simple_lock::actual_lock_t{} };
291 			};
292 	}
293 
294 } /* namespace mpmc_queue_traits */
295 
296 } /* namespace disp */
297 
298 } /* namespace so_5 */
299 
300 
301