1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <algorithm> 20 #include <atomic> 21 #include <climits> 22 #include <cmath> 23 #include <iomanip> 24 #include <iostream> 25 #include <mutex> 26 27 #include <folly/Random.h> 28 #include <folly/SpinLock.h> 29 #include <folly/ThreadLocal.h> 30 #include <folly/detail/Futex.h> 31 #include <folly/lang/Align.h> 32 #include <folly/synchronization/Hazptr.h> 33 #include <folly/synchronization/WaitOptions.h> 34 #include <folly/synchronization/detail/Spin.h> 35 36 /// ------ Concurrent Priority Queue Implementation ------ 37 // The concurrent priority queue implementation is based on the 38 // Mound data structure (Mounds: Array-Based Concurrent Priority Queues 39 // by Yujie Liu and Michael Spear, ICPP 2012) 40 // 41 /// --- Overview --- 42 // This relaxed implementation extends the Mound algorithm, and provides 43 // following features: 44 // - Arbitrary priorities. 45 // - Unbounded size. 46 // - Push, pop, empty, size functions. [TODO: Non-waiting and timed wait pop] 47 // - Supports blocking. 48 // - Fast and Scalable. 49 // 50 /// --- Mound --- 51 // A Mound is a heap where each element is a sorted linked list. 52 // First nodes in the lists maintain the heap property. Push randomly 53 // selects a leaf at the bottom level, then uses binary search to find 54 // a place to insert the new node to the head of the list. 
// Pop gets the node from the head of the list at the root, then swaps the
// list down until the heap property holds. To use Mound in our
// implementation, we need to solve the following problems:
// - 1. Lack of general relaxed implementations. Mound is appealing
//   for a relaxed priority queue implementation because popping the whole
//   list from the root is straightforward. One thread pops the list
//   and following threads can pop from the list until it is empty.
//   Those pops trigger only one swap-down operation, which reduces
//   the latency of pop and the contention on the Mound.
//   The difficulty is to provide a scalable and fast mechanism
//   to let threads concurrently get elements from the list.
// - 2. Lack of control of list length. The length of every
//   list is critical for performance. Mound suffers from not
//   only the extreme cases (push with increasing priorities, Mound
//   becomes a sorted linked list; push with decreasing priorities,
//   Mound becomes a regular heap), but also the common case (for
//   randomly generated priorities, Mound degrades to a regular heap
//   after millions of push/pop operations). The difficulty is to
//   stabilize the list length without losing accuracy and performance.
// - 3. Does not support blocking. Blocking is an important feature.
//   The Mound paper does not mention it. Designing a new algorithm for
//   efficient blocking is challenging.
// - 4. Memory management. Mound allows optimistic reads. We need to
//   protect nodes from being reclaimed.
//
/// --- Design ---
// Our implementation extends the Mound algorithm to support
// efficient relaxed pop. We employ a shared buffer algorithm to
// share the popped list. Our algorithm makes popping from the shared
// buffer as fast as fetch_and_add. We improve the performance
// and compact the heap structure by stabilizing the size of each list.
// The implementation exposes a template parameter to set the
// preferred list length. Under the hood, we provide algorithms for
// fast inserting, pruning, and merging. The blocking algorithm is
// tricky: it lets one producer wake only one consumer at a time,
// and it never blocks the producer. For optimistic reads, we use
// hazard pointers to protect nodes from being reclaimed. We optimize the
// check-lock-check pattern by using a test-test-and-set spin lock.
//
/// --- Template Parameters: ---
// 1. PopBatch may be 0 or a positive integer.
//    If it is 0, only one node is popped at a time.
//    This is the strict implementation: it guarantees the returned
//    priority is always the highest. If it is > 0, we keep
//    up to that number of nodes in a shared buffer to be consumed by
//    subsequent pop operations.
//
// 2. ListTargetSize represents the minimal length for each list. It
//    solves the problem of inserting into the Mound in
//    decreasing priority order (which degrades it to a heap). Moreover,
//    it keeps the Mound structure stable after trillions of
//    operations, which would cause an unbalanced tree in the original
//    Mound algorithm. We set the pruning length and merging length
//    based on this parameter.
//
/// --- Interface ---
// void push(const T& val)
// void pop(T& val)
// size_t size()
// bool empty()

namespace folly {

template <
    typename T,
    bool MayBlock = false,
    bool SupportsSize = false,
    size_t PopBatch = 16,
    size_t ListTargetSize = 25,
    typename Mutex = folly::SpinLock,
    template <typename> class Atom = std::atomic>
class RelaxedConcurrentPriorityQueue {
  // Max height of the tree
  static constexpr uint32_t MAX_LEVELS = 32;
  // The default minimum value; used as the priority of an empty list.
  static constexpr T MIN_VALUE = std::numeric_limits<T>::min();

  // Alignment (128 bytes) used to keep hot fields on separate
  // cache lines and avoid false sharing.
  static constexpr size_t Align = 1u << 7;
  // Below this tree level the fast (force-insert) path is never taken.
  static constexpr int LevelForForceInsert = 3;
  // At or above this level, regularInsert also traverses the parent
  // list to improve accuracy (see regularInsert).
  static constexpr int LevelForTraverseParent = 7;

  static_assert(PopBatch <= 256, "PopBatch must be <= 256");
  static_assert(
      ListTargetSize >= 1 && ListTargetSize <= 256,
      "TargetSize must be in the range [1, 256]");

  // The maximal length for a list; exceeding it triggers pruning.
  static constexpr size_t PruningSize = ListTargetSize * 2;
  // When popping from the Mound, tree elements near the leaf
  // level are likely to hold very short lists. When
  // swapping down after popping a list, we check the size of the
  // children to decide whether to merge them into their parent.
  static constexpr size_t MergingSize = ListTargetSize;

  /// List node: the Mound stores sorted singly-linked lists of these.
  /// Derives from hazptr_obj_base so popped nodes can be retired and
  /// reclaimed safely while optimistic readers may still hold them.
  struct Node : public folly::hazptr_obj_base<Node, Atom> {
    Node* next;
    T val;
  };

  /// Mound element (tree node); head points to a sorted linked list.
  struct MoundElement {
    // (head, size) may be read without acquiring the lock,
    // hence they are atomics.
    Atom<Node*> head;
    Atom<size_t> size;
    // Lock on its own cache line to avoid false sharing with head/size.
    alignas(Align) Mutex lock;
    MoundElement() { // initializer
      head.store(nullptr, std::memory_order_relaxed);
      size.store(0, std::memory_order_relaxed);
    }
  };

  /// A (level, index) coordinate in the tree; simplifies the implementation.
  struct Position {
    uint32_t level;
    uint32_t index;
  };

  /// Shared-buffer slot; aligned so each slot gets its own cache line.
  struct BufferNode {
    alignas(Align) Atom<Node*> pnode;
  };

  /// Data members

  // Mound structure -> array of levels representing a tree;
  // levels_[i] has 2^i elements.
  MoundElement* levels_[MAX_LEVELS];
  // Current leaf level (root is 0)
  Atom<uint32_t> bottom_;
  // Guard used when expanding the tree (see grow())
  Atom<uint32_t> guard_;

  // Shared buffer for batched pops (only used when PopBatch > 0).
  // The following two members are accessed by consumers.
  std::unique_ptr<BufferNode[]> shared_buffer_;
  // Index of the highest-priority element in the shared buffer;
  // -1 means the buffer is empty.
  alignas(Align) Atom<int> top_loc_;

  /// Blocking algorithm
  // Number of futexes in the array
  static constexpr size_t NumFutex = 128;
  // Index gap (stride) for mapping tickets onto the futex array
  static constexpr size_t Stride = 33;
  std::unique_ptr<folly::detail::Futex<Atom>[]> futex_array_;
  // Consumer and producer tickets for the blocking protocol.
  alignas(Align) Atom<uint32_t> cticket_;
  alignas(Align) Atom<uint32_t> pticket_;

  // Two counters used to estimate the size of the queue
  // (pushes minus pops); only maintained when SupportsSize is true.
  alignas(Align) Atom<size_t> counter_p_;
  alignas(Align) Atom<size_t> counter_c_;

 public:
  /// Constructor: allocates the futex array (if blocking is enabled),
  /// the shared buffer (if PopBatch > 0), and the root MoundElement.
  RelaxedConcurrentPriorityQueue()
      : cticket_(1), pticket_(1), counter_p_(0), counter_c_(0) {
    if (MayBlock) {
      futex_array_.reset(new folly::detail::Futex<Atom>[NumFutex]);
    }

    if (PopBatch > 0) {
      top_loc_ = -1; // buffer starts empty
      shared_buffer_.reset(new BufferNode[PopBatch]);
      for (size_t i = 0; i < PopBatch; i++) {
        shared_buffer_[i].pnode = nullptr;
      }
    }
    bottom_.store(0, std::memory_order_relaxed);
    guard_.store(0, std::memory_order_relaxed);
    // allocate the root MoundElement and initialize the Mound
    levels_[0] = new MoundElement[1]; // default MM for MoundElement
    for (uint32_t i = 1; i < MAX_LEVELS; i++) {
      levels_[i] = nullptr;
    }
  }

  /// Destructor: frees the shared buffer, the futex array, every list
  /// node, and every allocated tree level. Must not run concurrently
  /// with any other operation.
  ~RelaxedConcurrentPriorityQueue() {
    if (PopBatch > 0) {
      deleteSharedBuffer();
    }
    if (MayBlock) {
      futex_array_.reset();
    }
    Position pos;
    pos.level = pos.index = 0;
    deleteAllNodes(pos);
    // default MM for MoundElement
    for (int i = getBottomLevel(); i >= 0; i--) {
      delete[] levels_[i];
    }
  }

  /// Insert val into the queue; wakes a blocked consumer if MayBlock.
  void push(const T& val) {
    moundPush(val);
    if (SupportsSize) {
      counter_p_.fetch_add(1, std::memory_order_relaxed);
    }
  }

  /// Remove an element (relaxed: approximately the highest priority)
  /// into val; may block if MayBlock and the queue is empty.
  void pop(T& val) {
    moundPop(val);
    if (SupportsSize) {
      counter_c_.fetch_add(1, std::memory_order_relaxed);
    }
  }

  /// Note: size() and empty() are guaranteed to be accurate only if
  /// the queue is not changed concurrently.
  /// Returns an estimate of the size of the queue.
  size_t size() {
    DCHECK(SupportsSize);
    size_t p = counter_p_.load(std::memory_order_acquire);
    size_t c = counter_c_.load(std::memory_order_acquire);
    return (p > c) ? p - c : 0;
  }

  /// Returns true only if the queue was empty during the call.
empty()271 bool empty() { return isEmpty(); } 272 273 private: getBottomLevel()274 uint32_t getBottomLevel() { return bottom_.load(std::memory_order_acquire); } 275 276 /// This function is only called by the destructor deleteSharedBuffer()277 void deleteSharedBuffer() { 278 DCHECK(PopBatch > 0); 279 // delete nodes in the buffer 280 int loc = top_loc_.load(std::memory_order_relaxed); 281 while (loc >= 0) { 282 Node* n = shared_buffer_[loc--].pnode.load(std::memory_order_relaxed); 283 delete n; 284 } 285 // delete buffer 286 shared_buffer_.reset(); 287 } 288 289 /// This function is only called by the destructor deleteAllNodes(const Position & pos)290 void deleteAllNodes(const Position& pos) { 291 if (getElementSize(pos) == 0) { 292 // current list is empty, do not need to check 293 // its children again. 294 return; 295 } 296 297 Node* curList = getList(pos); 298 setTreeNode(pos, nullptr); 299 while (curList != nullptr) { // reclaim nodes 300 Node* n = curList; 301 curList = curList->next; 302 delete n; 303 } 304 305 if (!isLeaf(pos)) { 306 deleteAllNodes(leftOf(pos)); 307 deleteAllNodes(rightOf(pos)); 308 } 309 } 310 311 /// Check the first node in TreeElement keeps the heap structure. isHeap(const Position & pos)312 bool isHeap(const Position& pos) { 313 if (isLeaf(pos)) { 314 return true; 315 } 316 Position lchild = leftOf(pos); 317 Position rchild = rightOf(pos); 318 return isHeap(lchild) && isHeap(rchild) && 319 readValue(pos) >= readValue(lchild) && 320 readValue(pos) >= readValue(rchild); 321 } 322 323 /// Current position is leaf? isLeaf(const Position & pos)324 FOLLY_ALWAYS_INLINE bool isLeaf(const Position& pos) { 325 return pos.level == getBottomLevel(); 326 } 327 328 /// Current element is the root? 
  FOLLY_ALWAYS_INLINE bool isRoot(const Position& pos) {
    return pos.level == 0;
  }

  /// Locate the parent node
  FOLLY_ALWAYS_INLINE Position parentOf(const Position& pos) {
    Position res;
    res.level = pos.level - 1;
    res.index = pos.index / 2;
    return res;
  }

  /// Locate the left child
  FOLLY_ALWAYS_INLINE Position leftOf(const Position& pos) {
    Position res;
    res.level = pos.level + 1;
    res.index = pos.index * 2;
    return res;
  }

  /// Locate the right child
  FOLLY_ALWAYS_INLINE Position rightOf(const Position& pos) {
    Position res;
    res.level = pos.level + 1;
    res.index = pos.index * 2 + 1;
    return res;
  }

  /// Get the list size of the current MoundElement
  FOLLY_ALWAYS_INLINE size_t getElementSize(const Position& pos) {
    return levels_[pos.level][pos.index].size.load(std::memory_order_relaxed);
  }

  /// Set the list size of the current MoundElement
  FOLLY_ALWAYS_INLINE void setElementSize(
      const Position& pos, const uint32_t& v) {
    levels_[pos.level][pos.index].size.store(v, std::memory_order_relaxed);
  }

  /// Extend the tree by one level. btm is the bottom level observed by
  /// the caller; only one thread wins the guard and performs the growth.
  void grow(uint32_t btm) {
    while (true) {
      if (guard_.fetch_add(1, std::memory_order_acq_rel) == 0) {
        break; // we own the guard
      }
      // someone else is expanding the tree; if they already finished,
      // our observed bottom is stale and we are done
      if (btm != getBottomLevel()) {
        return;
      }
      std::this_thread::yield();
    }
    // double check the bottom has not changed yet
    if (btm != getBottomLevel()) {
      guard_.store(0, std::memory_order_release);
      return;
    }
    // create and initialize the new level
    uint32_t tmp_btm = getBottomLevel();
    uint32_t size = 1 << (tmp_btm + 1);
    MoundElement* new_level = new MoundElement[size]; // MM
    levels_[tmp_btm + 1] = new_level;
    // publish the level before releasing the guard
    bottom_.store(tmp_btm + 1, std::memory_order_release);
    guard_.store(0, std::memory_order_release);
  }

  /// TODO: optimization
  // This function is important: it selects a position at the bottom
  // level to start the insert. There are two execution paths when it
  // returns:
  // 1. It returns a position whose head node has lower priority than the
  //    target, which can be used as the starting element for the binary
  //    search to find the fitting position. (slow path)
  // 2. It returns a position which is not the best fit,
  //    but prevents aggressively growing the Mound. (fast path)
  Position selectPosition(
      const T& val,
      bool& path,
      uint32_t& seed,
      folly::hazptr_holder<Atom>& hptr) {
    while (true) {
      uint32_t b = getBottomLevel();
      int bound = 1 << b; // number of elements in this level
      int steps = 1 + b * b; // number of probes before giving up
      ++seed;
      uint32_t index = seed % bound;

      for (int i = 0; i < steps; i++) {
        int loc = (index + i) % bound;
        Position pos;
        pos.level = b;
        pos.index = loc;
        // quick optimistic check of the head value
        if (optimisticReadValue(pos, hptr) <= val) {
          path = false;
          seed = ++loc;
          return pos;
        } else if (
            b > LevelForForceInsert && getElementSize(pos) < ListTargetSize) {
          // [fast path] conservative implementation:
          // it makes sure every tree element holds
          // at least the target number of nodes.
          seed = ++loc;
          path = true;
          return pos;
        }
        if (b != getBottomLevel()) {
          break; // the tree grew under us; restart
        }
      }
      // probed too many times without success: grow the tree
      if (b == getBottomLevel()) {
        grow(b);
      }
    }
  }

  /// Swap two tree elements (head and size). Caller holds both locks.
  void swapList(const Position& a, const Position& b) {
    Node* tmp = getList(a);
    setTreeNode(a, getList(b));
    setTreeNode(b, tmp);

    // also swap the tree node meta-data
    uint32_t sa = getElementSize(a);
    uint32_t sb = getElementSize(b);
    setElementSize(a, sb);
    setElementSize(b, sa);
  }

  FOLLY_ALWAYS_INLINE void lockNode(const Position& pos) {
    levels_[pos.level][pos.index].lock.lock();
  }

  FOLLY_ALWAYS_INLINE void unlockNode(const Position& pos) {
    levels_[pos.level][pos.index].lock.unlock();
  }

  FOLLY_ALWAYS_INLINE bool trylockNode(const Position& pos) {
    return levels_[pos.level][pos.index].lock.try_lock();
  }

  /// Read the head value without holding the lock; the hazard pointer
  /// protects the head node from concurrent reclamation.
  FOLLY_ALWAYS_INLINE T
  optimisticReadValue(const Position& pos, folly::hazptr_holder<Atom>& hptr) {
    Node* tmp = hptr.protect(levels_[pos.level][pos.index].head);
    return (tmp == nullptr) ? MIN_VALUE : tmp->val;
  }

  /// Get the value of the head of the list as the element's value.
  FOLLY_ALWAYS_INLINE T readValue(const Position& pos) {
    Node* tmp = getList(pos);
    return (tmp == nullptr) ? MIN_VALUE : tmp->val;
  }

  FOLLY_ALWAYS_INLINE Node* getList(const Position& pos) {
    return levels_[pos.level][pos.index].head.load(std::memory_order_acquire);
  }

  FOLLY_ALWAYS_INLINE void setTreeNode(const Position& pos, Node* t) {
    levels_[pos.level][pos.index].head.store(t, std::memory_order_release);
  }

  /// Merge two sorted (descending) lists; returns the merged head.
  Node* mergeList(Node* base, Node* source) {
    if (base == nullptr) {
      return source;
    } else if (source == nullptr) {
      return base;
    }

    Node *res, *p;
    // choose the head node
    if (base->val >= source->val) {
      res = base;
      base = base->next;
      p = res;
    } else {
      res = source;
      source = source->next;
      p = res;
    }

    while (base != nullptr && source != nullptr) {
      if (base->val >= source->val) {
        p->next = base;
        base = base->next;
      } else {
        p->next = source;
        source = source->next;
      }
      p = p->next;
    }
    if (base == nullptr) {
      p->next = source;
    } else {
      p->next = base;
    }
    return res;
  }

  /// Merge list t (of length list_length) into the element at pos.
  /// Caller holds the lock on pos.
  void mergeListTo(const Position& pos, Node* t, const size_t& list_length) {
    Node* head = getList(pos);
    setTreeNode(pos, mergeList(head, t));
    uint32_t ns = getElementSize(pos) + list_length;
    setElementSize(pos, ns);
  }

  /// Decide whether pruning at a leaf should stop. Returns true (and
  /// releases the lock) if pruning is unnecessary; returns false with
  /// the lock still held if the caller should grow and continue.
  bool pruningLeaf(const Position& pos) {
    if (getElementSize(pos) <= PruningSize) {
      unlockNode(pos);
      return true;
    }

    // count non-empty leaves; if fewer than 2/3 are occupied, the
    // level still has room and we skip growing
    int b = getBottomLevel();
    int leaves = 1 << b;
    int cnodes = 0;
    for (int i = 0; i < leaves; i++) {
      Position tmp;
      tmp.level = b;
      tmp.index = i;
      if (getElementSize(tmp) != 0) {
        cnodes++;
      }
      if (cnodes > leaves * 2 / 3) {
        break;
      }
    }

    if (cnodes <= leaves * 2 / 3) {
      unlockNode(pos);
      return true;
    }
    return false;
  }

  /// Split the current list into two lists,
  /// then split the tail list and merge it into the two children.
  /// Caller holds the lock on pos; this releases it.
  void startPruning(const Position& pos) {
    if (isLeaf(pos) && pruningLeaf(pos)) {
      return;
    }

    // split the list after ListTargetSize nodes, record the tail
    Node* pruning_head = getList(pos);
    int steps = ListTargetSize; // keep this many in the original list
    for (int i = 0; i < steps - 1; i++) {
      pruning_head = pruning_head->next;
    }
    Node* t = pruning_head;
    pruning_head = pruning_head->next;
    t->next = nullptr;
    int tail_length = getElementSize(pos) - steps;
    setElementSize(pos, steps);

    // split the tail list into two lists and
    // merge them evenly into the two children
    if (pos.level != getBottomLevel()) {
      // split the rest into two lists
      int left_length = (tail_length + 1) / 2;
      int right_length = tail_length - left_length;
      Node *to_right, *to_left = pruning_head;
      for (int i = 0; i < left_length - 1; i++) {
        pruning_head = pruning_head->next;
      }
      to_right = pruning_head->next;
      pruning_head->next = nullptr;

      Position lchild = leftOf(pos);
      Position rchild = rightOf(pos);
      if (left_length != 0) {
        lockNode(lchild);
        mergeListTo(lchild, to_left, left_length);
      }
      if (right_length != 0) {
        lockNode(rchild);
        mergeListTo(rchild, to_right, right_length);
      }
      unlockNode(pos);
      // recursively prune a child if it became too long
      if (left_length != 0 && getElementSize(lchild) > PruningSize) {
        startPruning(lchild);
      } else if (left_length != 0) {
        unlockNode(lchild);
      }
      if (right_length != 0 && getElementSize(rchild) > PruningSize) {
        startPruning(rchild);
      } else if (right_length != 0) {
        unlockNode(rchild);
      }
    } else { // at the bottom: time to grow the Mound
      grow(pos.level);
      // pseudo-randomly choose a child to receive the tail
      if (steps % 2 == 1) {
        Position rchild = rightOf(pos);
        lockNode(rchild);
        mergeListTo(rchild, pruning_head, tail_length);
        unlockNode(pos);
        unlockNode(rchild);
      } else {
        Position lchild = leftOf(pos);
        lockNode(lchild);
        mergeListTo(lchild, pruning_head, tail_length);
        unlockNode(pos);
        unlockNode(lchild);
      }
    }
  }

  // This function inserts the new node (always) at the head of the
  // current list. It needs to lock the parent & current positions.
  // This may make the list too long, so pruning is triggered when
  // the size exceeds PruningSize.
  // Returns false if the locks could not be taken or the heap
  // condition no longer holds at pos (caller retries).
  bool regularInsert(const Position& pos, const T& val, Node* newNode) {
    // insert to the root node
    if (isRoot(pos)) {
      lockNode(pos);
      T nv = readValue(pos);
      if (LIKELY(nv <= val)) {
        newNode->next = getList(pos);
        setTreeNode(pos, newNode);
        uint32_t sz = getElementSize(pos);
        setElementSize(pos, sz + 1);
        if (UNLIKELY(sz > PruningSize)) {
          startPruning(pos); // releases the lock
        } else {
          unlockNode(pos);
        }
        return true;
      }
      unlockNode(pos);
      return false;
    }

    // insert to an inner node
    Position parent = parentOf(pos);
    if (!trylockNode(parent)) {
      return false;
    }
    if (!trylockNode(pos)) {
      unlockNode(parent);
      return false;
    }
    T pv = readValue(parent);
    T nv = readValue(pos);
    if (LIKELY(pv > val && nv <= val)) {
      // Improve accuracy by taking the node (R) with lower priority than
      // the new value from the parent list, inserting the new node into
      // the parent list and pushing R down into the current list.
      // This only happens at levels >= LevelForTraverseParent, to reduce
      // contention near the root.
      uint32_t sz = getElementSize(pos);
      if (pos.level >= LevelForTraverseParent) {
        Node* start = getList(parent);
        while (start->next != nullptr && start->next->val >= val) {
          start = start->next;
        }
        if (start->next != nullptr) {
          // splice newNode into the parent list, then detach the
          // parent's tail node to become the node pushed down
          newNode->next = start->next;
          start->next = newNode;
          while (start->next->next != nullptr) {
            start = start->next;
          }
          newNode = start->next;
          start->next = nullptr;
        }
        unlockNode(parent);

        // insert the pushed-down node into the current (sorted) list
        Node* curList = getList(pos);
        if (curList == nullptr) {
          newNode->next = nullptr;
          setTreeNode(pos, newNode);
        } else {
          Node* p = curList;
          if (p->val <= newNode->val) {
            newNode->next = curList;
            setTreeNode(pos, newNode);
          } else {
            while (p->next != nullptr && p->next->val >= newNode->val) {
              p = p->next;
            }
            newNode->next = p->next;
            p->next = newNode;
          }
        }
        setElementSize(pos, sz + 1);
      } else {
        unlockNode(parent);
        newNode->next = getList(pos);
        setTreeNode(pos, newNode);
        setElementSize(pos, sz + 1);
      }
      if (UNLIKELY(sz > PruningSize)) {
        startPruning(pos); // releases the lock
      } else {
        unlockNode(pos);
      }
      return true;
    }
    unlockNode(parent);
    unlockNode(pos);
    return false;
  }

  /// Try to insert newNode into the root list (sorted position), only
  /// if the root list is still below the target size.
  bool forceInsertToRoot(Node* newNode) {
    Position pos;
    pos.level = pos.index = 0;
    std::unique_lock<Mutex> lck(
        levels_[pos.level][pos.index].lock, std::try_to_lock);
    if (!lck.owns_lock()) {
      return false;
    }
    uint32_t sz = getElementSize(pos);
    if (sz >= ListTargetSize) {
      return false;
    }

    Node* curList = getList(pos);
    if (curList == nullptr) {
      newNode->next = nullptr;
      setTreeNode(pos, newNode);
    } else {
      Node* p = curList;
      if (p->val <= newNode->val) {
        newNode->next = curList;
        setTreeNode(pos, newNode);
      } else {
        while (p->next != nullptr && p->next->val >= newNode->val) {
          p = p->next;
        }
        newNode->next = p->next;
        p->next = newNode;
      }
    }
    setElementSize(pos, sz + 1);
    return true;
  }

  // This function force-inserts the new node at the current position
  // if the element does not hold enough nodes. It is safe to
  // lock just this one position, because the new node won't become the
  // first node, so the heap structure is preserved.
  bool forceInsert(const Position& pos, const T& val, Node* newNode) {
    if (isRoot(pos)) {
      return forceInsertToRoot(newNode);
    }

    while (true) {
      std::unique_lock<Mutex> lck(
          levels_[pos.level][pos.index].lock, std::try_to_lock);
      if (!lck.owns_lock()) {
        // retry only while the position still looks suitable
        if (getElementSize(pos) < ListTargetSize && readValue(pos) >= val) {
          continue;
        } else {
          return false;
        }
      }
      T nv = readValue(pos);
      uint32_t sz = getElementSize(pos);
      // do not allow the new node to become the first one, and
      // do not allow the list to grow past the target size
      if (UNLIKELY(nv < val || sz >= ListTargetSize)) {
        return false;
      }

      Node* p = getList(pos);
      // find the sorted insertion point (list is non-empty: nv >= val)
      while (p->next != nullptr && p->next->val > val) {
        p = p->next;
      }
      newNode->next = p->next;
      p->next = newNode;
      // do not forget to update the metadata
      setElementSize(pos, sz + 1);
      return true;
    }
  }

  /// Binary search along the root-to-cur path for the highest position
  /// whose head value is <= val; cur is updated in place.
  void binarySearchPosition(
      Position& cur, const T& val, folly::hazptr_holder<Atom>& hptr) {
    Position parent, mid;
    if (cur.level == 0) {
      return;
    }
    // start from the root
    parent.level = parent.index = 0;

    while (true) { // binary search on the path
      mid.level = (cur.level + parent.level) / 2;
      mid.index = cur.index >> (cur.level - mid.level);

      T mv = optimisticReadValue(mid, hptr);
      if (val < mv) {
        parent = mid;
      } else {
        cur = mid;
      }

      if (mid.level == 0 || // the root
          ((parent.level + 1 == cur.level) && parent.level != 0)) {
        return;
      }
    }
  }

  // The push algorithm; keeps the length of each element stable.
  void moundPush(const T& val) {
    Position cur;
    folly::hazptr_holder<Atom> hptr = folly::make_hazard_pointer<Atom>();
    Node* newNode = new Node;
    newNode->val = val;
    uint32_t seed = folly::Random::rand32() % (1 << 21);

    while (true) {
      // should we take the fast path?
      bool go_fast_path = false;
      // choose the right node to start from
      cur = selectPosition(val, go_fast_path, seed, hptr);
      if (go_fast_path) {
        if (LIKELY(forceInsert(cur, val, newNode))) {
          if (MayBlock) {
            blockingPushImpl();
          }
          return;
        } else {
          continue;
        }
      }

      binarySearchPosition(cur, val, hptr);
      if (LIKELY(regularInsert(cur, val, newNode))) {
        if (MayBlock) {
          blockingPushImpl();
        }
        return;
      }
    }
  }

  /// Move up to PopBatch nodes from the popped root list (head) into the
  /// shared buffer; returns how many nodes remain in the root list.
  /// Caller holds the root lock.
  int popToSharedBuffer(const uint32_t rsize, Node* head) {
    Position pos;
    pos.level = pos.index = 0;

    int num = std::min(rsize, (uint32_t)PopBatch);
    // fill bottom-up so consumers (who read top-down) see a full prefix
    for (int i = num - 1; i >= 0; i--) {
      // wait until this slot has been consumed from the previous batch
      while (shared_buffer_[i].pnode.load(std::memory_order_relaxed) != nullptr)
        ;
      shared_buffer_[i].pnode.store(head, std::memory_order_relaxed);
      head = head->next;
    }
    if (num > 0) {
      top_loc_.store(num - 1, std::memory_order_release);
    }
    setTreeNode(pos, head);
    return rsize - num;
  }

  /// Restore the heap property downwards from pos after a pop.
  /// Caller holds the lock on pos; all locks are released on return.
  void mergeDown(const Position& pos) {
    if (isLeaf(pos)) {
      unlockNode(pos);
      return;
    }

    // acquire locks for L and R and compare
    Position lchild = leftOf(pos);
    Position rchild = rightOf(pos);
    lockNode(lchild);
    lockNode(rchild);
    // read values
    T nv = readValue(pos);
    T lv = readValue(lchild);
    T rv = readValue(rchild);
    if (nv >= lv && nv >= rv) {
      // heap property already holds here
      unlockNode(pos);
      unlockNode(lchild);
      unlockNode(rchild);
      return;
    }

    // If parent and both children together hold fewer nodes than the
    // threshold, merge the children into the parent and recurse on both
    // (now empty) children.
    size_t sum =
        getElementSize(rchild) + getElementSize(lchild) + getElementSize(pos);
    if (sum <= MergingSize) {
      Node* l1 = mergeList(getList(rchild), getList(lchild));
      setTreeNode(pos, mergeList(l1, getList(pos)));
      setElementSize(pos, sum);
      setTreeNode(lchild, nullptr);
      setElementSize(lchild, 0);
      setTreeNode(rchild, nullptr);
      setElementSize(rchild, 0);
      unlockNode(pos);
      mergeDown(lchild);
      mergeDown(rchild);
      return;
    }
    // pull from right
    if (rv >= lv && rv > nv) {
      swapList(rchild, pos);
      unlockNode(pos);
      unlockNode(lchild);
      mergeDown(rchild);
    } else if (lv >= rv && lv > nv) {
      // pull from left
      swapList(lchild, pos);
      unlockNode(pos);
      unlockNode(rchild);
      mergeDown(lchild);
    }
  }

  /// After the root list became empty, pull a child list up instead of
  /// just zeroing the root size. Returns true when done; otherwise pos
  /// is updated to the drained child (still locked) for the caller to
  /// continue merging down.
  bool deferSettingRootSize(Position& pos) {
    if (isLeaf(pos)) {
      setElementSize(pos, 0);
      unlockNode(pos);
      return true;
    }

    // acquire locks for L and R and compare
    Position lchild = leftOf(pos);
    Position rchild = rightOf(pos);
    lockNode(lchild);
    lockNode(rchild);
    if (getElementSize(lchild) == 0 && getElementSize(rchild) == 0) {
      setElementSize(pos, 0);
      unlockNode(pos);
      unlockNode(lchild);
      unlockNode(rchild);
      return true;
    } else {
      // pull up the child with the higher head value
      T lv = readValue(lchild);
      T rv = readValue(rchild);
      if (lv >= rv) {
        swapList(lchild, pos);
        setElementSize(lchild, 0);
        unlockNode(pos);
        unlockNode(rchild);
        pos = lchild;
      } else {
        swapList(rchild, pos);
        setElementSize(rchild, 0);
        unlockNode(pos);
        unlockNode(lchild);
        pos = rchild;
      }
      return false;
    }
  }

  /// Pop the root list: take one node for the caller, push the rest into
  /// the shared buffer (if enabled), then restore the heap property.
  /// Caller holds the root lock; it is released before returning.
  bool moundPopMany(T& val) {
    // pop from the root
    Position pos;
    pos.level = pos.index = 0;
    // the root is nullptr, return false
    Node* head = getList(pos);
    if (head == nullptr) {
      unlockNode(pos);
      return false;
    }

    // shared buffer already filled by another thread
    if (PopBatch > 0 && top_loc_.load(std::memory_order_acquire) >= 0) {
      unlockNode(pos);
      return false;
    }

    uint32_t sz = getElementSize(pos);
    // take the first node for the caller
    val = head->val;
    Node* p = head;
    head = head->next;
    sz--;

    if (PopBatch > 0) {
      sz = popToSharedBuffer(sz, head);
    } else {
      setTreeNode(pos, head);
    }

    bool done = false;
    if (LIKELY(sz == 0)) {
      done = deferSettingRootSize(pos);
    } else {
      setElementSize(pos, sz);
    }

    if (LIKELY(!done)) {
      mergeDown(pos);
    }

    // retire via hazptr: optimistic readers may still hold p
    p->retire();
    return true;
  }

  /// Producer side of the blocking protocol: publish the push ticket
  /// into the futex array and wake a waiting consumer if needed.
  void blockingPushImpl() {
    auto p = pticket_.fetch_add(1, std::memory_order_acq_rel);
    auto loc = getFutexArrayLoc(p);
    uint32_t curfutex = futex_array_[loc].load(std::memory_order_acquire);

    while (true) {
      uint32_t ready = p << 1; // ticket in the upper 31 bits
      // avoid overwriting a futex a push with a larger ticket already set
      if (UNLIKELY(
              ready + 1 < curfutex ||
              ((curfutex > ready) && (curfutex - ready > 0x40000000)))) {
        return;
      }

      if (futex_array_[loc].compare_exchange_strong(curfutex, ready)) {
        if (curfutex &
            1) { // one or more consumers may be blocked on this futex
          detail::futexWake(&futex_array_[loc]);
        }
        return;
      } else {
        curfutex = futex_array_[loc].load(std::memory_order_acquire);
      }
    }
  }

  // This guarantees the Mound is empty (root element holds no nodes).
FOLLY_ALWAYS_INLINE bool isMoundEmpty() {
  Position pos;
  pos.level = pos.index = 0;
  // The root's element size is zero iff no element is reachable in the tree.
  return getElementSize(pos) == 0;
}

// Return true if the shared buffer is empty.
// top_loc_ is the index of the highest occupied slot; negative means empty.
FOLLY_ALWAYS_INLINE bool isSharedBufferEmpty() {
  return top_loc_.load(std::memory_order_acquire) < 0;
}

// Return true only when both the Mound and (when batching is compiled in)
// the shared pop buffer hold no elements.
FOLLY_ALWAYS_INLINE bool isEmpty() {
  if (PopBatch > 0) {
    return isMoundEmpty() && isSharedBufferEmpty();
  }
  return isMoundEmpty();
}

// Return true once the push ticket matching `curticket` has been published
// to its futex word, i.e. the pop holding this ticket may proceed.
FOLLY_ALWAYS_INLINE bool futexIsReady(const size_t& curticket) {
  auto loc = getFutexArrayLoc(curticket);
  auto curfutex = futex_array_[loc].load(std::memory_order_acquire);
  uint32_t short_cticket = curticket & 0x7FFFFFFF;
  uint32_t futex_ready = curfutex >> 1; // strip the "waiting" flag (bit 0)
  // handle unsigned 31 bits overflow
  return futex_ready >= short_cticket ||
      short_cticket - futex_ready > 0x40000000;
}

// Spin (with pause) until the push ticket is ready or the deadline expires.
// Returns true on success, false when spinning should give way to blocking.
template <typename Clock, typename Duration>
FOLLY_NOINLINE bool trySpinBeforeBlock(
    const size_t& curticket,
    const std::chrono::time_point<Clock, Duration>& deadline,
    const folly::WaitOptions& opt = wait_options()) {
  return folly::detail::spin_pause_until(deadline, opt, [=] {
    return futexIsReady(curticket);
  }) == folly::detail::spin_result::success;
}

// Block the consumer that owns `curticket` until the matching push has been
// published: first yield to still-waiting consumers from the previous round,
// then spin briefly, and finally set the futex "waiting" bit and sleep.
void tryBlockingPop(const size_t& curticket) {
  auto loc = getFutexArrayLoc(curticket);
  auto curfutex = futex_array_[loc].load(std::memory_order_acquire);
  if (curfutex & 1) { /// The last round consumers are still waiting, go to sleep
    detail::futexWait(&futex_array_[loc], curfutex);
  }
  if (trySpinBeforeBlock(
          curticket,
          std::chrono::time_point<std::chrono::steady_clock>::max())) {
    return; /// Spin until the push ticket is ready
  }
  while (true) {
    curfutex =
futex_array_[loc].load(std::memory_order_acquire);
    if (curfutex & 1) { /// The last round consumers are still waiting, go to sleep
      detail::futexWait(&futex_array_[loc], curfutex);
    } else if (!futexIsReady(curticket)) { // current ticket < pop ticket
      // Advertise that a consumer blocks on this word by setting bit 0,
      // then sleep until a push wakes us (or the word changes).
      uint32_t blocking_futex = curfutex + 1;
      if (futex_array_[loc].compare_exchange_strong(
              curfutex, blocking_futex)) {
        detail::futexWait(&futex_array_[loc], blocking_futex);
      }
    } else {
      return;
    }
  }
}

// Take a pop ticket; return immediately when the matching push has already
// been published, otherwise block in tryBlockingPop until it is.
void blockingPopImpl() {
  auto ct = cticket_.fetch_add(1, std::memory_order_acq_rel);
  // fast path check
  if (futexIsReady(ct)) {
    return;
  }
  // Blocking
  tryBlockingPop(ct);
}

// Try to pop one element from the Mound root into `val`. Non-blocking:
// returns false when the Mound looks empty or the root lock is contended.
bool tryPopFromMound(T& val) {
  if (isMoundEmpty()) {
    return false;
  }
  Position pos;
  pos.level = pos.index = 0;

  // lock the root
  if (trylockNode(pos)) {
    return moundPopMany(val);
  }
  return false;
}

// Default WaitOptions for the spin/block strategies in this class.
FOLLY_ALWAYS_INLINE static folly::WaitOptions wait_options() { return {}; }

// Wait until the queue appears non-empty or the deadline expires. Returns
// true when an element was observed, false on timeout. This is only an
// observation — another consumer may still win the element, which is why
// moundPop retries in a loop.
template <typename Clock, typename Duration>
FOLLY_NOINLINE bool tryWait(
    const std::chrono::time_point<Clock, Duration>& deadline,
    const folly::WaitOptions& opt = wait_options()) {
  // Fast path, by quick check the status
  switch (folly::detail::spin_pause_until(
      deadline, opt, [=] { return !isEmpty(); })) {
    case folly::detail::spin_result::success:
      return true;
    case folly::detail::spin_result::timeout:
      return false;
    case folly::detail::spin_result::advance:
      break;
  }

  // Spinning strategy: yield between probes until success or timeout.
  while (true) {
    auto res =
        folly::detail::spin_yield_until(deadline, [=] { return !isEmpty(); });
    if (res == folly::detail::spin_result::success) {
      return true;
    } else if (res == folly::detail::spin_result::timeout) {
      return false;
    }
}
  // Not reached: the loop above only exits via return. Kept to satisfy
  // compilers that require a return on every path.
  return true;
}

// Try to take one element from the shared (batched) pop buffer into `val`.
// A slot is claimed by atomically decrementing top_loc_; a negative result
// means another consumer drained the buffer between the check and the claim.
bool tryPopFromSharedBuffer(T& val) {
  int get_or = -1;
  if (!isSharedBufferEmpty()) {
    get_or = top_loc_.fetch_sub(1, std::memory_order_acq_rel);
    if (get_or >= 0) {
      // This thread now owns slot get_or exclusively.
      Node* c = shared_buffer_[get_or].pnode.load(std::memory_order_relaxed);
      shared_buffer_[get_or].pnode.store(nullptr, std::memory_order_release);
      val = c->val;
      c->retire(); // hazard-pointer-safe reclamation
      return true;
    }
  }
  return false;
}

// Map a ticket to a slot of the futex array. The mask only works as a
// modulo because NumFutex is assumed to be a power of two; Stride spreads
// consecutive tickets across slots (presumably to reduce contention /
// false sharing — confirm against the member declarations).
size_t getFutexArrayLoc(size_t s) {
  return ((s - 1) * Stride) & (NumFutex - 1);
}

// Pop one element into `val`, blocking when MayBlock is set. Tries the
// shared buffer first (when batching is enabled), then the Mound root,
// waiting via tryWait and retrying until an element is obtained.
void moundPop(T& val) {
  if (MayBlock) {
    blockingPopImpl();
  }

  if (PopBatch > 0) {
    if (tryPopFromSharedBuffer(val)) {
      return;
    }
  }

  while (true) {
    if (LIKELY(tryPopFromMound(val))) {
      return;
    }
    // Lost the race or the queue drained: wait for it to refill and retry.
    tryWait(std::chrono::time_point<std::chrono::steady_clock>::max());
    if (PopBatch > 0 && tryPopFromSharedBuffer(val)) {
      return;
    }
  }
}
};

} // namespace folly