#ifndef BTLLIB_ORDER_QUEUE_HPP
#define BTLLIB_ORDER_QUEUE_HPP

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

namespace btllib {

/**
 * Common state of a fixed-capacity queue that passes blocks of elements
 * between threads through a ring of slots.
 *
 * A block carrying sequence number `num` is stored in slot
 * `num % queue_size`; blocks are taken out in increasing order of an internal
 * read counter. This base class only owns the slots and the bookkeeping. The
 * write() and read() operations are provided by the derived classes
 * OrderQueueSPSC, OrderQueueMPSC, OrderQueueSPMC and OrderQueueMPMC that are
 * generated further down in this header.
 */
template<typename T>
class OrderQueue
{

public:
  /** A buffer of elements plus the counters that travel with it. */
  struct Block
  {
    /** Create a block whose buffer holds `block_size` value-initialized
     * elements. */
    Block(const size_t block_size)
      : data(block_size)
    {}

    Block(const Block& block) = default;

    /**
     * Take over the buffer and counters of `block`. The source is left with
     * an empty buffer and all counters set to zero.
     */
    Block(Block&& block) noexcept
      : current(std::exchange(block.current, 0))
      , count(std::exchange(block.count, 0))
      , num(std::exchange(block.num, 0))
    {
      std::swap(data, block.data);
    }

    Block& operator=(const Block& block) = default;

    /**
     * Exchange buffers with `block` and take over its counters.
     *
     * The buffers are swapped rather than moved, so `block` ends up holding
     * the buffer this object owned before the assignment, with its counters
     * reset to zero. Self-assignment leaves the object untouched.
     */
    Block& operator=(Block&& block) noexcept
    {
      if (this != &block) {
        std::swap(data, block.data);
        current = std::exchange(block.current, 0);
        count = std::exchange(block.count, 0);
        num = std::exchange(block.num, 0);
      }
      return *this;
    }

    std::vector<T> data;
    size_t current = 0;
    size_t count = 0;
    size_t num = 0;
  };

  // Surrounds pieces of data in the buffer with a busy mutex
  // for exclusive access
  struct Slot
  {
    Slot(size_t block_size)
      : block(block_size)
    {}

    /**
     * Copy the block and the bookkeeping flags. The mutex and the condition
     * variable are not copyable, so the new slot gets fresh ones.
     */
    Slot(const Slot& slot)
      : block(slot.block)
      , occupied(slot.occupied)
      , last_tenant(slot.last_tenant)
    {}

    /**
     * Move the block out of `slot` and copy its bookkeeping flags. The block
     * is moved (not copied) so that this noexcept operation cannot throw from
     * an allocation.
     */
    Slot(Slot&& slot) noexcept
      : block(std::move(slot.block))
      , occupied(slot.occupied)
      , last_tenant(slot.last_tenant)
    {}

    Slot& operator=(const Slot& slot)
    {
      if (this == &slot) {
        return *this;
      }
      block = slot.block;
      occupied = slot.occupied;
      last_tenant = slot.last_tenant;
      return *this;
    }

    /**
     * Move-assign the block (which swaps buffers, see Block) and copy the
     * bookkeeping flags. Self-assignment is a no-op.
     */
    Slot& operator=(Slot&& slot) noexcept
    {
      if (this == &slot) {
        return *this;
      }
      block = std::move(slot.block);
      occupied = slot.occupied;
      last_tenant = slot.last_tenant;
      return *this;
    }

    typename OrderQueue<T>::Block block;
    std::mutex busy;
    bool occupied = false;
    std::condition_variable occupancy_changed;
    // Required to ensure read order. Starts at the largest size_t value so
    // that the unsigned difference `num - last_tenant` wraps to `num + 1`
    // for a slot that has never been written.
    size_t last_tenant = static_cast<size_t>(-1);
  };

  /** Number of blocks written and not yet read. */
  size_t elements() const { return element_count; }

  /**
   * Mark the queue as closed and wake every thread waiting on any slot.
   *
   * Each slot's mutex is held while notifying. Waiters evaluate `closed`
   * under that mutex, so without it a waiter could check the flag just
   * before it is set, then block just after the notification was sent, and
   * never be woken.
   */
  void close()
  {
    closed = true;
    for (auto& slot : this->slots) {
      std::lock_guard<std::mutex> busy_lock(slot.busy);
      slot.occupancy_changed.notify_all();
    }
  }

  bool is_closed() const { return closed; }

  /**
   * @param queue_size number of slots in the ring
   * @param block_size number of elements in each slot's initial buffer
   */
  OrderQueue(const size_t queue_size, const size_t block_size)
    : slots(queue_size, Slot(block_size))
    , queue_size(queue_size)
    , block_size(block_size)
  {}

  OrderQueue(const OrderQueue&) = delete;
  OrderQueue(OrderQueue&&) = delete;

protected:
  std::vector<Slot> slots;
  size_t queue_size, block_size;
  size_t read_counter = 0;
  std::atomic<size_t> element_count{ 0 };
  std::atomic<bool> closed{ false };
};

/*
 * Generates the class OrderQueue<SUFFIX>, which derives from OrderQueue<T>
 * and adds write() and read(). The remaining macro arguments are code
 * fragments spliced into those two functions:
 *
 *   PRE_WRITE_LOCK / PRE_READ_LOCK   statement run before the slot is chosen
 *   EXTRA_WRITE_LOCK_CONDS /
 *   EXTRA_READ_LOCK_CONDS            appended to the wait predicate
 *   POST_WRITE_LOCK / POST_READ_LOCK statement run once the wait succeeded
 *   NOTIFY_WRITE / NOTIFY_READ       condition_variable member to call
 *                                    (notify_one or notify_all)
 *   MEMBERS                          extra private member declarations
 *
 * write(block): waits until slot `block.num % queue_size` is unoccupied (and
 * any extra condition holds) or the queue is closed. If the queue is closed
 * it returns without storing anything. Otherwise the block is move-assigned
 * into the slot; because Block's move assignment swaps buffers, `block` comes
 * back holding the slot's previous buffer with its counters reset to zero.
 *
 * read(block): waits until slot `read_counter % queue_size` is occupied (and
 * any extra condition holds) or the queue is closed. If the queue is closed
 * it returns without touching `block`, even if written blocks remain in the
 * queue. Otherwise the read counter is advanced and the slot's block is
 * move-assigned into `block`.
 */
#define ORDER_QUEUE_XPXC(SUFFIX,                                               \
                         PRE_WRITE_LOCK,                                       \
                         EXTRA_WRITE_LOCK_CONDS,                               \
                         POST_WRITE_LOCK,                                      \
                         NOTIFY_WRITE,                                         \
                         PRE_READ_LOCK,                                        \
                         EXTRA_READ_LOCK_CONDS,                                \
                         POST_READ_LOCK,                                       \
                         NOTIFY_READ,                                          \
                         MEMBERS)                                              \
  template<typename T>                                                         \
  class OrderQueue##SUFFIX : public OrderQueue<T>                              \
  {                                                                            \
                                                                               \
  public:                                                                      \
    OrderQueue##SUFFIX(const size_t queue_size, const size_t block_size)       \
      : OrderQueue<T>(queue_size, block_size)                                  \
    {}                                                                         \
                                                                               \
    using Block = typename OrderQueue<T>::Block;                               \
    using Slot = typename OrderQueue<T>::Slot;                                 \
                                                                               \
    void write(Block& block)                                                   \
    {                                                                          \
      PRE_WRITE_LOCK;                                                          \
      const auto num = block.num;                                              \
      auto& target = this->slots[num % this->queue_size];                      \
      std::unique_lock<std::mutex> busy_lock(target.busy);                     \
      target.occupancy_changed.wait(busy_lock, [&] {                           \
        return (!target.occupied EXTRA_WRITE_LOCK_CONDS) || this->closed;      \
      });                                                                      \
      if (this->closed) {                                                      \
        return;                                                                \
      }                                                                        \
      POST_WRITE_LOCK; /* NOLINT */                                            \
      target.block = std::move(block);                                         \
      target.occupied = true;                                                  \
      target.occupancy_changed.NOTIFY_WRITE();                                 \
      ++(this->element_count);                                                 \
    }                                                                          \
                                                                               \
    void read(Block& block)                                                    \
    {                                                                          \
      PRE_READ_LOCK;                                                           \
      auto& target = this->slots[this->read_counter % this->queue_size];       \
      std::unique_lock<std::mutex> busy_lock(target.busy);                     \
      target.occupancy_changed.wait(busy_lock, [&] {                           \
        return (target.occupied EXTRA_READ_LOCK_CONDS) || this->closed;        \
      });                                                                      \
      if (this->closed) {                                                      \
        return;                                                                \
      }                                                                        \
      ++(this->read_counter);                                                  \
      POST_READ_LOCK;                                                          \
      block = std::move(target.block);                                         \
      target.occupied = false;                                                 \
      target.occupancy_changed.NOTIFY_READ();                                  \
      --(this->element_count);                                                 \
    }                                                                          \
                                                                               \
  private:                                                                     \
    MEMBERS /* NOLINT */                                                       \
  };

// Single writer, single reader: no extra conditions or locks.
ORDER_QUEUE_XPXC(SPSC, , , , notify_one, , , , notify_one, )

// Multiple writers: a writer additionally waits until its sequence number is
// at most queue_size past the slot's previous tenant, then records itself as
// the tenant. NOTE(review): presumably this keeps a writer that is a full lap
// ahead from taking the slot before the earlier block was stored -- confirm.
ORDER_QUEUE_XPXC(MPSC,
                 ,
                 &&(num - target.last_tenant <= this->queue_size),
                 target.last_tenant = num,
                 notify_all,
                 ,
                 ,
                 ,
                 notify_all, )

// Multiple readers: readers hold read_mutex while picking the slot and
// advancing read_counter, and release it before moving the block out.
ORDER_QUEUE_XPXC(SPMC,
                 ,
                 ,
                 ,
                 notify_one,
                 std::unique_lock<std::mutex> read_lock(read_mutex),
                 ,
                 read_lock.unlock(),
                 notify_one,
                 std::mutex read_mutex;)

// Multiple writers and multiple readers: combines the two schemes above.
ORDER_QUEUE_XPXC(MPMC,
                 ,
                 &&(num - target.last_tenant <= this->queue_size),
                 target.last_tenant = num,
                 notify_all,
                 std::unique_lock<std::mutex> read_lock(read_mutex),
                 ,
                 read_lock.unlock(),
                 notify_all,
                 std::mutex read_mutex;)

#undef ORDER_QUEUE_XPXC

} // namespace btllib

#endif