1 /*
2 * Copyright Andrey Semashev 2007 - 2015.
3 * Distributed under the Boost Software License, Version 1.0.
4 * (See accompanying file LICENSE_1_0.txt or copy at
5 * http://www.boost.org/LICENSE_1_0.txt)
6 */
7 /*!
8 * \file threadsafe_queue.cpp
9 * \author Andrey Semashev
10 * \date 05.11.2010
11 *
12 * \brief This file is part of the Boost.Log library implementation, see the library documentation
13 * at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
14 *
15 * The implementation is based on algorithms published in the "Simple, Fast,
16 * and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" article
17 * in PODC96 by Maged M. Michael and Michael L. Scott. Pseudocode is available here:
18 * http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
19 *
20 * The lock-free versions of the mentioned algorithms contain a race condition and therefore
21 * were not included here.
22 */
23
24 #include <boost/log/detail/threadsafe_queue.hpp>
25
26 #ifndef BOOST_LOG_NO_THREADS
27
28 #include <new>
29 #include <boost/assert.hpp>
30 #include <boost/throw_exception.hpp>
31 #include <boost/align/aligned_alloc.hpp>
32 #include <boost/type_traits/alignment_of.hpp>
33 #include <boost/log/detail/spin_mutex.hpp>
34 #include <boost/log/detail/locks.hpp>
35 #include <boost/log/detail/header.hpp>
36
37 namespace boost {
38
39 BOOST_LOG_OPEN_NAMESPACE
40
41 namespace aux {
42
43 //! Generic queue implementation with two locks
44 class threadsafe_queue_impl_generic :
45 public threadsafe_queue_impl
46 {
47 private:
48 //! Mutex type to be used
49 typedef spin_mutex mutex_type;
50
51 /*!
52 * A structure that contains a pointer to the node and the associated mutex.
53 * The alignment below allows to eliminate false sharing, it should not be less than CPU cache line size.
54 */
BOOST_ALIGNMENT(BOOST_LOG_CPU_CACHE_LINE_SIZE)55 struct BOOST_ALIGNMENT(BOOST_LOG_CPU_CACHE_LINE_SIZE) pointer
56 {
57 //! Pointer to the either end of the queue
58 node_base* node;
59 //! Lock for access synchronization
60 mutex_type mutex;
61 // 128 bytes padding is chosen to mitigate false sharing for NetBurst CPUs, which load two cache lines in one go.
62 unsigned char padding[128U - (sizeof(node_base*) + sizeof(mutex_type)) % 128U];
63 };
64
65 private:
66 //! Pointer to the beginning of the queue
67 pointer m_Head;
68 //! Pointer to the end of the queue
69 pointer m_Tail;
70
71 public:
threadsafe_queue_impl_generic(node_base * first_node)72 explicit threadsafe_queue_impl_generic(node_base* first_node)
73 {
74 set_next(first_node, NULL);
75 m_Head.node = m_Tail.node = first_node;
76 }
77
~threadsafe_queue_impl_generic()78 ~threadsafe_queue_impl_generic()
79 {
80 }
81
reset_last_node()82 node_base* reset_last_node()
83 {
84 BOOST_ASSERT(m_Head.node == m_Tail.node);
85 node_base* p = m_Head.node;
86 m_Head.node = m_Tail.node = NULL;
87 return p;
88 }
89
unsafe_empty()90 bool unsafe_empty()
91 {
92 return m_Head.node == m_Tail.node;
93 }
94
push(node_base * p)95 void push(node_base* p)
96 {
97 set_next(p, NULL);
98 exclusive_lock_guard< mutex_type > _(m_Tail.mutex);
99 set_next(m_Tail.node, p);
100 m_Tail.node = p;
101 }
102
try_pop(node_base * & node_to_free,node_base * & node_with_value)103 bool try_pop(node_base*& node_to_free, node_base*& node_with_value)
104 {
105 exclusive_lock_guard< mutex_type > _(m_Head.mutex);
106 node_base* next = get_next(m_Head.node);
107 if (next)
108 {
109 // We have a node to pop
110 node_to_free = m_Head.node;
111 node_with_value = m_Head.node = next;
112 return true;
113 }
114 else
115 return false;
116 }
117
118 private:
119 // Copying and assignment are closed
120 threadsafe_queue_impl_generic(threadsafe_queue_impl_generic const&);
121 threadsafe_queue_impl_generic& operator= (threadsafe_queue_impl_generic const&);
122
set_next(node_base * p,node_base * next)123 BOOST_FORCEINLINE static void set_next(node_base* p, node_base* next)
124 {
125 p->next.data[0] = next;
126 }
get_next(node_base * p)127 BOOST_FORCEINLINE static node_base* get_next(node_base* p)
128 {
129 return static_cast< node_base* >(p->next.data[0]);
130 }
131 };
132
create(node_base * first_node)133 BOOST_LOG_API threadsafe_queue_impl* threadsafe_queue_impl::create(node_base* first_node)
134 {
135 return new threadsafe_queue_impl_generic(first_node);
136 }
137
operator new(std::size_t size)138 BOOST_LOG_API void* threadsafe_queue_impl::operator new (std::size_t size)
139 {
140 void* p = alignment::aligned_alloc(BOOST_LOG_CPU_CACHE_LINE_SIZE, size);
141 if (!p)
142 BOOST_THROW_EXCEPTION(std::bad_alloc());
143 return p;
144 }
145
operator delete(void * p,std::size_t)146 BOOST_LOG_API void threadsafe_queue_impl::operator delete (void* p, std::size_t)
147 {
148 alignment::aligned_free(p);
149 }
150
151 } // namespace aux
152
153 BOOST_LOG_CLOSE_NAMESPACE // namespace log
154
155 } // namespace boost
156
157 #include <boost/log/detail/footer.hpp>
158
159 #endif // BOOST_LOG_NO_THREADS
160