/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <algorithm>
#include <atomic>
#include <climits>
#include <cmath>
#include <iomanip>
#include <iostream>
#include <limits>
#include <mutex>

#include <folly/Random.h>
#include <folly/SpinLock.h>
#include <folly/ThreadLocal.h>
#include <folly/detail/Futex.h>
#include <folly/lang/Align.h>
#include <folly/synchronization/Hazptr.h>
#include <folly/synchronization/WaitOptions.h>
#include <folly/synchronization/detail/Spin.h>

/// ------ Concurrent Priority Queue Implementation ------
// The concurrent priority queue implementation is based on the
// Mound data structure (Mounds: Array-Based Concurrent Priority Queues
// by Yujie Liu and Michael Spear, ICPP 2012)
//
/// --- Overview ---
// This relaxed implementation extends the Mound algorithm and provides
// the following features:
// - Arbitrary priorities.
// - Unbounded size.
// - Push, pop, empty, size functions. [TODO: Non-waiting and timed wait pop]
// - Supports blocking.
// - Fast and scalable.
//
/// --- Mound ---
// A Mound is a heap where each element is a sorted linked list.
// The first nodes of the lists maintain the heap property. Push randomly
// selects a leaf at the bottom level, then uses binary search to find
// a place to insert the new node at the head of a list. Pop takes
// the node from the head of the list at the root, then swaps the
// list down until the heap property holds. To use the Mound in our
// implementation, we need to solve the following problems:
// - 1. Lack of general relaxed implementations. The Mound is appealing
// for a relaxed priority queue implementation because popping the whole
// list from the root is straightforward. One thread pops the list,
// and subsequent threads can pop from that list until it is empty.
// Those pops trigger only one swap-down operation, which reduces
// the latency of pop and the contention on the Mound.
// The difficulty is to provide a scalable and fast mechanism
// that lets threads concurrently take elements from the list.
// - 2. Lack of control of the list length. The length of every
// list is critical for performance. The Mound suffers not
// only in the extreme cases (pushing with increasing priorities, the
// Mound becomes a sorted linked list; pushing with decreasing priorities,
// the Mound becomes a regular heap), but also in the common case (for
// randomly generated priorities, the Mound degrades to a regular heap
// after millions of push/pop operations). The difficulty is to
// stabilize the list length without losing accuracy or performance.
// - 3. No support for blocking. Blocking is an important feature, but
// the Mound paper does not mention it. Designing a new algorithm for
// efficient blocking is challenging.
// - 4. Memory management. The Mound allows optimistic reads. We need to
// protect nodes from being reclaimed while they are read.
//
/// --- Design ---
// Our implementation extends the Mound algorithm to support
// efficient relaxed pop. We employ a shared buffer algorithm to
// share the popped list. Our algorithm makes popping from the shared
// buffer as fast as a fetch_and_add. We improve the performance
// and compact the heap structure by stabilizing the size of each list.
// The implementation exposes a template parameter to set the
// preferred list length. Under the hood, we provide algorithms for
// fast inserting, pruning, and merging. The blocking algorithm is
// tricky: a producer wakes at most one consumer at a time, and
// producers are never blocked. For optimistic reads, we use
// hazard pointers to protect nodes from being reclaimed. We optimize the
// check-lock-check pattern by using a test-and-test-and-set spin lock.

/// --- Template Parameters: ---
// 1. PopBatch can be 0 or a positive integer.
// If it is 0, only one node is popped at a time.
// This is the strict implementation. It guarantees that the returned
// priority is always the highest.  If it is > 0, we keep
// up to that number of nodes in a shared buffer to be consumed by
// subsequent pop operations.
//
// 2. ListTargetSize represents the minimal length of each list. It
// solves the problem of inserting into the Mound in
// decreasing priority order (which degrades it to a heap).  Moreover,
// it keeps the Mound structure stable after trillions of
// operations, which would otherwise cause the imbalance problem of
// the original Mound algorithm. We set the pruning length and merging
// length based on this parameter.
//
/// --- Interface ---
//  void push(const T& val)
//  void pop(T& val)
//  size_t size()
//  bool empty()

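/// --- Example (illustrative sketch) ---
// A minimal usage sketch based on the interface above; it is not part of
// this header, and the exact include path is an assumption.
//
//   #include <folly/experimental/RelaxedConcurrentPriorityQueue.h>
//
//   // Default configuration: non-blocking, PopBatch = 16 (relaxed pop),
//   // ListTargetSize = 25.
//   folly::RelaxedConcurrentPriorityQueue<int> pq;
//   pq.push(10);
//   pq.push(42);
//   int val = 0;
//   pq.pop(val); // ordering is relaxed when PopBatch > 0
//
//   // Strict, blocking variant that also tracks an approximate size:
//   folly::RelaxedConcurrentPriorityQueue<
//       int, /*MayBlock=*/true, /*SupportsSize=*/true, /*PopBatch=*/0>
//       strict_pq;
//   strict_pq.push(7);
//   int top = 0;
//   strict_pq.pop(top); // returns 7; with MayBlock, pop waits when empty
//   size_t approx = strict_pq.size();
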
namespace folly {

template <
    typename T,
    bool MayBlock = false,
    bool SupportsSize = false,
    size_t PopBatch = 16,
    size_t ListTargetSize = 25,
    typename Mutex = folly::SpinLock,
    template <typename> class Atom = std::atomic>
class RelaxedConcurrentPriorityQueue {
  // Max height of the tree
  static constexpr uint32_t MAX_LEVELS = 32;
  // The default minimum value
  static constexpr T MIN_VALUE = std::numeric_limits<T>::min();

  // Align size for the shared buffer node
  static constexpr size_t Align = 1u << 7;
  static constexpr int LevelForForceInsert = 3;
  static constexpr int LevelForTraverseParent = 7;

  static_assert(PopBatch <= 256, "PopBatch must be <= 256");
  static_assert(
      ListTargetSize >= 1 && ListTargetSize <= 256,
      "TargetSize must be in the range [1, 256]");

  // The maximal length for the list
  static constexpr size_t PruningSize = ListTargetSize * 2;
  // When popping from the Mound, tree elements near the leaf
  // level are likely to hold very short lists. When
  // swapping down after popping a list, we check the size of the
  // children to decide whether to merge them into their parent.
  static constexpr size_t MergingSize = ListTargetSize;

  /// List Node structure
  struct Node : public folly::hazptr_obj_base<Node, Atom> {
    Node* next;
    T val;
  };

  /// Mound Element (tree node); head points to a linked list
  struct MoundElement {
    // (head, size) may be read without acquiring the lock
    Atom<Node*> head;
    Atom<size_t> size;
    alignas(Align) Mutex lock;
    MoundElement() { // initializer
      head.store(nullptr, std::memory_order_relaxed);
      size.store(0, std::memory_order_relaxed);
    }
  };

  /// The Position structure simplifies the implementation
  struct Position {
    uint32_t level;
    uint32_t index;
  };

  /// Nodes in the shared buffer should be aligned
  struct BufferNode {
    alignas(Align) Atom<Node*> pnode;
  };

  /// Data members

  // Mound structure -> 2D array to represent a tree
  MoundElement* levels_[MAX_LEVELS];
  // Records the current leaf level (root is 0)
  Atom<uint32_t> bottom_;
  // Used when expanding the tree
  Atom<uint32_t> guard_;

  // Mound with shared buffer
  // The following two members are accessed by consumers
  std::unique_ptr<BufferNode[]> shared_buffer_;
  alignas(Align) Atom<int> top_loc_;

  /// Blocking algorithm
  // Number of futexes in the array
  static constexpr size_t NumFutex = 128;
  // The index gap for accessing futexes in the array
  static constexpr size_t Stride = 33;
  std::unique_ptr<folly::detail::Futex<Atom>[]> futex_array_;
  alignas(Align) Atom<uint32_t> cticket_;
  alignas(Align) Atom<uint32_t> pticket_;

  // Two counters used to calculate the size of the queue
  alignas(Align) Atom<size_t> counter_p_;
  alignas(Align) Atom<size_t> counter_c_;

 public:
  /// Constructor
  RelaxedConcurrentPriorityQueue()
      : cticket_(1), pticket_(1), counter_p_(0), counter_c_(0) {
    if (MayBlock) {
      futex_array_.reset(new folly::detail::Futex<Atom>[NumFutex]);
    }

    if (PopBatch > 0) {
      top_loc_ = -1;
      shared_buffer_.reset(new BufferNode[PopBatch]);
      for (size_t i = 0; i < PopBatch; i++) {
        shared_buffer_[i].pnode = nullptr;
      }
    }
    bottom_.store(0, std::memory_order_relaxed);
    guard_.store(0, std::memory_order_relaxed);
    // allocate the root MoundElement and initialize Mound
    levels_[0] = new MoundElement[1]; // default MM for MoundElement
    for (uint32_t i = 1; i < MAX_LEVELS; i++) {
      levels_[i] = nullptr;
    }
  }

  ~RelaxedConcurrentPriorityQueue() {
    if (PopBatch > 0) {
      deleteSharedBuffer();
    }
    if (MayBlock) {
      futex_array_.reset();
    }
    Position pos;
    pos.level = pos.index = 0;
    deleteAllNodes(pos);
    // default MM for MoundElement
    for (int i = getBottomLevel(); i >= 0; i--) {
      delete[] levels_[i];
    }
  }

  void push(const T& val) {
    moundPush(val);
    if (SupportsSize) {
      counter_p_.fetch_add(1, std::memory_order_relaxed);
    }
  }

  void pop(T& val) {
    moundPop(val);
    if (SupportsSize) {
      counter_c_.fetch_add(1, std::memory_order_relaxed);
    }
  }

  /// Note: size() and empty() are guaranteed to be accurate only if
  ///       the queue is not changed concurrently.
  /// Returns an estimate of the size of the queue
  size_t size() {
    DCHECK(SupportsSize);
    size_t p = counter_p_.load(std::memory_order_acquire);
    size_t c = counter_c_.load(std::memory_order_acquire);
    return (p > c) ? p - c : 0;
  }

  /// Returns true only if the queue was empty during the call.
  bool empty() { return isEmpty(); }

 private:
  uint32_t getBottomLevel() { return bottom_.load(std::memory_order_acquire); }

  /// This function is only called by the destructor
  void deleteSharedBuffer() {
    DCHECK(PopBatch > 0);
    // delete nodes in the buffer
    int loc = top_loc_.load(std::memory_order_relaxed);
    while (loc >= 0) {
      Node* n = shared_buffer_[loc--].pnode.load(std::memory_order_relaxed);
      delete n;
    }
    // delete buffer
    shared_buffer_.reset();
  }

  /// This function is only called by the destructor
  void deleteAllNodes(const Position& pos) {
    if (getElementSize(pos) == 0) {
      // the current list is empty; no need to check
      // its children.
      return;
    }

    Node* curList = getList(pos);
    setTreeNode(pos, nullptr);
    while (curList != nullptr) { // reclaim nodes
      Node* n = curList;
      curList = curList->next;
      delete n;
    }

    if (!isLeaf(pos)) {
      deleteAllNodes(leftOf(pos));
      deleteAllNodes(rightOf(pos));
    }
  }

  /// Check that the first nodes of the tree elements keep the heap property.
  bool isHeap(const Position& pos) {
    if (isLeaf(pos)) {
      return true;
    }
    Position lchild = leftOf(pos);
    Position rchild = rightOf(pos);
    return isHeap(lchild) && isHeap(rchild) &&
        readValue(pos) >= readValue(lchild) &&
        readValue(pos) >= readValue(rchild);
  }

  /// Is the current position a leaf?
  FOLLY_ALWAYS_INLINE bool isLeaf(const Position& pos) {
    return pos.level == getBottomLevel();
  }

  /// Is the current element the root?
  FOLLY_ALWAYS_INLINE bool isRoot(const Position& pos) {
    return pos.level == 0;
  }

  /// Locate the parent node
  FOLLY_ALWAYS_INLINE Position parentOf(const Position& pos) {
    Position res;
    res.level = pos.level - 1;
    res.index = pos.index / 2;
    return res;
  }

  /// Locate the left child
  FOLLY_ALWAYS_INLINE Position leftOf(const Position& pos) {
    Position res;
    res.level = pos.level + 1;
    res.index = pos.index * 2;
    return res;
  }

  /// Locate the right child
  FOLLY_ALWAYS_INLINE Position rightOf(const Position& pos) {
    Position res;
    res.level = pos.level + 1;
    res.index = pos.index * 2 + 1;
    return res;
  }

  /// Get the list size of the current MoundElement
  FOLLY_ALWAYS_INLINE size_t getElementSize(const Position& pos) {
    return levels_[pos.level][pos.index].size.load(std::memory_order_relaxed);
  }

  /// Set the list size of the current MoundElement
  FOLLY_ALWAYS_INLINE void setElementSize(
      const Position& pos, const uint32_t& v) {
    levels_[pos.level][pos.index].size.store(v, std::memory_order_relaxed);
  }

  /// Extend the tree by one level
  void grow(uint32_t btm) {
    while (true) {
      if (guard_.fetch_add(1, std::memory_order_acq_rel) == 0) {
        break;
      }
      // someone else is already expanding the tree
      if (btm != getBottomLevel()) {
        return;
      }
      std::this_thread::yield();
    }
    // double check that the bottom level has not changed yet
    if (btm != getBottomLevel()) {
      guard_.store(0, std::memory_order_release);
      return;
    }
    // create and initialize the new level
    uint32_t tmp_btm = getBottomLevel();
    uint32_t size = 1 << (tmp_btm + 1);
    MoundElement* new_level = new MoundElement[size]; // MM
    levels_[tmp_btm + 1] = new_level;
    bottom_.store(tmp_btm + 1, std::memory_order_release);
    guard_.store(0, std::memory_order_release);
  }

  /// TODO: optimization
  // This function is important: it selects a position at which to insert the
  // node. There are two execution paths when it returns.
  // 1. It returns a position whose head node has lower priority than the
  // target, so it can be used as the starting element for the binary search
  // that finds the position to insert at. (slow path)
  // 2. It returns a position that is not the best fit,
  // but that prevents growing the Mound too aggressively. (fast path)
  Position selectPosition(
      const T& val,
      bool& path,
      uint32_t& seed,
      folly::hazptr_holder<Atom>& hptr) {
    while (true) {
      uint32_t b = getBottomLevel();
      int bound = 1 << b; // number of elements in this level
      int steps = 1 + b * b; // number of elements to probe
      ++seed;
      uint32_t index = seed % bound;

      for (int i = 0; i < steps; i++) {
        int loc = (index + i) % bound;
        Position pos;
        pos.level = b;
        pos.index = loc;
        // in the first round, we do the quick check
        if (optimisticReadValue(pos, hptr) <= val) {
          path = false;
          seed = ++loc;
          return pos;
        } else if (
            b > LevelForForceInsert && getElementSize(pos) < ListTargetSize) {
          // [fast path] conservative implementation:
          // make sure every tree element holds at least
          // the target number of nodes.
          seed = ++loc;
          path = true;
          return pos;
        }
        if (b != getBottomLevel()) {
          break;
        }
      }
      // failed too many times: grow the tree
      if (b == getBottomLevel()) {
        grow(b);
      }
    }
  }

  /// Swap two Tree Elements (head, size)
  void swapList(const Position& a, const Position& b) {
    Node* tmp = getList(a);
    setTreeNode(a, getList(b));
    setTreeNode(b, tmp);

    // need to swap the tree node meta-data
    uint32_t sa = getElementSize(a);
    uint32_t sb = getElementSize(b);
    setElementSize(a, sb);
    setElementSize(b, sa);
  }

  FOLLY_ALWAYS_INLINE void lockNode(const Position& pos) {
    levels_[pos.level][pos.index].lock.lock();
  }

  FOLLY_ALWAYS_INLINE void unlockNode(const Position& pos) {
    levels_[pos.level][pos.index].lock.unlock();
  }

  FOLLY_ALWAYS_INLINE bool trylockNode(const Position& pos) {
    return levels_[pos.level][pos.index].lock.try_lock();
  }

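  // Read the head value of the element at pos without taking its lock; the
  // hazard pointer keeps the head node from being reclaimed while it is read.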
  FOLLY_ALWAYS_INLINE T
  optimisticReadValue(const Position& pos, folly::hazptr_holder<Atom>& hptr) {
    Node* tmp = hptr.protect(levels_[pos.level][pos.index].head);
    return (tmp == nullptr) ? MIN_VALUE : tmp->val;
  }

  // Get the value from the head of the list as the element value
  FOLLY_ALWAYS_INLINE T readValue(const Position& pos) {
    Node* tmp = getList(pos);
    return (tmp == nullptr) ? MIN_VALUE : tmp->val;
  }

  FOLLY_ALWAYS_INLINE Node* getList(const Position& pos) {
    return levels_[pos.level][pos.index].head.load(std::memory_order_acquire);
  }

  FOLLY_ALWAYS_INLINE void setTreeNode(const Position& pos, Node* t) {
    levels_[pos.level][pos.index].head.store(t, std::memory_order_release);
  }

  // Merge two sorted lists
  Node* mergeList(Node* base, Node* source) {
    if (base == nullptr) {
      return source;
    } else if (source == nullptr) {
      return base;
    }

    Node *res, *p;
    // choose the head node
    if (base->val >= source->val) {
      res = base;
      base = base->next;
      p = res;
    } else {
      res = source;
      source = source->next;
      p = res;
    }

    while (base != nullptr && source != nullptr) {
      if (base->val >= source->val) {
        p->next = base;
        base = base->next;
      } else {
        p->next = source;
        source = source->next;
      }
      p = p->next;
    }
    if (base == nullptr) {
      p->next = source;
    } else {
      p->next = base;
    }
    return res;
  }

  /// Merge list t into the element at the given position
  void mergeListTo(const Position& pos, Node* t, const size_t& list_length) {
    Node* head = getList(pos);
    setTreeNode(pos, mergeList(head, t));
    uint32_t ns = getElementSize(pos) + list_length;
    setElementSize(pos, ns);
  }

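  // Called with pos locked. Returns true (and releases the lock) when pruning
  // can be skipped at a leaf: either the list is still within PruningSize, or
  // too few leaves are occupied to justify growing the tree.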
  bool pruningLeaf(const Position& pos) {
    if (getElementSize(pos) <= PruningSize) {
      unlockNode(pos);
      return true;
    }

    int b = getBottomLevel();
    int leaves = 1 << b;
    int cnodes = 0;
    for (int i = 0; i < leaves; i++) {
      Position tmp;
      tmp.level = b;
      tmp.index = i;
      if (getElementSize(tmp) != 0) {
        cnodes++;
      }
      if (cnodes > leaves * 2 / 3) {
        break;
      }
    }

    if (cnodes <= leaves * 2 / 3) {
      unlockNode(pos);
      return true;
    }
    return false;
  }

  /// Split the current list into two lists,
  /// then split the tail list and merge it into the two children.
  void startPruning(const Position& pos) {
    if (isLeaf(pos) && pruningLeaf(pos)) {
      return;
    }

    // split the list and record the tail
    Node* pruning_head = getList(pos);
    int steps = ListTargetSize; // number of nodes kept in the original list
    for (int i = 0; i < steps - 1; i++) {
      pruning_head = pruning_head->next;
    }
    Node* t = pruning_head;
    pruning_head = pruning_head->next;
    t->next = nullptr;
    int tail_length = getElementSize(pos) - steps;
    setElementSize(pos, steps);

    // split the tail list into two lists and
    // merge them evenly into the two children
    if (pos.level != getBottomLevel()) {
      // split the rest into two lists
      int left_length = (tail_length + 1) / 2;
      int right_length = tail_length - left_length;
      Node *to_right, *to_left = pruning_head;
      for (int i = 0; i < left_length - 1; i++) {
        pruning_head = pruning_head->next;
      }
      to_right = pruning_head->next;
      pruning_head->next = nullptr;

      Position lchild = leftOf(pos);
      Position rchild = rightOf(pos);
      if (left_length != 0) {
        lockNode(lchild);
        mergeListTo(lchild, to_left, left_length);
      }
      if (right_length != 0) {
        lockNode(rchild);
        mergeListTo(rchild, to_right, right_length);
      }
      unlockNode(pos);
      if (left_length != 0 && getElementSize(lchild) > PruningSize) {
        startPruning(lchild);
      } else if (left_length != 0) {
        unlockNode(lchild);
      }
      if (right_length != 0 && getElementSize(rchild) > PruningSize) {
        startPruning(rchild);
      } else if (right_length != 0) {
        unlockNode(rchild);
      }
    } else { // time to grow the Mound
      grow(pos.level);
      // choose a child (based on the parity of steps) to receive the tail
      if (steps % 2 == 1) {
        Position rchild = rightOf(pos);
        lockNode(rchild);
        mergeListTo(rchild, pruning_head, tail_length);
        unlockNode(pos);
        unlockNode(rchild);
      } else {
        Position lchild = leftOf(pos);
        lockNode(lchild);
        mergeListTo(lchild, pruning_head, tail_length);
        unlockNode(pos);
        unlockNode(lchild);
      }
    }
  }

  // This function inserts the new node (always) at the head of the
  // current list. It needs to lock both the parent and the current element.
  // This may cause the list to become too long, so we
  // provide the pruning algorithm above.
  bool regularInsert(const Position& pos, const T& val, Node* newNode) {
    // insert to the root node
    if (isRoot(pos)) {
      lockNode(pos);
      T nv = readValue(pos);
      if (LIKELY(nv <= val)) {
        newNode->next = getList(pos);
        setTreeNode(pos, newNode);
        uint32_t sz = getElementSize(pos);
        setElementSize(pos, sz + 1);
        if (UNLIKELY(sz > PruningSize)) {
          startPruning(pos);
        } else {
          unlockNode(pos);
        }
        return true;
      }
      unlockNode(pos);
      return false;
    }

    // insert to an inner node
    Position parent = parentOf(pos);
    if (!trylockNode(parent)) {
      return false;
    }
    if (!trylockNode(pos)) {
      unlockNode(parent);
      return false;
    }
    T pv = readValue(parent);
    T nv = readValue(pos);
    if (LIKELY(pv > val && nv <= val)) {
      // Improve the accuracy: take the node (R) with lower priority than the
      // new value out of the parent list, insert the new node into the parent
      // list, and insert R into the current list instead.
      // This only happens at levels >= LevelForTraverseParent, to reduce
      // contention.
      uint32_t sz = getElementSize(pos);
      if (pos.level >= LevelForTraverseParent) {
        Node* start = getList(parent);
        while (start->next != nullptr && start->next->val >= val) {
          start = start->next;
        }
        if (start->next != nullptr) {
          newNode->next = start->next;
          start->next = newNode;
          while (start->next->next != nullptr) {
            start = start->next;
          }
          newNode = start->next;
          start->next = nullptr;
        }
        unlockNode(parent);

        Node* curList = getList(pos);
        if (curList == nullptr) {
          newNode->next = nullptr;
          setTreeNode(pos, newNode);
        } else {
          Node* p = curList;
          if (p->val <= newNode->val) {
            newNode->next = curList;
            setTreeNode(pos, newNode);
          } else {
            while (p->next != nullptr && p->next->val >= newNode->val) {
              p = p->next;
            }
            newNode->next = p->next;
            p->next = newNode;
          }
        }
        setElementSize(pos, sz + 1);
      } else {
        unlockNode(parent);
        newNode->next = getList(pos);
        setTreeNode(pos, newNode);
        setElementSize(pos, sz + 1);
      }
      if (UNLIKELY(sz > PruningSize)) {
        startPruning(pos);
      } else {
        unlockNode(pos);
      }
      return true;
    }
    unlockNode(parent);
    unlockNode(pos);
    return false;
  }

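  // Fast-path insert into the root list. Requires the root lock and fewer
  // than ListTargetSize nodes; the node is placed in sorted order (at the
  // root it may safely become the new head). Returns false if the lock is
  // unavailable or the list is already long enough.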
  bool forceInsertToRoot(Node* newNode) {
    Position pos;
    pos.level = pos.index = 0;
    std::unique_lock<Mutex> lck(
        levels_[pos.level][pos.index].lock, std::try_to_lock);
    if (!lck.owns_lock()) {
      return false;
    }
    uint32_t sz = getElementSize(pos);
    if (sz >= ListTargetSize) {
      return false;
    }

    Node* curList = getList(pos);
    if (curList == nullptr) {
      newNode->next = nullptr;
      setTreeNode(pos, newNode);
    } else {
      Node* p = curList;
      if (p->val <= newNode->val) {
        newNode->next = curList;
        setTreeNode(pos, newNode);
      } else {
        while (p->next != nullptr && p->next->val >= newNode->val) {
          p = p->next;
        }
        newNode->next = p->next;
        p->next = newNode;
      }
    }
    setElementSize(pos, sz + 1);
    return true;
  }

  // This function forces the new node into the current position if the
  // element does not yet hold enough nodes. It is safe to lock just this
  // one position, because the new node never becomes the first node, so
  // the heap structure is preserved.
  bool forceInsert(const Position& pos, const T& val, Node* newNode) {
    if (isRoot(pos)) {
      return forceInsertToRoot(newNode);
    }

    while (true) {
      std::unique_lock<Mutex> lck(
          levels_[pos.level][pos.index].lock, std::try_to_lock);
      if (!lck.owns_lock()) {
        if (getElementSize(pos) < ListTargetSize && readValue(pos) >= val) {
          continue;
        } else {
          return false;
        }
      }
      T nv = readValue(pos);
      uint32_t sz = getElementSize(pos);
      // do not allow the new node to become the first one
      // and do not allow the list to grow too long
      if (UNLIKELY(nv < val || sz >= ListTargetSize)) {
        return false;
      }

      Node* p = getList(pos);
      // find a place to insert the node
      while (p->next != nullptr && p->next->val > val) {
        p = p->next;
      }
      newNode->next = p->next;
      p->next = newNode;
      // do not forget to update the metadata
      setElementSize(pos, sz + 1);
      return true;
    }
  }

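  // Binary search along the path from the root to cur: on return, cur is
  // the shallowest position on that path whose head value is <= val (based
  // on optimistic reads), i.e. a suitable element for inserting val.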
  void binarySearchPosition(
      Position& cur, const T& val, folly::hazptr_holder<Atom>& hptr) {
    Position parent, mid;
    if (cur.level == 0) {
      return;
    }
    // start from the root
    parent.level = parent.index = 0;

    while (true) { // binary search
      mid.level = (cur.level + parent.level) / 2;
      mid.index = cur.index >> (cur.level - mid.level);

      T mv = optimisticReadValue(mid, hptr);
      if (val < mv) {
        parent = mid;
      } else {
        cur = mid;
      }

      if (mid.level == 0 || // the root
          ((parent.level + 1 == cur.level) && parent.level != 0)) {
        return;
      }
    }
  }

  // The push keeps the length of each element stable
  void moundPush(const T& val) {
    Position cur;
    folly::hazptr_holder<Atom> hptr = folly::make_hazard_pointer<Atom>();
    Node* newNode = new Node;
    newNode->val = val;
    uint32_t seed = folly::Random::rand32() % (1 << 21);

    while (true) {
      // shall we take the fast path?
      bool go_fast_path = false;
      // choose the right node to start from
      cur = selectPosition(val, go_fast_path, seed, hptr);
      if (go_fast_path) {
        if (LIKELY(forceInsert(cur, val, newNode))) {
          if (MayBlock) {
            blockingPushImpl();
          }
          return;
        } else {
          continue;
        }
      }

      binarySearchPosition(cur, val, hptr);
      if (LIKELY(regularInsert(cur, val, newNode))) {
        if (MayBlock) {
          blockingPushImpl();
        }
        return;
      }
    }
  }

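  // Move up to PopBatch nodes from the already-popped root list into the
  // shared buffer (filled back to front so that top_loc_ is published only
  // after the slots are written); the remainder stays at the root.
  // Returns the number of nodes left in the root list.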
  int popToSharedBuffer(const uint32_t rsize, Node* head) {
    Position pos;
    pos.level = pos.index = 0;

    int num = std::min(rsize, (uint32_t)PopBatch);
    for (int i = num - 1; i >= 0; i--) {
      // wait until this block is empty
      while (shared_buffer_[i].pnode.load(std::memory_order_relaxed) != nullptr)
        ;
      shared_buffer_[i].pnode.store(head, std::memory_order_relaxed);
      head = head->next;
    }
    if (num > 0) {
      top_loc_.store(num - 1, std::memory_order_release);
    }
    setTreeNode(pos, head);
    return rsize - num;
  }

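  // Restore the heap property below pos (which must be locked): either merge
  // two small children into pos, or swap pos's list with the larger child's
  // list and continue merging down from that child.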
  void mergeDown(const Position& pos) {
    if (isLeaf(pos)) {
      unlockNode(pos);
      return;
    }

    // acquire locks for L and R and compare
    Position lchild = leftOf(pos);
    Position rchild = rightOf(pos);
    lockNode(lchild);
    lockNode(rchild);
    // read values
    T nv = readValue(pos);
    T lv = readValue(lchild);
    T rv = readValue(rchild);
    if (nv >= lv && nv >= rv) {
      unlockNode(pos);
      unlockNode(lchild);
      unlockNode(rchild);
      return;
    }

    // If the two children and the parent together hold no more nodes than
    // the threshold, we merge both children into the parent and then
    // merge down from each of them.
    size_t sum =
        getElementSize(rchild) + getElementSize(lchild) + getElementSize(pos);
    if (sum <= MergingSize) {
      Node* l1 = mergeList(getList(rchild), getList(lchild));
      setTreeNode(pos, mergeList(l1, getList(pos)));
      setElementSize(pos, sum);
      setTreeNode(lchild, nullptr);
      setElementSize(lchild, 0);
      setTreeNode(rchild, nullptr);
      setElementSize(rchild, 0);
      unlockNode(pos);
      mergeDown(lchild);
      mergeDown(rchild);
      return;
    }
    // pull from the right child
    if (rv >= lv && rv > nv) {
      swapList(rchild, pos);
      unlockNode(pos);
      unlockNode(lchild);
      mergeDown(rchild);
    } else if (lv >= rv && lv > nv) {
      // pull from the left child
      swapList(lchild, pos);
      unlockNode(pos);
      unlockNode(rchild);
      mergeDown(lchild);
    }
  }

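  // Called when the list at pos has just been emptied by a pop. Instead of
  // simply setting its size to 0, pull the larger child's list up into pos
  // when possible (pos is updated to that child). Returns true when no
  // further mergeDown is needed.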
  bool deferSettingRootSize(Position& pos) {
    if (isLeaf(pos)) {
      setElementSize(pos, 0);
      unlockNode(pos);
      return true;
    }

    // acquire locks for L and R and compare
    Position lchild = leftOf(pos);
    Position rchild = rightOf(pos);
    lockNode(lchild);
    lockNode(rchild);
    if (getElementSize(lchild) == 0 && getElementSize(rchild) == 0) {
      setElementSize(pos, 0);
      unlockNode(pos);
      unlockNode(lchild);
      unlockNode(rchild);
      return true;
    } else {
      // read values
      T lv = readValue(lchild);
      T rv = readValue(rchild);
      if (lv >= rv) {
        swapList(lchild, pos);
        setElementSize(lchild, 0);
        unlockNode(pos);
        unlockNode(rchild);
        pos = lchild;
      } else {
        swapList(rchild, pos);
        setElementSize(rchild, 0);
        unlockNode(pos);
        unlockNode(lchild);
        pos = rchild;
      }
      return false;
    }
  }

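  // Pop under the root lock (already held by the caller): take one node for
  // the caller, optionally move a batch into the shared buffer, then fix up
  // the root list and restore the heap. Returns false if there was nothing
  // to pop or the shared buffer is still being consumed.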
  bool moundPopMany(T& val) {
    // pop from the root
    Position pos;
    pos.level = pos.index = 0;
    // if the root list is empty, return false
    Node* head = getList(pos);
    if (head == nullptr) {
      unlockNode(pos);
      return false;
    }

    // the shared buffer has already been filled by other threads
    if (PopBatch > 0 && top_loc_.load(std::memory_order_acquire) >= 0) {
      unlockNode(pos);
      return false;
    }

    uint32_t sz = getElementSize(pos);
    // take the first node
    val = head->val;
    Node* p = head;
    head = head->next;
    sz--;

    if (PopBatch > 0) {
      sz = popToSharedBuffer(sz, head);
    } else {
      setTreeNode(pos, head);
    }

    bool done = false;
    if (LIKELY(sz == 0)) {
      done = deferSettingRootSize(pos);
    } else {
      setElementSize(pos, sz);
    }

    if (LIKELY(!done)) {
      mergeDown(pos);
    }

    p->retire();
    return true;
  }

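  // Producer side of the blocking algorithm: take a producer ticket and
  // publish it in the matching futex word (the ticket lives in the upper 31
  // bits). If the word's low bit was set, a consumer is parked there and is
  // woken up.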
  void blockingPushImpl() {
    auto p = pticket_.fetch_add(1, std::memory_order_acq_rel);
    auto loc = getFutexArrayLoc(p);
    uint32_t curfutex = futex_array_[loc].load(std::memory_order_acquire);

    while (true) {
      uint32_t ready = p << 1; // keep the lower 31 bits of the ticket
      // avoid overwriting a value already set by a push with a larger ticket
      if (UNLIKELY(
              ready + 1 < curfutex ||
              ((curfutex > ready) && (curfutex - ready > 0x40000000)))) {
        return;
      }

      if (futex_array_[loc].compare_exchange_strong(curfutex, ready)) {
        if (curfutex &
            1) { // One or more consumers may be blocked on this futex
          detail::futexWake(&futex_array_[loc]);
        }
        return;
      } else {
        curfutex = futex_array_[loc].load(std::memory_order_acquire);
      }
    }
  }

  // Checking the root size is enough to tell whether the Mound is empty
  FOLLY_ALWAYS_INLINE bool isMoundEmpty() {
    Position pos;
    pos.level = pos.index = 0;
    return getElementSize(pos) == 0;
  }

  // Return true if the shared buffer is empty
  FOLLY_ALWAYS_INLINE bool isSharedBufferEmpty() {
    return top_loc_.load(std::memory_order_acquire) < 0;
  }

  FOLLY_ALWAYS_INLINE bool isEmpty() {
    if (PopBatch > 0) {
      return isMoundEmpty() && isSharedBufferEmpty();
    }
    return isMoundEmpty();
  }

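  // Returns true once the producer ticket matching curticket has been
  // published in the futex word; the comparison uses 31-bit arithmetic so
  // that ticket wrap-around is handled.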
  FOLLY_ALWAYS_INLINE bool futexIsReady(const size_t& curticket) {
    auto loc = getFutexArrayLoc(curticket);
    auto curfutex = futex_array_[loc].load(std::memory_order_acquire);
    uint32_t short_cticket = curticket & 0x7FFFFFFF;
    uint32_t futex_ready = curfutex >> 1;
    // handle unsigned 31-bit overflow
    return futex_ready >= short_cticket ||
        short_cticket - futex_ready > 0x40000000;
  }

  template <typename Clock, typename Duration>
  FOLLY_NOINLINE bool trySpinBeforeBlock(
      const size_t& curticket,
      const std::chrono::time_point<Clock, Duration>& deadline,
      const folly::WaitOptions& opt = wait_options()) {
    return folly::detail::spin_pause_until(deadline, opt, [=] {
             return futexIsReady(curticket);
           }) == folly::detail::spin_result::success;
  }

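  // Consumer side of the blocking algorithm: first spin briefly for the
  // matching producer ticket, then park on the futex, setting its low bit so
  // that the next producer knows to wake the waiters.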
  void tryBlockingPop(const size_t& curticket) {
    auto loc = getFutexArrayLoc(curticket);
    auto curfutex = futex_array_[loc].load(std::memory_order_acquire);
    if (curfutex &
        1) { /// Consumers from the last round are still waiting; go to sleep
      detail::futexWait(&futex_array_[loc], curfutex);
    }
    if (trySpinBeforeBlock(
            curticket,
            std::chrono::time_point<std::chrono::steady_clock>::max())) {
      return; /// Spin until the push ticket is ready
    }
    while (true) {
      curfutex = futex_array_[loc].load(std::memory_order_acquire);
      if (curfutex &
          1) { /// Consumers from the last round are still waiting; go to sleep
        detail::futexWait(&futex_array_[loc], curfutex);
      } else if (!futexIsReady(curticket)) { // matching push not yet ready
        uint32_t blocking_futex = curfutex + 1;
        if (futex_array_[loc].compare_exchange_strong(
                curfutex, blocking_futex)) {
          detail::futexWait(&futex_array_[loc], blocking_futex);
        }
      } else {
        return;
      }
    }
  }

  void blockingPopImpl() {
    auto ct = cticket_.fetch_add(1, std::memory_order_acq_rel);
    // fast path check
    if (futexIsReady(ct)) {
      return;
    }
    // Blocking
    tryBlockingPop(ct);
  }

  bool tryPopFromMound(T& val) {
    if (isMoundEmpty()) {
      return false;
    }
    Position pos;
    pos.level = pos.index = 0;

    // lock the root
    if (trylockNode(pos)) {
      return moundPopMany(val);
    }
    return false;
  }

  FOLLY_ALWAYS_INLINE static folly::WaitOptions wait_options() { return {}; }

  template <typename Clock, typename Duration>
  FOLLY_NOINLINE bool tryWait(
      const std::chrono::time_point<Clock, Duration>& deadline,
      const folly::WaitOptions& opt = wait_options()) {
    // Fast path: quickly check the status
    switch (folly::detail::spin_pause_until(
        deadline, opt, [=] { return !isEmpty(); })) {
      case folly::detail::spin_result::success:
        return true;
      case folly::detail::spin_result::timeout:
        return false;
      case folly::detail::spin_result::advance:
        break;
    }

    // Spinning strategy
    while (true) {
      auto res =
          folly::detail::spin_yield_until(deadline, [=] { return !isEmpty(); });
      if (res == folly::detail::spin_result::success) {
        return true;
      } else if (res == folly::detail::spin_result::timeout) {
        return false;
      }
    }
    return true;
  }

  bool tryPopFromSharedBuffer(T& val) {
    int get_or = -1;
    if (!isSharedBufferEmpty()) {
      get_or = top_loc_.fetch_sub(1, std::memory_order_acq_rel);
      if (get_or >= 0) {
        Node* c = shared_buffer_[get_or].pnode.load(std::memory_order_relaxed);
        shared_buffer_[get_or].pnode.store(nullptr, std::memory_order_release);
        val = c->val;
        c->retire();
        return true;
      }
    }
    return false;
  }

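  // Map a ticket to a slot in the futex array. The odd Stride spreads
  // consecutive tickets across the array to reduce contention; the mask
  // relies on NumFutex being a power of two.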
  size_t getFutexArrayLoc(size_t s) {
    return ((s - 1) * Stride) & (NumFutex - 1);
  }

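  // Pop one value: if blocking is enabled, wait for a matching push first;
  // prefer the shared buffer, otherwise pop from the Mound, retrying (and
  // waiting for the queue to become non-empty) as needed.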
  void moundPop(T& val) {
    if (MayBlock) {
      blockingPopImpl();
    }

    if (PopBatch > 0) {
      if (tryPopFromSharedBuffer(val)) {
        return;
      }
    }

    while (true) {
      if (LIKELY(tryPopFromMound(val))) {
        return;
      }
      tryWait(std::chrono::time_point<std::chrono::steady_clock>::max());
      if (PopBatch > 0 && tryPopFromSharedBuffer(val)) {
        return;
      }
    }
  }
};

} // namespace folly