/*****************************************************************************
Copyright (c) 2017, 2020, Oracle and/or its affiliates. All Rights Reserved.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.

This program is also distributed with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation.  The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have included with MySQL.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License, version 2.0, for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA

*****************************************************************************/

#ifndef ut0mpmcbq_h
#define ut0mpmcbq_h

#include "ut0cpu_cache.h"

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <type_traits>

33 /** Multiple producer consumer, bounded queue
34  Implementation of Dmitry Vyukov's MPMC algorithm
35  http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue */
36 template <typename T>
37 class mpmc_bq {
38  public:
39   /** Constructor
40   @param[in]	n_elems		Max number of elements allowed */
mpmc_bq(size_t n_elems)41   explicit mpmc_bq(size_t n_elems)
42       : m_ring(reinterpret_cast<Cell *>(UT_NEW_ARRAY_NOKEY(Aligned, n_elems))),
43         m_capacity(n_elems - 1) {
44     /* Should be a power of 2 */
45     ut_a((n_elems >= 2) && ((n_elems & (n_elems - 1)) == 0));
46 
47     for (size_t i = 0; i < n_elems; ++i) {
48       m_ring[i].m_pos.store(i, std::memory_order_relaxed);
49     }
50 
51     m_enqueue_pos.store(0, std::memory_order_relaxed);
52     m_dequeue_pos.store(0, std::memory_order_relaxed);
53   }
54 
55   /** Destructor */
~mpmc_bq()56   ~mpmc_bq() { UT_DELETE_ARRAY(m_ring); }
57 
58   /** Enqueue an element
59   @param[in]	data		Element to insert, it will be copied
60   @return true on success */
enqueue(T const & data)61   bool enqueue(T const &data) MY_ATTRIBUTE((warn_unused_result)) {
62     /* m_enqueue_pos only wraps at MAX(m_enqueue_pos), instead
63     we use the capacity to convert the sequence to an array
64     index. This is why the ring buffer must be a size which
65     is a power of 2. This also allows the sequence to double
66     as a ticket/lock. */
67 
68     size_t pos = m_enqueue_pos.load(std::memory_order_relaxed);
69 
70     Cell *cell;
71 
72     for (;;) {
73       cell = &m_ring[pos & m_capacity];
74 
75       size_t seq;
76 
77       seq = cell->m_pos.load(std::memory_order_acquire);
78 
79       intptr_t diff = (intptr_t)seq - (intptr_t)pos;
80 
81       /* If they are the same then it means this cell is empty */
82 
83       if (diff == 0) {
84         /* Claim our spot by moving head. If head isn't the same as we last
85         checked then that means someone beat us to the punch. Weak compare is
86         faster, but can return spurious results which in this instance is OK,
87         because it's in the loop */
88 
89         if (m_enqueue_pos.compare_exchange_weak(pos, pos + 1,
90                                                 std::memory_order_relaxed)) {
91           break;
92         }
93 
94       } else if (diff < 0) {
95         /* The queue is full */
96 
97         return (false);
98 
99       } else {
100         pos = m_enqueue_pos.load(std::memory_order_relaxed);
101       }
102     }
103 
104     cell->m_data = data;
105 
106     /* Increment the sequence so that the tail knows it's accessible */
107 
108     cell->m_pos.store(pos + 1, std::memory_order_release);
109 
110     return (true);
111   }
112 
113   /** Dequeue an element
114   @param[out]	data		Element read from the queue
115   @return true on success */
dequeue(T & data)116   bool dequeue(T &data) MY_ATTRIBUTE((warn_unused_result)) {
117     Cell *cell;
118     size_t pos = m_dequeue_pos.load(std::memory_order_relaxed);
119 
120     for (;;) {
121       cell = &m_ring[pos & m_capacity];
122 
123       size_t seq = cell->m_pos.load(std::memory_order_acquire);
124 
125       auto diff = (intptr_t)seq - (intptr_t)(pos + 1);
126 
127       if (diff == 0) {
128         /* Claim our spot by moving the head. If head isn't the same as we last
129         checked then that means someone beat us to the punch. Weak compare is
130         faster, but can return spurious results. Which in this instance is
131         OK, because it's in the loop. */
132 
133         if (m_dequeue_pos.compare_exchange_weak(pos, pos + 1,
134                                                 std::memory_order_relaxed)) {
135           break;
136         }
137 
138       } else if (diff < 0) {
139         /* The queue is empty */
140         return (false);
141 
142       } else {
143         /* Under normal circumstances this branch should never be taken. */
144         pos = m_dequeue_pos.load(std::memory_order_relaxed);
145       }
146     }
147 
148     data = cell->m_data;
149 
150     /* Set the sequence to what the head sequence should be next
151     time around */
152 
153     cell->m_pos.store(pos + m_capacity + 1, std::memory_order_release);
154 
155     return (true);
156   }
157 
158   /** @return the capacity of the queue */
capacity()159   size_t capacity() const MY_ATTRIBUTE((warn_unused_result)) {
160     return (m_capacity + 1);
161   }
162 
163   /** @return true if the queue is empty. */
empty()164   bool empty() const MY_ATTRIBUTE((warn_unused_result)) {
165     size_t pos = m_dequeue_pos.load(std::memory_order_relaxed);
166 
167     for (;;) {
168       auto cell = &m_ring[pos & m_capacity];
169 
170       size_t seq = cell->m_pos.load(std::memory_order_acquire);
171 
172       auto diff = (intptr_t)seq - (intptr_t)(pos + 1);
173 
174       if (diff == 0) {
175         return (false);
176       } else if (diff < 0) {
177         return (true);
178       } else {
179         pos = m_dequeue_pos.load(std::memory_order_relaxed);
180       }
181     }
182 
183     return (false);
184   }
185 
186  private:
187   using Pad = byte[ut::INNODB_CACHE_LINE_SIZE];
188 
189   struct Cell {
190     std::atomic<size_t> m_pos;
191     T m_data;
192   };
193 
194   using Aligned =
195       typename std::aligned_storage<sizeof(Cell),
196                                     std::alignment_of<Cell>::value>::type;
197 
198   Pad m_pad0;
199   Cell *const m_ring;
200   size_t const m_capacity;
201   Pad m_pad1;
202   std::atomic<size_t> m_enqueue_pos;
203   Pad m_pad2;
204   std::atomic<size_t> m_dequeue_pos;
205   Pad m_pad3;
206 
207   mpmc_bq(mpmc_bq &&) = delete;
208   mpmc_bq(const mpmc_bq &) = delete;
209   mpmc_bq &operator=(mpmc_bq &&) = delete;
210   mpmc_bq &operator=(const mpmc_bq &) = delete;
211 };

#endif /* ut0mpmcbq_h */