1 /***************************************************************************** 2 Copyright (c) 2017, 2020, Oracle and/or its affiliates. All Rights Reserved. 3 4 This program is free software; you can redistribute it and/or modify 5 it under the terms of the GNU General Public License, version 2.0, 6 as published by the Free Software Foundation. 7 8 This program is also distributed with certain software (including 9 but not limited to OpenSSL) that is licensed under separate terms, 10 as designated in a particular file or component or in included license 11 documentation. The authors of MySQL hereby grant you an additional 12 permission to link the program and your derivative works with the 13 separately licensed software that they have included with MySQL. 14 15 This program is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License, version 2.0, for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with this program; if not, write to the Free Software 22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 23 24 *****************************************************************************/ 25 26 #ifndef ut0mpmcbq_h 27 #define ut0mpmcbq_h 28 29 #include "ut0cpu_cache.h" 30 31 #include <atomic> 32 33 /** Multiple producer consumer, bounded queue 34 Implementation of Dmitry Vyukov's MPMC algorithm 35 http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue */ 36 template <typename T> 37 class mpmc_bq { 38 public: 39 /** Constructor 40 @param[in] n_elems Max number of elements allowed */ mpmc_bq(size_t n_elems)41 explicit mpmc_bq(size_t n_elems) 42 : m_ring(reinterpret_cast<Cell *>(UT_NEW_ARRAY_NOKEY(Aligned, n_elems))), 43 m_capacity(n_elems - 1) { 44 /* Should be a power of 2 */ 45 ut_a((n_elems >= 2) && ((n_elems & (n_elems - 1)) == 0)); 46 47 for (size_t i = 0; i < n_elems; ++i) { 48 m_ring[i].m_pos.store(i, std::memory_order_relaxed); 49 } 50 51 m_enqueue_pos.store(0, std::memory_order_relaxed); 52 m_dequeue_pos.store(0, std::memory_order_relaxed); 53 } 54 55 /** Destructor */ ~mpmc_bq()56 ~mpmc_bq() { UT_DELETE_ARRAY(m_ring); } 57 58 /** Enqueue an element 59 @param[in] data Element to insert, it will be copied 60 @return true on success */ enqueue(T const & data)61 bool enqueue(T const &data) MY_ATTRIBUTE((warn_unused_result)) { 62 /* m_enqueue_pos only wraps at MAX(m_enqueue_pos), instead 63 we use the capacity to convert the sequence to an array 64 index. This is why the ring buffer must be a size which 65 is a power of 2. This also allows the sequence to double 66 as a ticket/lock. */ 67 68 size_t pos = m_enqueue_pos.load(std::memory_order_relaxed); 69 70 Cell *cell; 71 72 for (;;) { 73 cell = &m_ring[pos & m_capacity]; 74 75 size_t seq; 76 77 seq = cell->m_pos.load(std::memory_order_acquire); 78 79 intptr_t diff = (intptr_t)seq - (intptr_t)pos; 80 81 /* If they are the same then it means this cell is empty */ 82 83 if (diff == 0) { 84 /* Claim our spot by moving head. If head isn't the same as we last 85 checked then that means someone beat us to the punch. Weak compare is 86 faster, but can return spurious results which in this instance is OK, 87 because it's in the loop */ 88 89 if (m_enqueue_pos.compare_exchange_weak(pos, pos + 1, 90 std::memory_order_relaxed)) { 91 break; 92 } 93 94 } else if (diff < 0) { 95 /* The queue is full */ 96 97 return (false); 98 99 } else { 100 pos = m_enqueue_pos.load(std::memory_order_relaxed); 101 } 102 } 103 104 cell->m_data = data; 105 106 /* Increment the sequence so that the tail knows it's accessible */ 107 108 cell->m_pos.store(pos + 1, std::memory_order_release); 109 110 return (true); 111 } 112 113 /** Dequeue an element 114 @param[out] data Element read from the queue 115 @return true on success */ dequeue(T & data)116 bool dequeue(T &data) MY_ATTRIBUTE((warn_unused_result)) { 117 Cell *cell; 118 size_t pos = m_dequeue_pos.load(std::memory_order_relaxed); 119 120 for (;;) { 121 cell = &m_ring[pos & m_capacity]; 122 123 size_t seq = cell->m_pos.load(std::memory_order_acquire); 124 125 auto diff = (intptr_t)seq - (intptr_t)(pos + 1); 126 127 if (diff == 0) { 128 /* Claim our spot by moving the head. If head isn't the same as we last 129 checked then that means someone beat us to the punch. Weak compare is 130 faster, but can return spurious results. Which in this instance is 131 OK, because it's in the loop. */ 132 133 if (m_dequeue_pos.compare_exchange_weak(pos, pos + 1, 134 std::memory_order_relaxed)) { 135 break; 136 } 137 138 } else if (diff < 0) { 139 /* The queue is empty */ 140 return (false); 141 142 } else { 143 /* Under normal circumstances this branch should never be taken. */ 144 pos = m_dequeue_pos.load(std::memory_order_relaxed); 145 } 146 } 147 148 data = cell->m_data; 149 150 /* Set the sequence to what the head sequence should be next 151 time around */ 152 153 cell->m_pos.store(pos + m_capacity + 1, std::memory_order_release); 154 155 return (true); 156 } 157 158 /** @return the capacity of the queue */ capacity()159 size_t capacity() const MY_ATTRIBUTE((warn_unused_result)) { 160 return (m_capacity + 1); 161 } 162 163 /** @return true if the queue is empty. */ empty()164 bool empty() const MY_ATTRIBUTE((warn_unused_result)) { 165 size_t pos = m_dequeue_pos.load(std::memory_order_relaxed); 166 167 for (;;) { 168 auto cell = &m_ring[pos & m_capacity]; 169 170 size_t seq = cell->m_pos.load(std::memory_order_acquire); 171 172 auto diff = (intptr_t)seq - (intptr_t)(pos + 1); 173 174 if (diff == 0) { 175 return (false); 176 } else if (diff < 0) { 177 return (true); 178 } else { 179 pos = m_dequeue_pos.load(std::memory_order_relaxed); 180 } 181 } 182 183 return (false); 184 } 185 186 private: 187 using Pad = byte[ut::INNODB_CACHE_LINE_SIZE]; 188 189 struct Cell { 190 std::atomic<size_t> m_pos; 191 T m_data; 192 }; 193 194 using Aligned = 195 typename std::aligned_storage<sizeof(Cell), 196 std::alignment_of<Cell>::value>::type; 197 198 Pad m_pad0; 199 Cell *const m_ring; 200 size_t const m_capacity; 201 Pad m_pad1; 202 std::atomic<size_t> m_enqueue_pos; 203 Pad m_pad2; 204 std::atomic<size_t> m_dequeue_pos; 205 Pad m_pad3; 206 207 mpmc_bq(mpmc_bq &&) = delete; 208 mpmc_bq(const mpmc_bq &) = delete; 209 mpmc_bq &operator=(mpmc_bq &&) = delete; 210 mpmc_bq &operator=(const mpmc_bq &) = delete; 211 }; 212 213 #endif /* ut0mpmcbq_h */ 214