1 /*
2  *          Copyright Andrey Semashev 2007 - 2015.
3  * Distributed under the Boost Software License, Version 1.0.
4  *    (See accompanying file LICENSE_1_0.txt or copy at
5  *          http://www.boost.org/LICENSE_1_0.txt)
6  */
7 /*!
8  * \file   threadsafe_queue.cpp
9  * \author Andrey Semashev
10  * \date   05.11.2010
11  *
12  * \brief  This header is the Boost.Log library implementation, see the library documentation
13  *         at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
14  *
15  * The implementation is based on algorithms published in the "Simple, Fast,
16  * and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" article
17  * in PODC96 by Maged M. Michael and Michael L. Scott. Pseudocode is available here:
18  * http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
19  *
20  * The lock-free version of the mentioned algorithms contain a race condition and therefore
21  * were not included here.
22  */
23 
24 #include <boost/log/detail/threadsafe_queue.hpp>
25 
26 #ifndef BOOST_LOG_NO_THREADS
27 
28 #include <new>
29 #include <boost/assert.hpp>
30 #include <boost/throw_exception.hpp>
31 #include <boost/align/aligned_alloc.hpp>
32 #include <boost/type_traits/alignment_of.hpp>
33 #include <boost/log/detail/spin_mutex.hpp>
34 #include <boost/log/detail/locks.hpp>
35 #include <boost/log/detail/header.hpp>
36 
37 namespace boost {
38 
39 BOOST_LOG_OPEN_NAMESPACE
40 
41 namespace aux {
42 
43 //! Generic queue implementation with two locks
44 class threadsafe_queue_impl_generic :
45     public threadsafe_queue_impl
46 {
47 private:
48     //! Mutex type to be used
49     typedef spin_mutex mutex_type;
50 
51     /*!
52      * A structure that contains a pointer to the node and the associated mutex.
53      * The alignment below allows to eliminate false sharing, it should not be less than CPU cache line size.
54      */
BOOST_ALIGNMENT(BOOST_LOG_CPU_CACHE_LINE_SIZE)55     struct BOOST_ALIGNMENT(BOOST_LOG_CPU_CACHE_LINE_SIZE) pointer
56     {
57         //! Pointer to the either end of the queue
58         node_base* node;
59         //! Lock for access synchronization
60         mutex_type mutex;
61         //  128 bytes padding is chosen to mitigate false sharing for NetBurst CPUs, which load two cache lines in one go.
62         unsigned char padding[128U - (sizeof(node_base*) + sizeof(mutex_type)) % 128U];
63     };
64 
65 private:
66     //! Pointer to the beginning of the queue
67     pointer m_Head;
68     //! Pointer to the end of the queue
69     pointer m_Tail;
70 
71 public:
threadsafe_queue_impl_generic(node_base * first_node)72     explicit threadsafe_queue_impl_generic(node_base* first_node)
73     {
74         set_next(first_node, NULL);
75         m_Head.node = m_Tail.node = first_node;
76     }
77 
~threadsafe_queue_impl_generic()78     ~threadsafe_queue_impl_generic()
79     {
80     }
81 
reset_last_node()82     node_base* reset_last_node()
83     {
84         BOOST_ASSERT(m_Head.node == m_Tail.node);
85         node_base* p = m_Head.node;
86         m_Head.node = m_Tail.node = NULL;
87         return p;
88     }
89 
unsafe_empty()90     bool unsafe_empty()
91     {
92         return m_Head.node == m_Tail.node;
93     }
94 
push(node_base * p)95     void push(node_base* p)
96     {
97         set_next(p, NULL);
98         exclusive_lock_guard< mutex_type > _(m_Tail.mutex);
99         set_next(m_Tail.node, p);
100         m_Tail.node = p;
101     }
102 
try_pop(node_base * & node_to_free,node_base * & node_with_value)103     bool try_pop(node_base*& node_to_free, node_base*& node_with_value)
104     {
105         exclusive_lock_guard< mutex_type > _(m_Head.mutex);
106         node_base* next = get_next(m_Head.node);
107         if (next)
108         {
109             // We have a node to pop
110             node_to_free = m_Head.node;
111             node_with_value = m_Head.node = next;
112             return true;
113         }
114         else
115             return false;
116     }
117 
118 private:
119     // Copying and assignment are closed
120     threadsafe_queue_impl_generic(threadsafe_queue_impl_generic const&);
121     threadsafe_queue_impl_generic& operator= (threadsafe_queue_impl_generic const&);
122 
set_next(node_base * p,node_base * next)123     BOOST_FORCEINLINE static void set_next(node_base* p, node_base* next)
124     {
125         p->next.data[0] = next;
126     }
get_next(node_base * p)127     BOOST_FORCEINLINE static node_base* get_next(node_base* p)
128     {
129         return static_cast< node_base* >(p->next.data[0]);
130     }
131 };
132 
create(node_base * first_node)133 BOOST_LOG_API threadsafe_queue_impl* threadsafe_queue_impl::create(node_base* first_node)
134 {
135     return new threadsafe_queue_impl_generic(first_node);
136 }
137 
operator new(std::size_t size)138 BOOST_LOG_API void* threadsafe_queue_impl::operator new (std::size_t size)
139 {
140     void* p = alignment::aligned_alloc(BOOST_LOG_CPU_CACHE_LINE_SIZE, size);
141     if (!p)
142         BOOST_THROW_EXCEPTION(std::bad_alloc());
143     return p;
144 }
145 
operator delete(void * p,std::size_t)146 BOOST_LOG_API void threadsafe_queue_impl::operator delete (void* p, std::size_t)
147 {
148     alignment::aligned_free(p);
149 }
150 
151 } // namespace aux
152 
153 BOOST_LOG_CLOSE_NAMESPACE // namespace log
154 
155 } // namespace boost
156 
157 #include <boost/log/detail/footer.hpp>
158 
159 #endif // BOOST_LOG_NO_THREADS
160