1 /*
2  *          Copyright Andrey Semashev 2007 - 2015.
3  * Distributed under the Boost Software License, Version 1.0.
4  *    (See accompanying file LICENSE_1_0.txt or copy at
5  *          http://www.boost.org/LICENSE_1_0.txt)
6  */
7 /*!
8  * \file   async_frontend.hpp
9  * \author Andrey Semashev
10  * \date   14.07.2009
11  *
12  * The header contains implementation of asynchronous sink frontend.
13  */
14 
15 #ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
16 #define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
17 
18 #include <boost/log/detail/config.hpp>
19 
20 #ifdef BOOST_HAS_PRAGMA_ONCE
21 #pragma once
22 #endif
23 
24 #if defined(BOOST_LOG_NO_THREADS)
25 #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment
26 #endif
27 
28 #include <boost/bind.hpp>
29 #include <boost/static_assert.hpp>
30 #include <boost/smart_ptr/shared_ptr.hpp>
31 #include <boost/smart_ptr/make_shared_object.hpp>
32 #include <boost/thread/locks.hpp>
33 #include <boost/thread/recursive_mutex.hpp>
34 #include <boost/thread/thread.hpp>
35 #include <boost/thread/condition_variable.hpp>
36 #include <boost/log/exceptions.hpp>
37 #include <boost/log/detail/locking_ptr.hpp>
38 #include <boost/log/detail/parameter_tools.hpp>
39 #include <boost/log/core/record_view.hpp>
40 #include <boost/log/sinks/basic_sink_frontend.hpp>
41 #include <boost/log/sinks/frontend_requirements.hpp>
42 #include <boost/log/sinks/unbounded_fifo_queue.hpp>
43 #include <boost/log/keywords/start_thread.hpp>
44 #include <boost/log/detail/header.hpp>
45 
46 namespace boost {
47 
48 BOOST_LOG_OPEN_NAMESPACE
49 
50 namespace sinks {
51 
52 #ifndef BOOST_LOG_DOXYGEN_PASS
53 
54 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, types)\
55     template< BOOST_PP_ENUM_PARAMS(n, typename T) >\
56     explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\
57         base_type(true),\
58         queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\
59         m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS(n, arg))),\
60         m_StopRequested(false),\
61         m_FlushRequested(false)\
62     {\
63         if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\
64             start_feeding_thread();\
65     }\
66     template< BOOST_PP_ENUM_PARAMS(n, typename T) >\
67     explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\
68         base_type(true),\
69         queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\
70         m_pBackend(backend),\
71         m_StopRequested(false),\
72         m_FlushRequested(false)\
73     {\
74         if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\
75             start_feeding_thread();\
76     }
77 
78 #endif // BOOST_LOG_DOXYGEN_PASS
79 
80 /*!
81  * \brief Asynchronous logging sink frontend
82  *
83  * The frontend starts a separate thread on construction. All logging records are passed
84  * to the backend in this dedicated thread only.
85  */
86 template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue >
87 class asynchronous_sink :
88     public aux::make_sink_frontend_base< SinkBackendT >::type,
89     public QueueingStrategyT
90 {
91     typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type;
92     typedef QueueingStrategyT queue_base_type;
93 
94 private:
95     //! Backend synchronization mutex type
96     typedef boost::recursive_mutex backend_mutex_type;
97     //! Frontend synchronization mutex type
98     typedef typename base_type::mutex_type frontend_mutex_type;
99 
100     //! A scope guard that implements thread ID management
101     class scoped_thread_id
102     {
103     private:
104         frontend_mutex_type& m_Mutex;
105         condition_variable_any& m_Cond;
106         thread::id& m_ThreadID;
107         bool volatile& m_StopRequested;
108 
109     public:
110         //! Initializing constructor
scoped_thread_id(frontend_mutex_type & mut,condition_variable_any & cond,thread::id & tid,bool volatile & sr)111         scoped_thread_id(frontend_mutex_type& mut, condition_variable_any& cond, thread::id& tid, bool volatile& sr)
112             : m_Mutex(mut), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr)
113         {
114             lock_guard< frontend_mutex_type > lock(m_Mutex);
115             if (m_ThreadID != thread::id())
116                 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
117             m_ThreadID = this_thread::get_id();
118         }
119         //! Initializing constructor
scoped_thread_id(unique_lock<frontend_mutex_type> & l,condition_variable_any & cond,thread::id & tid,bool volatile & sr)120         scoped_thread_id(unique_lock< frontend_mutex_type >& l, condition_variable_any& cond, thread::id& tid, bool volatile& sr)
121             : m_Mutex(*l.mutex()), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr)
122         {
123             unique_lock< frontend_mutex_type > lock(move(l));
124             if (m_ThreadID != thread::id())
125                 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
126             m_ThreadID = this_thread::get_id();
127         }
128         //! Destructor
~scoped_thread_id()129         ~scoped_thread_id()
130         {
131             try
132             {
133                 lock_guard< frontend_mutex_type > lock(m_Mutex);
134                 m_StopRequested = false;
135                 m_ThreadID = thread::id();
136                 m_Cond.notify_all();
137             }
138             catch (...)
139             {
140             }
141         }
142 
143     private:
144         scoped_thread_id(scoped_thread_id const&);
145         scoped_thread_id& operator= (scoped_thread_id const&);
146     };
147 
148     //! A scope guard that resets a flag on destructor
149     class scoped_flag
150     {
151     private:
152         frontend_mutex_type& m_Mutex;
153         condition_variable_any& m_Cond;
154         volatile bool& m_Flag;
155 
156     public:
scoped_flag(frontend_mutex_type & mut,condition_variable_any & cond,volatile bool & f)157         explicit scoped_flag(frontend_mutex_type& mut, condition_variable_any& cond, volatile bool& f) :
158             m_Mutex(mut), m_Cond(cond), m_Flag(f)
159         {
160         }
~scoped_flag()161         ~scoped_flag()
162         {
163             try
164             {
165                 lock_guard< frontend_mutex_type > lock(m_Mutex);
166                 m_Flag = false;
167                 m_Cond.notify_all();
168             }
169             catch (...)
170             {
171             }
172         }
173 
174     private:
175         scoped_flag(scoped_flag const&);
176         scoped_flag& operator= (scoped_flag const&);
177     };
178 
179 public:
180     //! Sink implementation type
181     typedef SinkBackendT sink_backend_type;
182     //! \cond
183     BOOST_STATIC_ASSERT_MSG((has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value), "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met");
184     //! \endcond
185 
186 #ifndef BOOST_LOG_DOXYGEN_PASS
187 
188     //! A pointer type that locks the backend until it's destroyed
189     typedef boost::log::aux::locking_ptr< sink_backend_type, backend_mutex_type > locked_backend_ptr;
190 
191 #else // BOOST_LOG_DOXYGEN_PASS
192 
193     //! A pointer type that locks the backend until it's destroyed
194     typedef implementation_defined locked_backend_ptr;
195 
196 #endif // BOOST_LOG_DOXYGEN_PASS
197 
198 private:
199     //! Synchronization mutex
200     backend_mutex_type m_BackendMutex;
201     //! Pointer to the backend
202     const shared_ptr< sink_backend_type > m_pBackend;
203 
204     //! Dedicated record feeding thread
205     thread m_DedicatedFeedingThread;
206     //! Feeding thread ID
207     thread::id m_FeedingThreadID;
208     //! Condition variable to implement blocking operations
209     condition_variable_any m_BlockCond;
210 
211     //! The flag indicates that the feeding loop has to be stopped
212     volatile bool m_StopRequested; // TODO: make it a real atomic
213     //! The flag indicates that queue flush has been requested
214     volatile bool m_FlushRequested; // TODO: make it a real atomic
215 
216 public:
217     /*!
218      * Default constructor. Constructs the sink backend instance.
219      * Requires the backend to be default-constructible.
220      *
221      * \param start_thread If \c true, the frontend creates a thread to feed
222      *                     log records to the backend. Otherwise no thread is
223      *                     started and it is assumed that the user will call
224      *                     either \c run or \c feed_records himself.
225      */
asynchronous_sink(bool start_thread=true)226     asynchronous_sink(bool start_thread = true) :
227         base_type(true),
228         m_pBackend(boost::make_shared< sink_backend_type >()),
229         m_StopRequested(false),
230         m_FlushRequested(false)
231     {
232         if (start_thread)
233             start_feeding_thread();
234     }
235     /*!
236      * Constructor attaches user-constructed backend instance
237      *
238      * \param backend Pointer to the backend instance.
239      * \param start_thread If \c true, the frontend creates a thread to feed
240      *                     log records to the backend. Otherwise no thread is
241      *                     started and it is assumed that the user will call
242      *                     either \c run or \c feed_records himself.
243      *
244      * \pre \a backend is not \c NULL.
245      */
asynchronous_sink(shared_ptr<sink_backend_type> const & backend,bool start_thread=true)246     explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) :
247         base_type(true),
248         m_pBackend(backend),
249         m_StopRequested(false),
250         m_FlushRequested(false)
251     {
252         if (start_thread)
253             start_feeding_thread();
254     }
255 
256     // Constructors that pass arbitrary parameters to the backend constructor
257     BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~)
258 
259     /*!
260      * Destructor. Implicitly stops the dedicated feeding thread, if one is running.
261      */
~asynchronous_sink()262     ~asynchronous_sink()
263     {
264         boost::this_thread::disable_interruption no_interrupts;
265         stop();
266     }
267 
268     /*!
269      * Locking accessor to the attached backend
270      */
locked_backend()271     locked_backend_ptr locked_backend()
272     {
273         return locked_backend_ptr(m_pBackend, m_BackendMutex);
274     }
275 
276     /*!
277      * Enqueues the log record to the backend
278      */
consume(record_view const & rec)279     void consume(record_view const& rec)
280     {
281         if (m_FlushRequested)
282         {
283             unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
284             // Wait until flush is done
285             while (m_FlushRequested)
286                 m_BlockCond.wait(lock);
287         }
288         queue_base_type::enqueue(rec);
289     }
290 
291     /*!
292      * The method attempts to pass logging record to the backend
293      */
try_consume(record_view const & rec)294     bool try_consume(record_view const& rec)
295     {
296         if (!m_FlushRequested)
297         {
298             return queue_base_type::try_enqueue(rec);
299         }
300         else
301             return false;
302     }
303 
304     /*!
305      * The method starts record feeding loop and effectively blocks until either of this happens:
306      *
307      * \li the thread is interrupted due to either standard thread interruption or a call to \c stop
308      * \li an exception is thrown while processing a log record in the backend, and the exception is
309      *     not terminated by the exception handler, if one is installed
310      *
311      * \pre The sink frontend must be constructed without spawning a dedicated thread
312      */
run()313     void run()
314     {
315         // First check that no other thread is running
316         scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested);
317 
318         // Now start the feeding loop
319         while (true)
320         {
321             do_feed_records();
322             if (!m_StopRequested)
323             {
324                 // Block until new record is available
325                 record_view rec;
326                 if (queue_base_type::dequeue_ready(rec))
327                     base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
328             }
329             else
330                 break;
331         }
332     }
333 
334     /*!
335      * The method softly interrupts record feeding loop. This method must be called when the \c run
336      * method execution has to be interrupted. Unlike regular thread interruption, calling
337      * \c stop will not interrupt the record processing in the middle. Instead, the sink frontend
338      * will attempt to finish its business with the record in progress and return afterwards.
339      * This method can be called either if the sink was created with a dedicated thread,
340      * or if the feeding loop was initiated by user.
341      *
342      * \note Returning from this method does not guarantee that there are no records left buffered
343      *       in the sink frontend. It is possible that log records keep coming during and after this
344      *       method is called. At some point of execution of this method log records stop being processed,
345      *       and all records that come after this point are put into the queue. These records will be
346      *       processed upon further calls to \c run or \c feed_records.
347      */
stop()348     void stop()
349     {
350         unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
351         if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable())
352         {
353             try
354             {
355                 m_StopRequested = true;
356                 queue_base_type::interrupt_dequeue();
357                 while (m_StopRequested)
358                     m_BlockCond.wait(lock);
359             }
360             catch (...)
361             {
362                 m_StopRequested = false;
363                 throw;
364             }
365 
366             lock.unlock();
367             m_DedicatedFeedingThread.join();
368         }
369     }
370 
371     /*!
372      * The method feeds log records that may have been buffered to the backend and returns
373      *
374      * \pre The sink frontend must be constructed without spawning a dedicated thread
375      */
feed_records()376     void feed_records()
377     {
378         // First check that no other thread is running
379         scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested);
380 
381         // Now start the feeding loop
382         do_feed_records();
383     }
384 
385     /*!
386      * The method feeds all log records that may have been buffered to the backend and returns.
387      * Unlike \c feed_records, in case of ordering queueing the method also feeds records
388      * that were enqueued during the ordering window, attempting to empty the queue completely.
389      *
390      * \pre The sink frontend must be constructed without spawning a dedicated thread
391      */
flush()392     void flush()
393     {
394         unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
395         if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable())
396         {
397             // There is already a thread feeding records, let it do the job
398             m_FlushRequested = true;
399             queue_base_type::interrupt_dequeue();
400             while (!m_StopRequested && m_FlushRequested)
401                 m_BlockCond.wait(lock);
402 
403             // The condition may have been signalled when the feeding thread was finishing.
404             // In that case records may not have been flushed, and we do the flush ourselves.
405             if (m_FeedingThreadID != thread::id())
406                 return;
407         }
408 
409         m_FlushRequested = true;
410 
411         // Flush records ourselves. The guard releases the lock.
412         scoped_thread_id guard(lock, m_BlockCond, m_FeedingThreadID, m_StopRequested);
413 
414         do_feed_records();
415     }
416 
417 private:
418 #ifndef BOOST_LOG_DOXYGEN_PASS
419     //! The method spawns record feeding thread
start_feeding_thread()420     void start_feeding_thread()
421     {
422         boost::thread(boost::bind(&asynchronous_sink::run, this)).swap(m_DedicatedFeedingThread);
423     }
424 
425     //! The record feeding loop
do_feed_records()426     void do_feed_records()
427     {
428         while (!m_StopRequested)
429         {
430             record_view rec;
431             bool dequeued = false;
432             if (!m_FlushRequested)
433                 dequeued = queue_base_type::try_dequeue_ready(rec);
434             else
435                 dequeued = queue_base_type::try_dequeue(rec);
436 
437             if (dequeued)
438                 base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
439             else
440                 break;
441         }
442 
443         if (m_FlushRequested)
444         {
445             scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested);
446             base_type::flush_backend(m_BackendMutex, *m_pBackend);
447         }
448     }
449 #endif // BOOST_LOG_DOXYGEN_PASS
450 };
451 
452 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL
453 
454 } // namespace sinks
455 
456 BOOST_LOG_CLOSE_NAMESPACE // namespace log
457 
458 } // namespace boost
459 
460 #include <boost/log/detail/footer.hpp>
461 
462 #endif // BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
463