1 /* 2 * SObjectizer 5 3 */ 4 5 /*! 6 * \since 7 * v.5.5.11 8 * 9 * \file 10 * \brief Various traits for MPMC queues. 11 */ 12 13 #include <so_5/disp/mpmc_queue_traits/pub.hpp> 14 15 #include <so_5/spinlocks.hpp> 16 17 #include <mutex> 18 #include <condition_variable> 19 20 namespace so_5 { 21 22 namespace disp { 23 24 namespace mpmc_queue_traits { 25 26 namespace combined_lock 27 { 28 29 using spinlock_t = so_5::default_spinlock_t; 30 31 // 32 // actual_cond_t 33 // 34 /*! 35 * \since 36 * v.5.5.11 37 * 38 * \brief Impementation of condition object for the case of combined lock. 39 */ 40 class actual_cond_t : public condition_t 41 { 42 //! Spinlock from parent lock object. 43 spinlock_t & m_spinlock; 44 //! Max waiting time for busy waiting stage. 45 const std::chrono::high_resolution_clock::duration m_waiting_time; 46 47 //! An indicator of notification for condition object. 48 bool m_signaled = { false }; 49 50 //! Personal mutex to be used with condition variable. 51 std::mutex m_mutex; 52 //! Condition variable for long-time waiting. 53 std::condition_variable m_condition; 54 55 public : 56 //! Initializing constructor. actual_cond_t(spinlock_t & spinlock,std::chrono::high_resolution_clock::duration waiting_time)57 actual_cond_t( 58 //! Spinlock from parent lock object. 59 spinlock_t & spinlock, 60 //! Max waiting time for busy waiting stage. 61 std::chrono::high_resolution_clock::duration waiting_time ) 62 : m_spinlock( spinlock ) 63 , m_waiting_time( std::move(waiting_time) ) 64 {} 65 66 virtual void wait()67 wait() noexcept override 68 { 69 using hrc = std::chrono::high_resolution_clock; 70 71 /* 72 * NOTE: spinlock of the parent lock object is already 73 * acquired by the current thread. 74 */ 75 m_signaled = false; 76 77 // 78 // Busy waiting stage. 79 // 80 81 // Limitation for busy waiting stage. 82 const auto stop_point = hrc::now() + m_waiting_time; 83 84 do 85 { 86 m_spinlock.unlock(); 87 88 std::this_thread::yield(); 89 90 m_spinlock.lock(); 91 92 if( m_signaled ) 93 return; 94 } 95 while( stop_point > hrc::now() ); 96 97 // If we are here then busy waiting stage failed (condition 98 // is not signaled yet) and we must go to long-time waiting. 99 // 100 // NOTE: spinlock of the parent lock object is acquired by 101 // the current thread. 102 103 // 104 // Long-time waiting stage. 105 // 106 107 // Personal mutex object must be acquired. 108 std::unique_lock< std::mutex > mutex_lock{ m_mutex }; 109 // Spinlock of the parent lock can be released now. 110 m_spinlock.unlock(); 111 112 // Wait on condition_variable. 113 m_condition.wait( mutex_lock, [this]{ return m_signaled; } ); 114 115 // Spinlock must be reacquired to return the parent lock 116 // in the state at the call to wait(). 117 m_spinlock.lock(); 118 } 119 120 virtual void notify()121 notify() noexcept override 122 { 123 std::lock_guard< std::mutex > mutex_lock{ m_mutex }; 124 125 m_signaled = true; 126 127 m_condition.notify_one(); 128 } 129 }; 130 131 // 132 // actual_lock_t 133 // 134 /*! 135 * \since 136 * v.5.5.11 137 * 138 * \brief Actual implementation of combined lock object. 139 */ 140 class actual_lock_t : public lock_t 141 { 142 //! Common spinlock for locking of producers and consumers. 143 spinlock_t m_spinlock; 144 //! Max waiting time for busy waiting stage. 145 const std::chrono::high_resolution_clock::duration m_waiting_time; 146 147 public : 148 //! Initializing constructor. actual_lock_t(std::chrono::high_resolution_clock::duration waiting_time)149 actual_lock_t( 150 //! Max waiting time for busy waiting stage. 151 std::chrono::high_resolution_clock::duration waiting_time ) 152 : m_waiting_time{ std::move(waiting_time) } 153 {} 154 155 virtual void lock()156 lock() noexcept override 157 { 158 m_spinlock.lock(); 159 } 160 161 virtual void unlock()162 unlock() noexcept override 163 { 164 m_spinlock.unlock(); 165 } 166 167 virtual condition_unique_ptr_t allocate_condition()168 allocate_condition() override 169 { 170 return condition_unique_ptr_t{ 171 new actual_cond_t{ m_spinlock, m_waiting_time } }; 172 } 173 }; 174 175 } /* namespace combined_lock */ 176 177 namespace simple_lock 178 { 179 180 // 181 // actual_cond_t 182 // 183 /*! 184 * \since 185 * v.5.5.11 186 * 187 * \brief Actual implementation of condition object for the case 188 * of simple locking on mutex and condition_variable. 189 */ 190 class actual_cond_t : public condition_t 191 { 192 //! An indicator of notification for condition object. 193 bool m_signaled = { false }; 194 195 //! Common mutex from the parent lock. 196 std::mutex & m_mutex; 197 //! Personal condition_variable object for condition object owner. 198 std::condition_variable m_condition; 199 200 public : 201 //! Initializing constructor. actual_cond_t(std::mutex & mutex)202 actual_cond_t( 203 //! Common mutex from the parent lock. 204 std::mutex & mutex ) 205 : m_mutex( mutex ) 206 {} 207 208 virtual void wait()209 wait() noexcept override 210 { 211 m_signaled = false; 212 213 // Common mutex is already acquired. So we can't reacquire it. 214 std::unique_lock< std::mutex > mutex_lock{ m_mutex, std::adopt_lock }; 215 m_condition.wait( mutex_lock, [this]{ return m_signaled; } ); 216 // Common mutex must remain acquired. So we disable unique_lock 217 // to release mutex in the destructor. 218 mutex_lock.release(); 219 } 220 221 virtual void notify()222 notify() noexcept override 223 { 224 m_signaled = true; 225 226 m_condition.notify_one(); 227 } 228 }; 229 230 // 231 // actual_lock_t 232 // 233 /*! 234 * \since 235 * v.5.5.11 236 * 237 * \brief Actual implementation of lock object for simple locking 238 * on mutex and condition variables. 239 */ 240 class actual_lock_t : public lock_t 241 { 242 //! Common mutex for all producers and consumers. 243 std::mutex m_mutex; 244 245 public : actual_lock_t()246 actual_lock_t() 247 {} 248 249 virtual void lock()250 lock() noexcept override 251 { 252 m_mutex.lock(); 253 } 254 255 virtual void unlock()256 unlock() noexcept override 257 { 258 m_mutex.unlock(); 259 } 260 261 virtual condition_unique_ptr_t allocate_condition()262 allocate_condition() override 263 { 264 return condition_unique_ptr_t{ new actual_cond_t{ m_mutex } }; 265 } 266 }; 267 268 } /* namespace simple_lock */ 269 270 // 271 // combined_lock_factory 272 // 273 SO_5_FUNC lock_factory_t combined_lock_factory(std::chrono::high_resolution_clock::duration waiting_time)274combined_lock_factory( 275 std::chrono::high_resolution_clock::duration waiting_time ) 276 { 277 return [waiting_time] { 278 return lock_unique_ptr_t{ new combined_lock::actual_lock_t{ 279 std::move(waiting_time) } }; 280 }; 281 } 282 283 // 284 // simple_lock_factory 285 // 286 SO_5_FUNC lock_factory_t simple_lock_factory()287simple_lock_factory() 288 { 289 return [] { 290 return lock_unique_ptr_t{ new simple_lock::actual_lock_t{} }; 291 }; 292 } 293 294 } /* namespace mpmc_queue_traits */ 295 296 } /* namespace disp */ 297 298 } /* namespace so_5 */ 299 300 301