1 /* 2 * Copyright Andrey Semashev 2007 - 2015. 3 * Distributed under the Boost Software License, Version 1.0. 4 * (See accompanying file LICENSE_1_0.txt or copy at 5 * http://www.boost.org/LICENSE_1_0.txt) 6 */ 7 /*! 8 * \file async_frontend.hpp 9 * \author Andrey Semashev 10 * \date 14.07.2009 11 * 12 * The header contains implementation of asynchronous sink frontend. 13 */ 14 15 #ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_ 16 #define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_ 17 18 #include <boost/log/detail/config.hpp> 19 20 #ifdef BOOST_HAS_PRAGMA_ONCE 21 #pragma once 22 #endif 23 24 #if defined(BOOST_LOG_NO_THREADS) 25 #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment 26 #endif 27 28 #include <boost/bind.hpp> 29 #include <boost/static_assert.hpp> 30 #include <boost/smart_ptr/shared_ptr.hpp> 31 #include <boost/smart_ptr/make_shared_object.hpp> 32 #include <boost/thread/locks.hpp> 33 #include <boost/thread/recursive_mutex.hpp> 34 #include <boost/thread/thread.hpp> 35 #include <boost/thread/condition_variable.hpp> 36 #include <boost/log/exceptions.hpp> 37 #include <boost/log/detail/locking_ptr.hpp> 38 #include <boost/log/detail/parameter_tools.hpp> 39 #include <boost/log/core/record_view.hpp> 40 #include <boost/log/sinks/basic_sink_frontend.hpp> 41 #include <boost/log/sinks/frontend_requirements.hpp> 42 #include <boost/log/sinks/unbounded_fifo_queue.hpp> 43 #include <boost/log/keywords/start_thread.hpp> 44 #include <boost/log/detail/header.hpp> 45 46 namespace boost { 47 48 BOOST_LOG_OPEN_NAMESPACE 49 50 namespace sinks { 51 52 #ifndef BOOST_LOG_DOXYGEN_PASS 53 54 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, types)\ 55 template< BOOST_PP_ENUM_PARAMS(n, typename T) >\ 56 explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\ 57 base_type(true),\ 58 queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\ 59 m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS(n, arg))),\ 60 m_StopRequested(false),\ 61 m_FlushRequested(false)\ 62 {\ 63 if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\ 64 start_feeding_thread();\ 65 }\ 66 template< BOOST_PP_ENUM_PARAMS(n, typename T) >\ 67 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\ 68 base_type(true),\ 69 queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\ 70 m_pBackend(backend),\ 71 m_StopRequested(false),\ 72 m_FlushRequested(false)\ 73 {\ 74 if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\ 75 start_feeding_thread();\ 76 } 77 78 #endif // BOOST_LOG_DOXYGEN_PASS 79 80 /*! 81 * \brief Asynchronous logging sink frontend 82 * 83 * The frontend starts a separate thread on construction. All logging records are passed 84 * to the backend in this dedicated thread only. 85 */ 86 template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue > 87 class asynchronous_sink : 88 public aux::make_sink_frontend_base< SinkBackendT >::type, 89 public QueueingStrategyT 90 { 91 typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type; 92 typedef QueueingStrategyT queue_base_type; 93 94 private: 95 //! Backend synchronization mutex type 96 typedef boost::recursive_mutex backend_mutex_type; 97 //! Frontend synchronization mutex type 98 typedef typename base_type::mutex_type frontend_mutex_type; 99 100 //! A scope guard that implements thread ID management 101 class scoped_thread_id 102 { 103 private: 104 frontend_mutex_type& m_Mutex; 105 condition_variable_any& m_Cond; 106 thread::id& m_ThreadID; 107 bool volatile& m_StopRequested; 108 109 public: 110 //! Initializing constructor scoped_thread_id(frontend_mutex_type & mut,condition_variable_any & cond,thread::id & tid,bool volatile & sr)111 scoped_thread_id(frontend_mutex_type& mut, condition_variable_any& cond, thread::id& tid, bool volatile& sr) 112 : m_Mutex(mut), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr) 113 { 114 lock_guard< frontend_mutex_type > lock(m_Mutex); 115 if (m_ThreadID != thread::id()) 116 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread"); 117 m_ThreadID = this_thread::get_id(); 118 } 119 //! Initializing constructor scoped_thread_id(unique_lock<frontend_mutex_type> & l,condition_variable_any & cond,thread::id & tid,bool volatile & sr)120 scoped_thread_id(unique_lock< frontend_mutex_type >& l, condition_variable_any& cond, thread::id& tid, bool volatile& sr) 121 : m_Mutex(*l.mutex()), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr) 122 { 123 unique_lock< frontend_mutex_type > lock(move(l)); 124 if (m_ThreadID != thread::id()) 125 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread"); 126 m_ThreadID = this_thread::get_id(); 127 } 128 //! Destructor ~scoped_thread_id()129 ~scoped_thread_id() 130 { 131 try 132 { 133 lock_guard< frontend_mutex_type > lock(m_Mutex); 134 m_StopRequested = false; 135 m_ThreadID = thread::id(); 136 m_Cond.notify_all(); 137 } 138 catch (...) 139 { 140 } 141 } 142 143 private: 144 scoped_thread_id(scoped_thread_id const&); 145 scoped_thread_id& operator= (scoped_thread_id const&); 146 }; 147 148 //! A scope guard that resets a flag on destructor 149 class scoped_flag 150 { 151 private: 152 frontend_mutex_type& m_Mutex; 153 condition_variable_any& m_Cond; 154 volatile bool& m_Flag; 155 156 public: scoped_flag(frontend_mutex_type & mut,condition_variable_any & cond,volatile bool & f)157 explicit scoped_flag(frontend_mutex_type& mut, condition_variable_any& cond, volatile bool& f) : 158 m_Mutex(mut), m_Cond(cond), m_Flag(f) 159 { 160 } ~scoped_flag()161 ~scoped_flag() 162 { 163 try 164 { 165 lock_guard< frontend_mutex_type > lock(m_Mutex); 166 m_Flag = false; 167 m_Cond.notify_all(); 168 } 169 catch (...) 170 { 171 } 172 } 173 174 private: 175 scoped_flag(scoped_flag const&); 176 scoped_flag& operator= (scoped_flag const&); 177 }; 178 179 public: 180 //! Sink implementation type 181 typedef SinkBackendT sink_backend_type; 182 //! \cond 183 BOOST_STATIC_ASSERT_MSG((has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value), "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met"); 184 //! \endcond 185 186 #ifndef BOOST_LOG_DOXYGEN_PASS 187 188 //! A pointer type that locks the backend until it's destroyed 189 typedef boost::log::aux::locking_ptr< sink_backend_type, backend_mutex_type > locked_backend_ptr; 190 191 #else // BOOST_LOG_DOXYGEN_PASS 192 193 //! A pointer type that locks the backend until it's destroyed 194 typedef implementation_defined locked_backend_ptr; 195 196 #endif // BOOST_LOG_DOXYGEN_PASS 197 198 private: 199 //! Synchronization mutex 200 backend_mutex_type m_BackendMutex; 201 //! Pointer to the backend 202 const shared_ptr< sink_backend_type > m_pBackend; 203 204 //! Dedicated record feeding thread 205 thread m_DedicatedFeedingThread; 206 //! Feeding thread ID 207 thread::id m_FeedingThreadID; 208 //! Condition variable to implement blocking operations 209 condition_variable_any m_BlockCond; 210 211 //! The flag indicates that the feeding loop has to be stopped 212 volatile bool m_StopRequested; // TODO: make it a real atomic 213 //! The flag indicates that queue flush has been requested 214 volatile bool m_FlushRequested; // TODO: make it a real atomic 215 216 public: 217 /*! 218 * Default constructor. Constructs the sink backend instance. 219 * Requires the backend to be default-constructible. 220 * 221 * \param start_thread If \c true, the frontend creates a thread to feed 222 * log records to the backend. Otherwise no thread is 223 * started and it is assumed that the user will call 224 * either \c run or \c feed_records himself. 225 */ asynchronous_sink(bool start_thread=true)226 asynchronous_sink(bool start_thread = true) : 227 base_type(true), 228 m_pBackend(boost::make_shared< sink_backend_type >()), 229 m_StopRequested(false), 230 m_FlushRequested(false) 231 { 232 if (start_thread) 233 start_feeding_thread(); 234 } 235 /*! 236 * Constructor attaches user-constructed backend instance 237 * 238 * \param backend Pointer to the backend instance. 239 * \param start_thread If \c true, the frontend creates a thread to feed 240 * log records to the backend. Otherwise no thread is 241 * started and it is assumed that the user will call 242 * either \c run or \c feed_records himself. 243 * 244 * \pre \a backend is not \c NULL. 245 */ asynchronous_sink(shared_ptr<sink_backend_type> const & backend,bool start_thread=true)246 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) : 247 base_type(true), 248 m_pBackend(backend), 249 m_StopRequested(false), 250 m_FlushRequested(false) 251 { 252 if (start_thread) 253 start_feeding_thread(); 254 } 255 256 // Constructors that pass arbitrary parameters to the backend constructor 257 BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~) 258 259 /*! 260 * Destructor. Implicitly stops the dedicated feeding thread, if one is running. 261 */ ~asynchronous_sink()262 ~asynchronous_sink() 263 { 264 boost::this_thread::disable_interruption no_interrupts; 265 stop(); 266 } 267 268 /*! 269 * Locking accessor to the attached backend 270 */ locked_backend()271 locked_backend_ptr locked_backend() 272 { 273 return locked_backend_ptr(m_pBackend, m_BackendMutex); 274 } 275 276 /*! 277 * Enqueues the log record to the backend 278 */ consume(record_view const & rec)279 void consume(record_view const& rec) 280 { 281 if (m_FlushRequested) 282 { 283 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex()); 284 // Wait until flush is done 285 while (m_FlushRequested) 286 m_BlockCond.wait(lock); 287 } 288 queue_base_type::enqueue(rec); 289 } 290 291 /*! 292 * The method attempts to pass logging record to the backend 293 */ try_consume(record_view const & rec)294 bool try_consume(record_view const& rec) 295 { 296 if (!m_FlushRequested) 297 { 298 return queue_base_type::try_enqueue(rec); 299 } 300 else 301 return false; 302 } 303 304 /*! 305 * The method starts record feeding loop and effectively blocks until either of this happens: 306 * 307 * \li the thread is interrupted due to either standard thread interruption or a call to \c stop 308 * \li an exception is thrown while processing a log record in the backend, and the exception is 309 * not terminated by the exception handler, if one is installed 310 * 311 * \pre The sink frontend must be constructed without spawning a dedicated thread 312 */ run()313 void run() 314 { 315 // First check that no other thread is running 316 scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested); 317 318 // Now start the feeding loop 319 while (true) 320 { 321 do_feed_records(); 322 if (!m_StopRequested) 323 { 324 // Block until new record is available 325 record_view rec; 326 if (queue_base_type::dequeue_ready(rec)) 327 base_type::feed_record(rec, m_BackendMutex, *m_pBackend); 328 } 329 else 330 break; 331 } 332 } 333 334 /*! 335 * The method softly interrupts record feeding loop. This method must be called when the \c run 336 * method execution has to be interrupted. Unlike regular thread interruption, calling 337 * \c stop will not interrupt the record processing in the middle. Instead, the sink frontend 338 * will attempt to finish its business with the record in progress and return afterwards. 339 * This method can be called either if the sink was created with a dedicated thread, 340 * or if the feeding loop was initiated by user. 341 * 342 * \note Returning from this method does not guarantee that there are no records left buffered 343 * in the sink frontend. It is possible that log records keep coming during and after this 344 * method is called. At some point of execution of this method log records stop being processed, 345 * and all records that come after this point are put into the queue. These records will be 346 * processed upon further calls to \c run or \c feed_records. 347 */ stop()348 void stop() 349 { 350 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex()); 351 if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable()) 352 { 353 try 354 { 355 m_StopRequested = true; 356 queue_base_type::interrupt_dequeue(); 357 while (m_StopRequested) 358 m_BlockCond.wait(lock); 359 } 360 catch (...) 361 { 362 m_StopRequested = false; 363 throw; 364 } 365 366 lock.unlock(); 367 m_DedicatedFeedingThread.join(); 368 } 369 } 370 371 /*! 372 * The method feeds log records that may have been buffered to the backend and returns 373 * 374 * \pre The sink frontend must be constructed without spawning a dedicated thread 375 */ feed_records()376 void feed_records() 377 { 378 // First check that no other thread is running 379 scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested); 380 381 // Now start the feeding loop 382 do_feed_records(); 383 } 384 385 /*! 386 * The method feeds all log records that may have been buffered to the backend and returns. 387 * Unlike \c feed_records, in case of ordering queueing the method also feeds records 388 * that were enqueued during the ordering window, attempting to empty the queue completely. 389 * 390 * \pre The sink frontend must be constructed without spawning a dedicated thread 391 */ flush()392 void flush() 393 { 394 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex()); 395 if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable()) 396 { 397 // There is already a thread feeding records, let it do the job 398 m_FlushRequested = true; 399 queue_base_type::interrupt_dequeue(); 400 while (!m_StopRequested && m_FlushRequested) 401 m_BlockCond.wait(lock); 402 403 // The condition may have been signalled when the feeding thread was finishing. 404 // In that case records may not have been flushed, and we do the flush ourselves. 405 if (m_FeedingThreadID != thread::id()) 406 return; 407 } 408 409 m_FlushRequested = true; 410 411 // Flush records ourselves. The guard releases the lock. 412 scoped_thread_id guard(lock, m_BlockCond, m_FeedingThreadID, m_StopRequested); 413 414 do_feed_records(); 415 } 416 417 private: 418 #ifndef BOOST_LOG_DOXYGEN_PASS 419 //! The method spawns record feeding thread start_feeding_thread()420 void start_feeding_thread() 421 { 422 boost::thread(boost::bind(&asynchronous_sink::run, this)).swap(m_DedicatedFeedingThread); 423 } 424 425 //! The record feeding loop do_feed_records()426 void do_feed_records() 427 { 428 while (!m_StopRequested) 429 { 430 record_view rec; 431 bool dequeued = false; 432 if (!m_FlushRequested) 433 dequeued = queue_base_type::try_dequeue_ready(rec); 434 else 435 dequeued = queue_base_type::try_dequeue(rec); 436 437 if (dequeued) 438 base_type::feed_record(rec, m_BackendMutex, *m_pBackend); 439 else 440 break; 441 } 442 443 if (m_FlushRequested) 444 { 445 scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested); 446 base_type::flush_backend(m_BackendMutex, *m_pBackend); 447 } 448 } 449 #endif // BOOST_LOG_DOXYGEN_PASS 450 }; 451 452 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL 453 454 } // namespace sinks 455 456 BOOST_LOG_CLOSE_NAMESPACE // namespace log 457 458 } // namespace boost 459 460 #include <boost/log/detail/footer.hpp> 461 462 #endif // BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_ 463