1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #pragma once
8 
9 #include "td/utils/port/EventFd.h"
10 #include "td/utils/port/thread.h"
11 
12 #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
13 
14 #include <atomic>
15 #include <type_traits>
16 #include <utility>
17 
18 namespace td {
19 
20 namespace detail {
21 class Backoff {
22   int cnt = 0;
23 
24  public:
next()25   bool next() {
26     // TODO: find out better strategy
27     // TODO: try adaptive backoff
28     // TODO: different strategy one core cpu
29     // return false;
30 
31     cnt++;
32     if (cnt < 1) {  // 50
33       return true;
34     } else {
35       td::this_thread::yield();
36       return cnt < 3;  // 500
37     }
38   }
39 };
40 
41 class InfBackoff {
42   int cnt = 0;
43 
44  public:
next()45   bool next() {
46     cnt++;
47     if (cnt < 50) {
48       return true;
49     } else {
50       td::this_thread::yield();
51       return true;
52     }
53   }
54 };
55 
56 }  // namespace detail
57 
58 template <class T, int P = 10>
59 class SPSCBlockQueue {
60  public:
61   using ValueType = T;
62 
63  private:
buffer_size()64   static constexpr int buffer_size() {
65     static_assert(P >= 1 && P <= 20, "Bad size of BlockQueue");
66     return 1 << P;
67   }
68 
69   struct Position {
70     std::atomic<uint32> i{0};
71     char pad[64 - sizeof(std::atomic<uint32>)];
72     uint32 local_writer_i;
73     char pad2[64 - sizeof(uint32)];
74     uint32 local_reader_i;
75     char pad3[64 - sizeof(uint32)];
76 
initPosition77     void init() {
78       i = 0;
79       local_reader_i = 0;
80       local_writer_i = 0;
81     }
82   };
83 
84   typename std::aligned_storage<sizeof(ValueType)>::type data_[buffer_size()];
85   Position writer_;
86   Position reader_;
87 
fix_i(int i)88   static int fix_i(int i) {
89     return i & (buffer_size() - 1);
90   }
91 
at_ptr(int i)92   ValueType *at_ptr(int i) {
93     return reinterpret_cast<ValueType *>(&data_[fix_i(i)]);
94   }
95 
at(int i)96   ValueType &at(int i) {
97     return *at_ptr(i);
98   }
99 
100  public:
init()101   void init() {
102     writer_.init();
103     reader_.init();
104   }
105 
destroy()106   void destroy() {
107   }
108 
writer_size()109   int writer_size() {
110     return static_cast<int>(writer_.local_reader_i + buffer_size() - writer_.local_writer_i);
111   }
112 
writer_empty()113   bool writer_empty() {
114     return writer_.local_reader_i + buffer_size() == writer_.local_writer_i;
115   }
116 
117   template <class PutValueType>
writer_put_unsafe(PutValueType && value)118   void writer_put_unsafe(PutValueType &&value) {
119     at(writer_.local_writer_i++) = std::forward<PutValueType>(value);
120   }
121 
writer_update()122   int writer_update() {
123     writer_.local_reader_i = reader_.i.load(std::memory_order_acquire);
124     return writer_size();
125   }
126 
writer_flush()127   void writer_flush() {
128     writer_.i.store(writer_.local_writer_i, std::memory_order_release);
129   }
130 
reader_size()131   int reader_size() {
132     return static_cast<int>(reader_.local_writer_i - reader_.local_reader_i);
133   }
134 
reader_empty()135   int reader_empty() {
136     return reader_.local_writer_i == reader_.local_reader_i;
137   }
138 
reader_get_unsafe()139   ValueType reader_get_unsafe() {
140     return std::move(at(reader_.local_reader_i++));
141   }
142 
reader_update()143   int reader_update() {
144     reader_.local_writer_i = writer_.i.load(std::memory_order_acquire);
145     return reader_size();
146   }
147 
reader_flush()148   void reader_flush() {
149     reader_.i.store(reader_.local_reader_i, std::memory_order_release);
150   }
151 };
152 
153 template <class T, class BlockQueueT = SPSCBlockQueue<T>>
154 class SPSCChainQueue {
155  public:
156   using ValueType = T;
157 
init()158   void init() {
159     head_ = tail_ = create_node();
160   }
161 
162   SPSCChainQueue() = default;
163   SPSCChainQueue(const SPSCChainQueue &) = delete;
164   SPSCChainQueue &operator=(const SPSCChainQueue &) = delete;
165   SPSCChainQueue(SPSCChainQueue &&) = delete;
166   SPSCChainQueue &operator=(SPSCChainQueue &&) = delete;
~SPSCChainQueue()167   ~SPSCChainQueue() {
168     destroy();
169   }
170 
destroy()171   void destroy() {
172     while (head_ != nullptr) {
173       Node *to_delete = head_;
174       head_ = head_->next_;
175       delete_node(to_delete);
176     }
177     tail_ = nullptr;
178   }
179 
writer_size()180   int writer_size() {
181     return tail_->q_.writer_size();
182   }
183 
writer_empty()184   bool writer_empty() {
185     return tail_->q_.writer_empty();
186   }
187 
188   template <class PutValueType>
writer_put_unsafe(PutValueType && value)189   void writer_put_unsafe(PutValueType &&value) {
190     tail_->q_.writer_put_unsafe(std::forward<PutValueType>(value));
191   }
192 
writer_update()193   int writer_update() {
194     int res = tail_->q_.writer_update();
195     if (res != 0) {
196       return res;
197     }
198 
199     writer_flush();
200 
201     Node *new_tail = create_node();
202     tail_->next_ = new_tail;
203     tail_->is_closed_.store(true, std::memory_order_release);
204     tail_ = new_tail;
205     return tail_->q_.writer_update();
206   }
207 
writer_flush()208   void writer_flush() {
209     tail_->q_.writer_flush();
210   }
211 
reader_size()212   int reader_size() {
213     return head_->q_.reader_size();
214   }
215 
reader_empty()216   int reader_empty() {
217     return head_->q_.reader_empty();
218   }
219 
reader_get_unsafe()220   ValueType reader_get_unsafe() {
221     return std::move(head_->q_.reader_get_unsafe());
222   }
223 
reader_update()224   int reader_update() {
225     int res = head_->q_.reader_update();
226     if (res != 0) {
227       return res;
228     }
229 
230     if (!head_->is_closed_.load(std::memory_order_acquire)) {
231       return 0;
232     }
233 
234     res = head_->q_.reader_update();
235     if (res != 0) {
236       return res;
237     }
238 
239     // reader_flush();
240 
241     Node *old_head = head_;
242     head_ = head_->next_;
243     delete_node(old_head);
244 
245     return head_->q_.reader_update();
246   }
247 
reader_flush()248   void reader_flush() {
249     head_->q_.reader_flush();
250   }
251 
252  private:
253   struct Node {
254     BlockQueueT q_;
255     std::atomic<bool> is_closed_{false};
256     Node *next_;
257 
initNode258     void init() {
259       q_.init();
260       is_closed_ = false;
261       next_ = nullptr;
262     }
263 
destroyNode264     void destroy() {
265       q_.destroy();
266       next_ = nullptr;
267     }
268   };
269 
270   Node *head_;
271   char pad[64 - sizeof(Node *)];
272   Node *tail_;
273   char pad2[64 - sizeof(Node *)];
274 
create_node()275   Node *create_node() {
276     Node *res = new Node();
277     res->init();
278     return res;
279   }
280 
delete_node(Node * node)281   void delete_node(Node *node) {
282     node->destroy();
283     delete node;
284   }
285 };
286 
287 template <class T, class QueueT = SPSCChainQueue<T>, class BackoffT = detail::Backoff>
288 class BackoffQueue : public QueueT {
289  public:
290   using ValueType = T;
291 
292   template <class PutValueType>
writer_put(PutValueType && value)293   void writer_put(PutValueType &&value) {
294     if (this->writer_empty()) {
295       int sz = this->writer_update();
296       CHECK(sz != 0);
297     }
298     this->writer_put_unsafe(std::forward<PutValueType>(value));
299   }
300 
reader_wait()301   int reader_wait() {
302     BackoffT backoff;
303     int res = 0;
304     do {
305       res = this->reader_update();
306     } while (res == 0 && backoff.next());
307     return res;
308   }
309 };
310 
311 template <class T, class QueueT = SPSCChainQueue<T>>
312 using InfBackoffQueue = BackoffQueue<T, QueueT, detail::InfBackoff>;
313 
314 template <class T, class QueueT = BackoffQueue<T>>
315 class PollQueue final : public QueueT {
316  public:
317   using ValueType = T;
318   using QueueType = QueueT;
319 
init()320   void init() {
321     QueueType::init();
322     event_fd_.init();
323     wait_state_ = 0;
324     writer_wait_state_ = 0;
325   }
326 
327   PollQueue() = default;
328   PollQueue(const PollQueue &) = delete;
329   PollQueue &operator=(const PollQueue &) = delete;
330   PollQueue(PollQueue &&) = delete;
331   PollQueue &operator=(PollQueue &&) = delete;
~PollQueue()332   ~PollQueue() {
333     destroy_impl();
334   }
destroy()335   void destroy() {
336     destroy_impl();
337     QueueType::destroy();
338   }
339 
writer_flush()340   void writer_flush() {
341     int old_wait_state = get_wait_state();
342 
343     std::atomic_thread_fence(std::memory_order_seq_cst);
344 
345     QueueType::writer_flush();
346 
347     std::atomic_thread_fence(std::memory_order_seq_cst);
348 
349     int wait_state = get_wait_state();
350     if ((wait_state & 1) && wait_state != writer_wait_state_) {
351       event_fd_.release();
352       writer_wait_state_ = old_wait_state;
353     }
354   }
355 
reader_get_event_fd()356   EventFd &reader_get_event_fd() {
357     return event_fd_;
358   }
359 
360   // if 0 is returned than it is useless to rerun it before fd is
361   // ready to read.
reader_wait_nonblock()362   int reader_wait_nonblock() {
363     int res;
364 
365     if ((get_wait_state() & 1) == 0) {
366       res = this->QueueType::reader_wait();
367       if (res != 0) {
368         return res;
369       }
370 
371       inc_wait_state();
372 
373       std::atomic_thread_fence(std::memory_order_seq_cst);
374 
375       res = this->reader_update();
376       if (res != 0) {
377         inc_wait_state();
378         return res;
379       }
380     }
381 
382     event_fd_.acquire();
383     std::atomic_thread_fence(std::memory_order_seq_cst);
384     res = this->reader_update();
385     if (res != 0) {
386       inc_wait_state();
387     }
388     return res;
389   }
390 
391   // Just an example of usage
reader_wait()392   int reader_wait() {
393     int res;
394     while ((res = reader_wait_nonblock()) == 0) {
395       reader_get_event_fd().wait(1000);
396     }
397     return res;
398   }
399 
400  private:
401   EventFd event_fd_;
402   std::atomic<int> wait_state_{0};
403   int writer_wait_state_;
404 
get_wait_state()405   int get_wait_state() {
406     return wait_state_.load(std::memory_order_relaxed);
407   }
408 
inc_wait_state()409   void inc_wait_state() {
410     wait_state_.store(get_wait_state() + 1, std::memory_order_relaxed);
411   }
412 
destroy_impl()413   void destroy_impl() {
414     if (!event_fd_.empty()) {
415       event_fd_.close();
416     }
417   }
418 };
419 
420 }  // namespace td
421 
422 #else
423 
424 #include "td/utils/common.h"
425 
426 namespace td {
427 
428 // dummy implementation which shouldn't be used
429 
430 template <class T>
431 class PollQueue {
432  public:
433   using ValueType = T;
434 
init()435   void init() {
436     UNREACHABLE();
437   }
438 
439   template <class PutValueType>
writer_put(PutValueType && value)440   void writer_put(PutValueType &&value) {
441     UNREACHABLE();
442   }
443 
writer_flush()444   void writer_flush() {
445     UNREACHABLE();
446   }
447 
reader_wait_nonblock()448   int reader_wait_nonblock() {
449     UNREACHABLE();
450     return 0;
451   }
452 
reader_get_unsafe()453   ValueType reader_get_unsafe() {
454     UNREACHABLE();
455     return ValueType();
456   }
457 
reader_flush()458   void reader_flush() {
459     UNREACHABLE();
460   }
461 
462   PollQueue() = default;
463   PollQueue(const PollQueue &) = delete;
464   PollQueue &operator=(const PollQueue &) = delete;
465   PollQueue(PollQueue &&) = delete;
466   PollQueue &operator=(PollQueue &&) = delete;
467   ~PollQueue() = default;
468 };
469 
470 }  // namespace td
471 
472 #endif
473