1 // 2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021 3 // 4 // Distributed under the Boost Software License, Version 1.0. (See accompanying 5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6 // 7 #pragma once 8 9 #include "td/utils/port/EventFd.h" 10 #include "td/utils/port/thread.h" 11 12 #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED 13 14 #include <atomic> 15 #include <type_traits> 16 #include <utility> 17 18 namespace td { 19 20 namespace detail { 21 class Backoff { 22 int cnt = 0; 23 24 public: next()25 bool next() { 26 // TODO: find out better strategy 27 // TODO: try adaptive backoff 28 // TODO: different strategy one core cpu 29 // return false; 30 31 cnt++; 32 if (cnt < 1) { // 50 33 return true; 34 } else { 35 td::this_thread::yield(); 36 return cnt < 3; // 500 37 } 38 } 39 }; 40 41 class InfBackoff { 42 int cnt = 0; 43 44 public: next()45 bool next() { 46 cnt++; 47 if (cnt < 50) { 48 return true; 49 } else { 50 td::this_thread::yield(); 51 return true; 52 } 53 } 54 }; 55 56 } // namespace detail 57 58 template <class T, int P = 10> 59 class SPSCBlockQueue { 60 public: 61 using ValueType = T; 62 63 private: buffer_size()64 static constexpr int buffer_size() { 65 static_assert(P >= 1 && P <= 20, "Bad size of BlockQueue"); 66 return 1 << P; 67 } 68 69 struct Position { 70 std::atomic<uint32> i{0}; 71 char pad[64 - sizeof(std::atomic<uint32>)]; 72 uint32 local_writer_i; 73 char pad2[64 - sizeof(uint32)]; 74 uint32 local_reader_i; 75 char pad3[64 - sizeof(uint32)]; 76 initPosition77 void init() { 78 i = 0; 79 local_reader_i = 0; 80 local_writer_i = 0; 81 } 82 }; 83 84 typename std::aligned_storage<sizeof(ValueType)>::type data_[buffer_size()]; 85 Position writer_; 86 Position reader_; 87 fix_i(int i)88 static int fix_i(int i) { 89 return i & (buffer_size() - 1); 90 } 91 at_ptr(int i)92 ValueType *at_ptr(int i) { 93 return reinterpret_cast<ValueType *>(&data_[fix_i(i)]); 94 } 95 at(int i)96 ValueType &at(int i) { 97 return *at_ptr(i); 98 } 99 100 public: init()101 void init() { 102 writer_.init(); 103 reader_.init(); 104 } 105 destroy()106 void destroy() { 107 } 108 writer_size()109 int writer_size() { 110 return static_cast<int>(writer_.local_reader_i + buffer_size() - writer_.local_writer_i); 111 } 112 writer_empty()113 bool writer_empty() { 114 return writer_.local_reader_i + buffer_size() == writer_.local_writer_i; 115 } 116 117 template <class PutValueType> writer_put_unsafe(PutValueType && value)118 void writer_put_unsafe(PutValueType &&value) { 119 at(writer_.local_writer_i++) = std::forward<PutValueType>(value); 120 } 121 writer_update()122 int writer_update() { 123 writer_.local_reader_i = reader_.i.load(std::memory_order_acquire); 124 return writer_size(); 125 } 126 writer_flush()127 void writer_flush() { 128 writer_.i.store(writer_.local_writer_i, std::memory_order_release); 129 } 130 reader_size()131 int reader_size() { 132 return static_cast<int>(reader_.local_writer_i - reader_.local_reader_i); 133 } 134 reader_empty()135 int reader_empty() { 136 return reader_.local_writer_i == reader_.local_reader_i; 137 } 138 reader_get_unsafe()139 ValueType reader_get_unsafe() { 140 return std::move(at(reader_.local_reader_i++)); 141 } 142 reader_update()143 int reader_update() { 144 reader_.local_writer_i = writer_.i.load(std::memory_order_acquire); 145 return reader_size(); 146 } 147 reader_flush()148 void reader_flush() { 149 reader_.i.store(reader_.local_reader_i, std::memory_order_release); 150 } 151 }; 152 153 template <class T, class BlockQueueT = SPSCBlockQueue<T>> 154 class SPSCChainQueue { 155 public: 156 using ValueType = T; 157 init()158 void init() { 159 head_ = tail_ = create_node(); 160 } 161 162 SPSCChainQueue() = default; 163 SPSCChainQueue(const SPSCChainQueue &) = delete; 164 SPSCChainQueue &operator=(const SPSCChainQueue &) = delete; 165 SPSCChainQueue(SPSCChainQueue &&) = delete; 166 SPSCChainQueue &operator=(SPSCChainQueue &&) = delete; ~SPSCChainQueue()167 ~SPSCChainQueue() { 168 destroy(); 169 } 170 destroy()171 void destroy() { 172 while (head_ != nullptr) { 173 Node *to_delete = head_; 174 head_ = head_->next_; 175 delete_node(to_delete); 176 } 177 tail_ = nullptr; 178 } 179 writer_size()180 int writer_size() { 181 return tail_->q_.writer_size(); 182 } 183 writer_empty()184 bool writer_empty() { 185 return tail_->q_.writer_empty(); 186 } 187 188 template <class PutValueType> writer_put_unsafe(PutValueType && value)189 void writer_put_unsafe(PutValueType &&value) { 190 tail_->q_.writer_put_unsafe(std::forward<PutValueType>(value)); 191 } 192 writer_update()193 int writer_update() { 194 int res = tail_->q_.writer_update(); 195 if (res != 0) { 196 return res; 197 } 198 199 writer_flush(); 200 201 Node *new_tail = create_node(); 202 tail_->next_ = new_tail; 203 tail_->is_closed_.store(true, std::memory_order_release); 204 tail_ = new_tail; 205 return tail_->q_.writer_update(); 206 } 207 writer_flush()208 void writer_flush() { 209 tail_->q_.writer_flush(); 210 } 211 reader_size()212 int reader_size() { 213 return head_->q_.reader_size(); 214 } 215 reader_empty()216 int reader_empty() { 217 return head_->q_.reader_empty(); 218 } 219 reader_get_unsafe()220 ValueType reader_get_unsafe() { 221 return std::move(head_->q_.reader_get_unsafe()); 222 } 223 reader_update()224 int reader_update() { 225 int res = head_->q_.reader_update(); 226 if (res != 0) { 227 return res; 228 } 229 230 if (!head_->is_closed_.load(std::memory_order_acquire)) { 231 return 0; 232 } 233 234 res = head_->q_.reader_update(); 235 if (res != 0) { 236 return res; 237 } 238 239 // reader_flush(); 240 241 Node *old_head = head_; 242 head_ = head_->next_; 243 delete_node(old_head); 244 245 return head_->q_.reader_update(); 246 } 247 reader_flush()248 void reader_flush() { 249 head_->q_.reader_flush(); 250 } 251 252 private: 253 struct Node { 254 BlockQueueT q_; 255 std::atomic<bool> is_closed_{false}; 256 Node *next_; 257 initNode258 void init() { 259 q_.init(); 260 is_closed_ = false; 261 next_ = nullptr; 262 } 263 destroyNode264 void destroy() { 265 q_.destroy(); 266 next_ = nullptr; 267 } 268 }; 269 270 Node *head_; 271 char pad[64 - sizeof(Node *)]; 272 Node *tail_; 273 char pad2[64 - sizeof(Node *)]; 274 create_node()275 Node *create_node() { 276 Node *res = new Node(); 277 res->init(); 278 return res; 279 } 280 delete_node(Node * node)281 void delete_node(Node *node) { 282 node->destroy(); 283 delete node; 284 } 285 }; 286 287 template <class T, class QueueT = SPSCChainQueue<T>, class BackoffT = detail::Backoff> 288 class BackoffQueue : public QueueT { 289 public: 290 using ValueType = T; 291 292 template <class PutValueType> writer_put(PutValueType && value)293 void writer_put(PutValueType &&value) { 294 if (this->writer_empty()) { 295 int sz = this->writer_update(); 296 CHECK(sz != 0); 297 } 298 this->writer_put_unsafe(std::forward<PutValueType>(value)); 299 } 300 reader_wait()301 int reader_wait() { 302 BackoffT backoff; 303 int res = 0; 304 do { 305 res = this->reader_update(); 306 } while (res == 0 && backoff.next()); 307 return res; 308 } 309 }; 310 311 template <class T, class QueueT = SPSCChainQueue<T>> 312 using InfBackoffQueue = BackoffQueue<T, QueueT, detail::InfBackoff>; 313 314 template <class T, class QueueT = BackoffQueue<T>> 315 class PollQueue final : public QueueT { 316 public: 317 using ValueType = T; 318 using QueueType = QueueT; 319 init()320 void init() { 321 QueueType::init(); 322 event_fd_.init(); 323 wait_state_ = 0; 324 writer_wait_state_ = 0; 325 } 326 327 PollQueue() = default; 328 PollQueue(const PollQueue &) = delete; 329 PollQueue &operator=(const PollQueue &) = delete; 330 PollQueue(PollQueue &&) = delete; 331 PollQueue &operator=(PollQueue &&) = delete; ~PollQueue()332 ~PollQueue() { 333 destroy_impl(); 334 } destroy()335 void destroy() { 336 destroy_impl(); 337 QueueType::destroy(); 338 } 339 writer_flush()340 void writer_flush() { 341 int old_wait_state = get_wait_state(); 342 343 std::atomic_thread_fence(std::memory_order_seq_cst); 344 345 QueueType::writer_flush(); 346 347 std::atomic_thread_fence(std::memory_order_seq_cst); 348 349 int wait_state = get_wait_state(); 350 if ((wait_state & 1) && wait_state != writer_wait_state_) { 351 event_fd_.release(); 352 writer_wait_state_ = old_wait_state; 353 } 354 } 355 reader_get_event_fd()356 EventFd &reader_get_event_fd() { 357 return event_fd_; 358 } 359 360 // if 0 is returned than it is useless to rerun it before fd is 361 // ready to read. reader_wait_nonblock()362 int reader_wait_nonblock() { 363 int res; 364 365 if ((get_wait_state() & 1) == 0) { 366 res = this->QueueType::reader_wait(); 367 if (res != 0) { 368 return res; 369 } 370 371 inc_wait_state(); 372 373 std::atomic_thread_fence(std::memory_order_seq_cst); 374 375 res = this->reader_update(); 376 if (res != 0) { 377 inc_wait_state(); 378 return res; 379 } 380 } 381 382 event_fd_.acquire(); 383 std::atomic_thread_fence(std::memory_order_seq_cst); 384 res = this->reader_update(); 385 if (res != 0) { 386 inc_wait_state(); 387 } 388 return res; 389 } 390 391 // Just an example of usage reader_wait()392 int reader_wait() { 393 int res; 394 while ((res = reader_wait_nonblock()) == 0) { 395 reader_get_event_fd().wait(1000); 396 } 397 return res; 398 } 399 400 private: 401 EventFd event_fd_; 402 std::atomic<int> wait_state_{0}; 403 int writer_wait_state_; 404 get_wait_state()405 int get_wait_state() { 406 return wait_state_.load(std::memory_order_relaxed); 407 } 408 inc_wait_state()409 void inc_wait_state() { 410 wait_state_.store(get_wait_state() + 1, std::memory_order_relaxed); 411 } 412 destroy_impl()413 void destroy_impl() { 414 if (!event_fd_.empty()) { 415 event_fd_.close(); 416 } 417 } 418 }; 419 420 } // namespace td 421 422 #else 423 424 #include "td/utils/common.h" 425 426 namespace td { 427 428 // dummy implementation which shouldn't be used 429 430 template <class T> 431 class PollQueue { 432 public: 433 using ValueType = T; 434 init()435 void init() { 436 UNREACHABLE(); 437 } 438 439 template <class PutValueType> writer_put(PutValueType && value)440 void writer_put(PutValueType &&value) { 441 UNREACHABLE(); 442 } 443 writer_flush()444 void writer_flush() { 445 UNREACHABLE(); 446 } 447 reader_wait_nonblock()448 int reader_wait_nonblock() { 449 UNREACHABLE(); 450 return 0; 451 } 452 reader_get_unsafe()453 ValueType reader_get_unsafe() { 454 UNREACHABLE(); 455 return ValueType(); 456 } 457 reader_flush()458 void reader_flush() { 459 UNREACHABLE(); 460 } 461 462 PollQueue() = default; 463 PollQueue(const PollQueue &) = delete; 464 PollQueue &operator=(const PollQueue &) = delete; 465 PollQueue(PollQueue &&) = delete; 466 PollQueue &operator=(PollQueue &&) = delete; 467 ~PollQueue() = default; 468 }; 469 470 } // namespace td 471 472 #endif 473