1 #ifndef BTLLIB_ORDER_QUEUE_HPP
2 #define BTLLIB_ORDER_QUEUE_HPP
3 
4 #include <algorithm>
5 #include <atomic>
6 #include <condition_variable>
7 #include <mutex>
8 #include <string>
9 #include <utility>
10 #include <vector>
11 
12 namespace btllib {
13 
14 template<typename T>
15 class OrderQueue
16 {
17 
18 public:
19   struct Block
20   {
21 
Blockbtllib::OrderQueue::Block22     Block(const size_t block_size)
23       : data(block_size)
24     {}
25 
26     Block(const Block& block) = default;
27 
Blockbtllib::OrderQueue::Block28     Block(Block&& block) noexcept
29       : current(block.current)
30       , count(block.count)
31       , num(block.num)
32     {
33       std::swap(data, block.data);
34       block.current = 0;
35       block.count = 0;
36       block.num = 0;
37     }
38 
39     Block& operator=(const Block& block) = default;
40 
operator =btllib::OrderQueue::Block41     Block& operator=(Block&& block) noexcept
42     {
43       std::swap(data, block.data);
44       current = block.current;
45       count = block.count;
46       num = block.num;
47       block.current = 0;
48       block.count = 0;
49       block.num = 0;
50       return *this;
51     }
52 
53     std::vector<T> data;
54     size_t current = 0;
55     size_t count = 0;
56     size_t num = 0;
57   };
58 
59   // Surrounds pieces of data in the buffer with a busy mutex
60   // for exclusive access
61   struct Slot
62   {
Slotbtllib::OrderQueue::Slot63     Slot(size_t block_size)
64       : block(block_size)
65     {}
Slotbtllib::OrderQueue::Slot66     Slot(const Slot& slot)
67       : block(slot.block)
68       , occupied(slot.occupied)
69       , last_tenant(slot.last_tenant)
70     {}
Slotbtllib::OrderQueue::Slot71     Slot(Slot&& slot) noexcept
72       : block(slot.block)
73       , occupied(slot.occupied)
74       , last_tenant(slot.last_tenant)
75     {}
76 
operator =btllib::OrderQueue::Slot77     Slot& operator=(const Slot& slot)
78     {
79       if (this == &slot) {
80         return *this;
81       }
82       block = slot.block;
83       occupied = slot.occupied;
84       last_tenant = slot.last_tenant;
85       return *this;
86     }
operator =btllib::OrderQueue::Slot87     Slot& operator=(Slot&& slot) noexcept
88     {
89       block = slot.block;
90       occupied = slot.occupied;
91       last_tenant = slot.last_tenant;
92       return *this;
93     }
94 
95     typename OrderQueue<T>::Block block;
96     std::mutex busy;
97     bool occupied = false;
98     std::condition_variable occupancy_changed;
99     size_t last_tenant = -1; // Required to ensure read order
100   };
101 
elements() const102   size_t elements() const { return element_count; }
103 
close()104   void close()
105   {
106     closed = true;
107     for (auto& slot : this->slots) {
108       slot.occupancy_changed.notify_all();
109     }
110   }
111 
is_closed() const112   bool is_closed() const { return closed; }
113 
OrderQueue(const size_t queue_size,const size_t block_size)114   OrderQueue(const size_t queue_size, const size_t block_size)
115     : slots(queue_size, Slot(block_size))
116     , queue_size(queue_size)
117     , block_size(block_size)
118   {}
119 
120   OrderQueue(const OrderQueue&) = delete;
121   OrderQueue(OrderQueue&&) = delete;
122 
123 protected:
124   std::vector<Slot> slots;
125   size_t queue_size, block_size;
126   size_t read_counter = 0;
127   std::atomic<size_t> element_count{ 0 };
128   std::atomic<bool> closed{ false };
129 };
130 
131 #define ORDER_QUEUE_XPXC(SUFFIX,                                               \
132                          PRE_WRITE_LOCK,                                       \
133                          EXTRA_WRITE_LOCK_CONDS,                               \
134                          POST_WRITE_LOCK,                                      \
135                          NOTIFY_WRITE,                                         \
136                          PRE_READ_LOCK,                                        \
137                          EXTRA_READ_LOCK_CONDS,                                \
138                          POST_READ_LOCK,                                       \
139                          NOTIFY_READ,                                          \
140                          MEMBERS)                                              \
141   template<typename T>                                                         \
142   class OrderQueue##SUFFIX : public OrderQueue<T>                              \
143   {                                                                            \
144                                                                                \
145   public:                                                                      \
146     OrderQueue##SUFFIX(const size_t queue_size, const size_t block_size)       \
147       : OrderQueue<T>(queue_size, block_size)                                  \
148     {}                                                                         \
149                                                                                \
150     using Block = typename OrderQueue<T>::Block;                               \
151     using Slot = typename OrderQueue<T>::Slot;                                 \
152                                                                                \
153     void write(Block& block)                                                   \
154     {                                                                          \
155       PRE_WRITE_LOCK;                                                          \
156       const auto num = block.num;                                              \
157       auto& target = this->slots[num % this->queue_size];                      \
158       std::unique_lock<std::mutex> busy_lock(target.busy);                     \
159       target.occupancy_changed.wait(busy_lock, [&] {                           \
160         return (!target.occupied EXTRA_WRITE_LOCK_CONDS) || this->closed;      \
161       });                                                                      \
162       if (this->closed) {                                                      \
163         return;                                                                \
164       }                                                                        \
165       POST_WRITE_LOCK; /* NOLINT */                                            \
166       target.block = std::move(block);                                         \
167       target.occupied = true;                                                  \
168       target.occupancy_changed.NOTIFY_WRITE();                                 \
169       ++(this->element_count);                                                 \
170     }                                                                          \
171                                                                                \
172     void read(Block& block)                                                    \
173     {                                                                          \
174       PRE_READ_LOCK;                                                           \
175       auto& target = this->slots[this->read_counter % this->queue_size];       \
176       std::unique_lock<std::mutex> busy_lock(target.busy);                     \
177       target.occupancy_changed.wait(busy_lock, [&] {                           \
178         return (target.occupied EXTRA_READ_LOCK_CONDS) || this->closed;        \
179       });                                                                      \
180       if (this->closed) {                                                      \
181         return;                                                                \
182       }                                                                        \
183       ++(this->read_counter);                                                  \
184       POST_READ_LOCK;                                                          \
185       block = std::move(target.block);                                         \
186       target.occupied = false;                                                 \
187       target.occupancy_changed.NOTIFY_READ();                                  \
188       --(this->element_count);                                                 \
189     }                                                                          \
190                                                                                \
191   private:                                                                     \
192     MEMBERS /* NOLINT */                                                       \
193   };
194 
195 ORDER_QUEUE_XPXC(SPSC, , , , notify_one, , , , notify_one, )
196 ORDER_QUEUE_XPXC(MPSC,
197                  ,
198                  &&(num - target.last_tenant <= this->queue_size),
199                  target.last_tenant = num,
200                  notify_all,
201                  ,
202                  ,
203                  ,
204                  notify_all, )
205 ORDER_QUEUE_XPXC(SPMC,
206                  ,
207                  ,
208                  ,
209                  notify_one,
210                  std::unique_lock<std::mutex> read_lock(read_mutex),
211                  ,
212                  read_lock.unlock(),
213                  notify_one,
214                  std::mutex read_mutex;)
215 ORDER_QUEUE_XPXC(MPMC,
216                  ,
217                  &&(num - target.last_tenant <= this->queue_size),
218                  target.last_tenant = num,
219                  notify_all,
220                  std::unique_lock<std::mutex> read_lock(read_mutex),
221                  ,
222                  read_lock.unlock(),
223                  notify_all,
224                  std::mutex read_mutex;)
225 
226 #undef ORDER_QUEUE_XPXC
227 
228 } // namespace btllib
229 
230 #endif