1 /*
2  * Copyright 2011-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 // @author: Xin Liu <xliux@fb.com>
18 //
19 // A concurrent skip list (CSL) implementation.
20 // Ref: http://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
21 
22 /*
23 
24 This implements a sorted associative container that supports only
25 unique keys.  (Similar to std::set.)
26 
27 Features:
28 
  1. Small memory overhead: ~40% less memory overhead compared with
     std::set (1.6 words per node versus 3). It has a minimum of 4
     words (7 words if nodes have been deleted) of per-list overhead,
     though.
33 
34   2. Read accesses (count, find iterator, skipper) are lock-free and
35      mostly wait-free (the only wait a reader may need to do is when
36      the node it is visiting is in a pending stage, i.e. deleting,
37      adding and not fully linked).  Write accesses (remove, add) need
38      to acquire locks, but locks are local to the predecessor nodes
39      and/or successor nodes.
40 
  3. Good high contention performance, comparable single-thread
     performance.  In the multithreaded case (12 workers), CSL tested
     10x faster than a RWSpinLocked std::set for an average-sized
     list (1K - 1M nodes).
45 
46      Comparable read performance to std::set when single threaded,
47      especially when the list size is large, and scales better to
48      larger lists: when the size is small, CSL can be 20-50% slower on
49      find()/contains().  As the size gets large (> 1M elements),
50      find()/contains() can be 30% faster.
51 
52      Iterating through a skiplist is similar to iterating through a
53      linked list, thus is much (2-6x) faster than on a std::set
54      (tree-based).  This is especially true for short lists due to
55      better cache locality.  Based on that, it's also faster to
56      intersect two skiplists.
57 
58   4. Lazy removal with GC support.  The removed nodes get deleted when
59      the last Accessor to the skiplist is destroyed.
60 
61 Caveats:
62 
63   1. Write operations are usually 30% slower than std::set in a single
64      threaded environment.
65 
66   2. Need to have a head node for each list, which has a 4 word
67      overhead.
68 
  3. When the list is quite small (< 1000 elements), single threaded
     benchmarks show CSL can be 10x slower than std::set.
71 
72   4. The interface requires using an Accessor to access the skiplist.
73     (See below.)
74 
75   5. Currently x64 only, due to use of MicroSpinLock.
76 
77   6. Freed nodes will not be reclaimed as long as there are ongoing
78      uses of the list.
79 
80 Sample usage:
81 
82      typedef ConcurrentSkipList<int> SkipListT;
     shared_ptr<SkipListT> sl(SkipListT::createInstance(init_head_height));
84      {
85        // It's usually good practice to hold an accessor only during
86        // its necessary life cycle (but not in a tight loop as
87        // Accessor creation incurs ref-counting overhead).
88        //
89        // Holding it longer delays garbage-collecting the deleted
90        // nodes in the list.
91        SkipListT::Accessor accessor(sl);
92        accessor.insert(23);
93        accessor.erase(2);
94        for (auto &elem : accessor) {
95          // use elem to access data
96        }
97        ... ...
98      }
99 
100  Another useful type is the Skipper accessor.  This is useful if you
101  want to skip to locations in the way std::lower_bound() works,
102  i.e. it can be used for going through the list by skipping to the
103  node no less than a specified key.  The Skipper keeps its location as
104  state, which makes it convenient for things like implementing
105  intersection of two sets efficiently, as it can start from the last
106  visited position.
107 
108      {
109        SkipListT::Accessor accessor(sl);
110        SkipListT::Skipper skipper(accessor);
111        skipper.to(30);
112        if (skipper) {
113          CHECK_LE(30, *skipper);
114        }
115        ...  ...
116        // GC may happen when the accessor gets destructed.
117      }
118 */
119 
120 #pragma once
121 
122 #include <algorithm>
123 #include <atomic>
124 #include <limits>
125 #include <memory>
126 #include <type_traits>
127 
128 #include <boost/iterator/iterator_facade.hpp>
129 #include <glog/logging.h>
130 
131 #include <folly/ConcurrentSkipList-inl.h>
132 #include <folly/Likely.h>
133 #include <folly/Memory.h>
134 #include <folly/synchronization/MicroSpinLock.h>
135 
136 namespace folly {
137 
138 template <
139     typename T,
140     typename Comp = std::less<T>,
141     // All nodes are allocated using provided SysAllocator,
142     // it should be thread-safe.
143     typename NodeAlloc = SysAllocator<void>,
144     int MAX_HEIGHT = 24>
145 class ConcurrentSkipList {
146   // MAX_HEIGHT needs to be at least 2 to suppress compiler
147   // warnings/errors (Werror=uninitialized tiggered due to preds_[1]
148   // being treated as a scalar in the compiler).
149   static_assert(
150       MAX_HEIGHT >= 2 && MAX_HEIGHT < 64,
151       "MAX_HEIGHT can only be in the range of [2, 64)");
152   typedef std::unique_lock<folly::MicroSpinLock> ScopedLocker;
153   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
154 
155  public:
156   typedef detail::SkipListNode<T> NodeType;
157   typedef T value_type;
158   typedef T key_type;
159 
160   typedef detail::csl_iterator<value_type, NodeType> iterator;
161   typedef detail::csl_iterator<const value_type, const NodeType> const_iterator;
162 
163   class Accessor;
164   class Skipper;
165 
ConcurrentSkipList(int height,const NodeAlloc & alloc)166   explicit ConcurrentSkipList(int height, const NodeAlloc& alloc)
167       : recycler_(alloc),
168         head_(NodeType::create(recycler_.alloc(), height, value_type(), true)),
169         size_(0) {}
170 
ConcurrentSkipList(int height)171   explicit ConcurrentSkipList(int height)
172       : recycler_(),
173         head_(NodeType::create(recycler_.alloc(), height, value_type(), true)),
174         size_(0) {}
175 
176   // Convenient function to get an Accessor to a new instance.
create(int height,const NodeAlloc & alloc)177   static Accessor create(int height, const NodeAlloc& alloc) {
178     return Accessor(createInstance(height, alloc));
179   }
180 
181   static Accessor create(int height = 1) {
182     return Accessor(createInstance(height));
183   }
184 
185   // Create a shared_ptr skiplist object with initial head height.
createInstance(int height,const NodeAlloc & alloc)186   static std::shared_ptr<SkipListType> createInstance(
187       int height,
188       const NodeAlloc& alloc) {
189     return std::make_shared<ConcurrentSkipList>(height, alloc);
190   }
191 
192   static std::shared_ptr<SkipListType> createInstance(int height = 1) {
193     return std::make_shared<ConcurrentSkipList>(height);
194   }
195 
196   //===================================================================
197   // Below are implementation details.
198   // Please see ConcurrentSkipList::Accessor for stdlib-like APIs.
199   //===================================================================
200 
~ConcurrentSkipList()201   ~ConcurrentSkipList() {
202     if /* constexpr */ (NodeType::template DestroyIsNoOp<NodeAlloc>::value) {
203       // Avoid traversing the list if using arena allocator.
204       return;
205     }
206     for (NodeType* current = head_.load(std::memory_order_relaxed); current;) {
207       NodeType* tmp = current->skip(0);
208       NodeType::destroy(recycler_.alloc(), current);
209       current = tmp;
210     }
211   }
212 
213  private:
greater(const value_type & data,const NodeType * node)214   static bool greater(const value_type& data, const NodeType* node) {
215     return node && Comp()(node->data(), data);
216   }
217 
less(const value_type & data,const NodeType * node)218   static bool less(const value_type& data, const NodeType* node) {
219     return (node == nullptr) || Comp()(data, node->data());
220   }
221 
findInsertionPoint(NodeType * cur,int cur_layer,const value_type & data,NodeType * preds[],NodeType * succs[])222   static int findInsertionPoint(
223       NodeType* cur,
224       int cur_layer,
225       const value_type& data,
226       NodeType* preds[],
227       NodeType* succs[]) {
228     int foundLayer = -1;
229     NodeType* pred = cur;
230     NodeType* foundNode = nullptr;
231     for (int layer = cur_layer; layer >= 0; --layer) {
232       NodeType* node = pred->skip(layer);
233       while (greater(data, node)) {
234         pred = node;
235         node = node->skip(layer);
236       }
237       if (foundLayer == -1 && !less(data, node)) { // the two keys equal
238         foundLayer = layer;
239         foundNode = node;
240       }
241       preds[layer] = pred;
242 
243       // if found, succs[0..foundLayer] need to point to the cached foundNode,
244       // as foundNode might be deleted at the same time thus pred->skip() can
245       // return nullptr or another node.
246       succs[layer] = foundNode ? foundNode : node;
247     }
248     return foundLayer;
249   }
250 
size()251   size_t size() const {
252     return size_.load(std::memory_order_relaxed);
253   }
254 
height()255   int height() const {
256     return head_.load(std::memory_order_consume)->height();
257   }
258 
maxLayer()259   int maxLayer() const {
260     return height() - 1;
261   }
262 
incrementSize(int delta)263   size_t incrementSize(int delta) {
264     return size_.fetch_add(delta, std::memory_order_relaxed) + delta;
265   }
266 
267   // Returns the node if found, nullptr otherwise.
find(const value_type & data)268   NodeType* find(const value_type& data) {
269     auto ret = findNode(data);
270     if (ret.second && !ret.first->markedForRemoval()) {
271       return ret.first;
272     }
273     return nullptr;
274   }
275 
276   // lock all the necessary nodes for changing (adding or removing) the list.
277   // returns true if all the lock acquried successfully and the related nodes
278   // are all validate (not in certain pending states), false otherwise.
279   bool lockNodesForChange(
280       int nodeHeight,
281       ScopedLocker guards[MAX_HEIGHT],
282       NodeType* preds[MAX_HEIGHT],
283       NodeType* succs[MAX_HEIGHT],
284       bool adding = true) {
285     NodeType *pred, *succ, *prevPred = nullptr;
286     bool valid = true;
287     for (int layer = 0; valid && layer < nodeHeight; ++layer) {
288       pred = preds[layer];
289       DCHECK(pred != nullptr) << "layer=" << layer << " height=" << height()
290                               << " nodeheight=" << nodeHeight;
291       succ = succs[layer];
292       if (pred != prevPred) {
293         guards[layer] = pred->acquireGuard();
294         prevPred = pred;
295       }
296       valid = !pred->markedForRemoval() &&
297           pred->skip(layer) == succ; // check again after locking
298 
299       if (adding) { // when adding a node, the succ shouldn't be going away
300         valid = valid && (succ == nullptr || !succ->markedForRemoval());
301       }
302     }
303 
304     return valid;
305   }
306 
307   // Returns a paired value:
308   //   pair.first always stores the pointer to the node with the same input key.
309   //     It could be either the newly added data, or the existed data in the
310   //     list with the same key.
311   //   pair.second stores whether the data is added successfully:
312   //     0 means not added, otherwise reutrns the new size.
313   template <typename U>
addOrGetData(U && data)314   std::pair<NodeType*, size_t> addOrGetData(U&& data) {
315     NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
316     NodeType* newNode;
317     size_t newSize;
318     while (true) {
319       int max_layer = 0;
320       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
321 
322       if (layer >= 0) {
323         NodeType* nodeFound = succs[layer];
324         DCHECK(nodeFound != nullptr);
325         if (nodeFound->markedForRemoval()) {
326           continue; // if it's getting deleted retry finding node.
327         }
328         // wait until fully linked.
329         while (UNLIKELY(!nodeFound->fullyLinked())) {
330         }
331         return std::make_pair(nodeFound, 0);
332       }
333 
334       // need to capped at the original height -- the real height may have grown
335       int nodeHeight =
336           detail::SkipListRandomHeight::instance()->getHeight(max_layer + 1);
337 
338       ScopedLocker guards[MAX_HEIGHT];
339       if (!lockNodesForChange(nodeHeight, guards, preds, succs)) {
340         continue; // give up the locks and retry until all valid
341       }
342 
343       // locks acquired and all valid, need to modify the links under the locks.
344       newNode = NodeType::create(
345           recycler_.alloc(), nodeHeight, std::forward<U>(data));
346       for (int k = 0; k < nodeHeight; ++k) {
347         newNode->setSkip(k, succs[k]);
348         preds[k]->setSkip(k, newNode);
349       }
350 
351       newNode->setFullyLinked();
352       newSize = incrementSize(1);
353       break;
354     }
355 
356     int hgt = height();
357     size_t sizeLimit =
358         detail::SkipListRandomHeight::instance()->getSizeLimit(hgt);
359 
360     if (hgt < MAX_HEIGHT && newSize > sizeLimit) {
361       growHeight(hgt + 1);
362     }
363     CHECK_GT(newSize, 0);
364     return std::make_pair(newNode, newSize);
365   }
366 
remove(const value_type & data)367   bool remove(const value_type& data) {
368     NodeType* nodeToDelete = nullptr;
369     ScopedLocker nodeGuard;
370     bool isMarked = false;
371     int nodeHeight = 0;
372     NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
373 
374     while (true) {
375       int max_layer = 0;
376       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
377       if (!isMarked && (layer < 0 || !okToDelete(succs[layer], layer))) {
378         return false;
379       }
380 
381       if (!isMarked) {
382         nodeToDelete = succs[layer];
383         nodeHeight = nodeToDelete->height();
384         nodeGuard = nodeToDelete->acquireGuard();
385         if (nodeToDelete->markedForRemoval()) {
386           return false;
387         }
388         nodeToDelete->setMarkedForRemoval();
389         isMarked = true;
390       }
391 
392       // acquire pred locks from bottom layer up
393       ScopedLocker guards[MAX_HEIGHT];
394       if (!lockNodesForChange(nodeHeight, guards, preds, succs, false)) {
395         continue; // this will unlock all the locks
396       }
397 
398       for (int k = nodeHeight - 1; k >= 0; --k) {
399         preds[k]->setSkip(k, nodeToDelete->skip(k));
400       }
401 
402       incrementSize(-1);
403       break;
404     }
405     recycle(nodeToDelete);
406     return true;
407   }
408 
first()409   const value_type* first() const {
410     auto node = head_.load(std::memory_order_consume)->skip(0);
411     return node ? &node->data() : nullptr;
412   }
413 
last()414   const value_type* last() const {
415     NodeType* pred = head_.load(std::memory_order_consume);
416     NodeType* node = nullptr;
417     for (int layer = maxLayer(); layer >= 0; --layer) {
418       do {
419         node = pred->skip(layer);
420         if (node) {
421           pred = node;
422         }
423       } while (node != nullptr);
424     }
425     return pred == head_.load(std::memory_order_relaxed) ? nullptr
426                                                          : &pred->data();
427   }
428 
okToDelete(NodeType * candidate,int layer)429   static bool okToDelete(NodeType* candidate, int layer) {
430     DCHECK(candidate != nullptr);
431     return candidate->fullyLinked() && candidate->maxLayer() == layer &&
432         !candidate->markedForRemoval();
433   }
434 
435   // find node for insertion/deleting
findInsertionPointGetMaxLayer(const value_type & data,NodeType * preds[],NodeType * succs[],int * max_layer)436   int findInsertionPointGetMaxLayer(
437       const value_type& data,
438       NodeType* preds[],
439       NodeType* succs[],
440       int* max_layer) const {
441     *max_layer = maxLayer();
442     return findInsertionPoint(
443         head_.load(std::memory_order_consume), *max_layer, data, preds, succs);
444   }
445 
446   // Find node for access. Returns a paired values:
447   // pair.first = the first node that no-less than data value
448   // pair.second = 1 when the data value is founded, or 0 otherwise.
449   // This is like lower_bound, but not exact: we could have the node marked for
450   // removal so still need to check that.
findNode(const value_type & data)451   std::pair<NodeType*, int> findNode(const value_type& data) const {
452     return findNodeDownRight(data);
453   }
454 
455   // Find node by first stepping down then stepping right. Based on benchmark
456   // results, this is slightly faster than findNodeRightDown for better
457   // localality on the skipping pointers.
findNodeDownRight(const value_type & data)458   std::pair<NodeType*, int> findNodeDownRight(const value_type& data) const {
459     NodeType* pred = head_.load(std::memory_order_consume);
460     int ht = pred->height();
461     NodeType* node = nullptr;
462 
463     bool found = false;
464     while (!found) {
465       // stepping down
466       for (; ht > 0 && less(data, node = pred->skip(ht - 1)); --ht) {
467       }
468       if (ht == 0) {
469         return std::make_pair(node, 0); // not found
470       }
471       // node <= data now, but we need to fix up ht
472       --ht;
473 
474       // stepping right
475       while (greater(data, node)) {
476         pred = node;
477         node = node->skip(ht);
478       }
479       found = !less(data, node);
480     }
481     return std::make_pair(node, found);
482   }
483 
484   // find node by first stepping right then stepping down.
485   // We still keep this for reference purposes.
findNodeRightDown(const value_type & data)486   std::pair<NodeType*, int> findNodeRightDown(const value_type& data) const {
487     NodeType* pred = head_.load(std::memory_order_consume);
488     NodeType* node = nullptr;
489     auto top = maxLayer();
490     int found = 0;
491     for (int layer = top; !found && layer >= 0; --layer) {
492       node = pred->skip(layer);
493       while (greater(data, node)) {
494         pred = node;
495         node = node->skip(layer);
496       }
497       found = !less(data, node);
498     }
499     return std::make_pair(node, found);
500   }
501 
lower_bound(const value_type & data)502   NodeType* lower_bound(const value_type& data) const {
503     auto node = findNode(data).first;
504     while (node != nullptr && node->markedForRemoval()) {
505       node = node->skip(0);
506     }
507     return node;
508   }
509 
growHeight(int height)510   void growHeight(int height) {
511     NodeType* oldHead = head_.load(std::memory_order_consume);
512     if (oldHead->height() >= height) { // someone else already did this
513       return;
514     }
515 
516     NodeType* newHead =
517         NodeType::create(recycler_.alloc(), height, value_type(), true);
518 
519     { // need to guard the head node in case others are adding/removing
520       // nodes linked to the head.
521       ScopedLocker g = oldHead->acquireGuard();
522       newHead->copyHead(oldHead);
523       NodeType* expected = oldHead;
524       if (!head_.compare_exchange_strong(
525               expected, newHead, std::memory_order_release)) {
526         // if someone has already done the swap, just return.
527         NodeType::destroy(recycler_.alloc(), newHead);
528         return;
529       }
530       oldHead->setMarkedForRemoval();
531     }
532     recycle(oldHead);
533   }
534 
recycle(NodeType * node)535   void recycle(NodeType* node) {
536     recycler_.add(node);
537   }
538 
539   detail::NodeRecycler<NodeType, NodeAlloc> recycler_;
540   std::atomic<NodeType*> head_;
541   std::atomic<size_t> size_;
542 };
543 
544 template <typename T, typename Comp, typename NodeAlloc, int MAX_HEIGHT>
545 class ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT>::Accessor {
546   typedef detail::SkipListNode<T> NodeType;
547   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
548 
549  public:
550   typedef T value_type;
551   typedef T key_type;
552   typedef T& reference;
553   typedef T* pointer;
554   typedef const T& const_reference;
555   typedef const T* const_pointer;
556   typedef size_t size_type;
557   typedef Comp key_compare;
558   typedef Comp value_compare;
559 
560   typedef typename SkipListType::iterator iterator;
561   typedef typename SkipListType::const_iterator const_iterator;
562   typedef typename SkipListType::Skipper Skipper;
563 
Accessor(std::shared_ptr<ConcurrentSkipList> skip_list)564   explicit Accessor(std::shared_ptr<ConcurrentSkipList> skip_list)
565       : slHolder_(std::move(skip_list)) {
566     sl_ = slHolder_.get();
567     DCHECK(sl_ != nullptr);
568     sl_->recycler_.addRef();
569   }
570 
571   // Unsafe initializer: the caller assumes the responsibility to keep
572   // skip_list valid during the whole life cycle of the Acessor.
Accessor(ConcurrentSkipList * skip_list)573   explicit Accessor(ConcurrentSkipList* skip_list) : sl_(skip_list) {
574     DCHECK(sl_ != nullptr);
575     sl_->recycler_.addRef();
576   }
577 
Accessor(const Accessor & accessor)578   Accessor(const Accessor& accessor)
579       : sl_(accessor.sl_), slHolder_(accessor.slHolder_) {
580     sl_->recycler_.addRef();
581   }
582 
583   Accessor& operator=(const Accessor& accessor) {
584     if (this != &accessor) {
585       slHolder_ = accessor.slHolder_;
586       sl_->recycler_.releaseRef();
587       sl_ = accessor.sl_;
588       sl_->recycler_.addRef();
589     }
590     return *this;
591   }
592 
~Accessor()593   ~Accessor() {
594     sl_->recycler_.releaseRef();
595   }
596 
empty()597   bool empty() const {
598     return sl_->size() == 0;
599   }
size()600   size_t size() const {
601     return sl_->size();
602   }
max_size()603   size_type max_size() const {
604     return std::numeric_limits<size_type>::max();
605   }
606 
607   // returns end() if the value is not in the list, otherwise returns an
608   // iterator pointing to the data, and it's guaranteed that the data is valid
609   // as far as the Accessor is hold.
find(const key_type & value)610   iterator find(const key_type& value) {
611     return iterator(sl_->find(value));
612   }
find(const key_type & value)613   const_iterator find(const key_type& value) const {
614     return iterator(sl_->find(value));
615   }
count(const key_type & data)616   size_type count(const key_type& data) const {
617     return contains(data);
618   }
619 
begin()620   iterator begin() const {
621     NodeType* head = sl_->head_.load(std::memory_order_consume);
622     return iterator(head->next());
623   }
end()624   iterator end() const {
625     return iterator(nullptr);
626   }
cbegin()627   const_iterator cbegin() const {
628     return begin();
629   }
cend()630   const_iterator cend() const {
631     return end();
632   }
633 
634   template <
635       typename U,
636       typename =
637           typename std::enable_if<std::is_convertible<U, T>::value>::type>
insert(U && data)638   std::pair<iterator, bool> insert(U&& data) {
639     auto ret = sl_->addOrGetData(std::forward<U>(data));
640     return std::make_pair(iterator(ret.first), ret.second);
641   }
erase(const key_type & data)642   size_t erase(const key_type& data) {
643     return remove(data);
644   }
645 
lower_bound(const key_type & data)646   iterator lower_bound(const key_type& data) const {
647     return iterator(sl_->lower_bound(data));
648   }
649 
height()650   size_t height() const {
651     return sl_->height();
652   }
653 
654   // first() returns pointer to the first element in the skiplist, or
655   // nullptr if empty.
656   //
657   // last() returns the pointer to the last element in the skiplist,
658   // nullptr if list is empty.
659   //
660   // Note: As concurrent writing can happen, first() is not
661   //   guaranteed to be the min_element() in the list. Similarly
662   //   last() is not guaranteed to be the max_element(), and both of them can
663   //   be invalid (i.e. nullptr), so we name them differently from front() and
664   //   tail() here.
first()665   const key_type* first() const {
666     return sl_->first();
667   }
last()668   const key_type* last() const {
669     return sl_->last();
670   }
671 
672   // Try to remove the last element in the skip list.
673   //
674   // Returns true if we removed it, false if either the list is empty
675   // or a race condition happened (i.e. the used-to-be last element
676   // was already removed by another thread).
pop_back()677   bool pop_back() {
678     auto last = sl_->last();
679     return last ? sl_->remove(*last) : false;
680   }
681 
addOrGetData(const key_type & data)682   std::pair<key_type*, bool> addOrGetData(const key_type& data) {
683     auto ret = sl_->addOrGetData(data);
684     return std::make_pair(&ret.first->data(), ret.second);
685   }
686 
skiplist()687   SkipListType* skiplist() const {
688     return sl_;
689   }
690 
691   // legacy interfaces
692   // TODO:(xliu) remove these.
693   // Returns true if the node is added successfully, false if not, i.e. the
694   // node with the same key already existed in the list.
contains(const key_type & data)695   bool contains(const key_type& data) const {
696     return sl_->find(data);
697   }
add(const key_type & data)698   bool add(const key_type& data) {
699     return sl_->addOrGetData(data).second;
700   }
remove(const key_type & data)701   bool remove(const key_type& data) {
702     return sl_->remove(data);
703   }
704 
705  private:
706   SkipListType* sl_;
707   std::shared_ptr<SkipListType> slHolder_;
708 };
709 
710 // implements forward iterator concept.
711 template <typename ValT, typename NodeT>
712 class detail::csl_iterator : public boost::iterator_facade<
713                                  csl_iterator<ValT, NodeT>,
714                                  ValT,
715                                  boost::forward_traversal_tag> {
716  public:
717   typedef ValT value_type;
718   typedef value_type& reference;
719   typedef value_type* pointer;
720   typedef ptrdiff_t difference_type;
721 
node_(node)722   explicit csl_iterator(NodeT* node = nullptr) : node_(node) {}
723 
724   template <typename OtherVal, typename OtherNode>
725   csl_iterator(
726       const csl_iterator<OtherVal, OtherNode>& other,
727       typename std::enable_if<
728           std::is_convertible<OtherVal, ValT>::value>::type* = nullptr)
729       : node_(other.node_) {}
730 
nodeSize()731   size_t nodeSize() const {
732     return node_ == nullptr ? 0
733                             : node_->height() * sizeof(NodeT*) + sizeof(*this);
734   }
735 
good()736   bool good() const {
737     return node_ != nullptr;
738   }
739 
740  private:
741   friend class boost::iterator_core_access;
742   template <class, class>
743   friend class csl_iterator;
744 
increment()745   void increment() {
746     node_ = node_->next();
747   }
equal(const csl_iterator & other)748   bool equal(const csl_iterator& other) const {
749     return node_ == other.node_;
750   }
dereference()751   value_type& dereference() const {
752     return node_->data();
753   }
754 
755   NodeT* node_;
756 };
757 
758 // Skipper interface
759 template <typename T, typename Comp, typename NodeAlloc, int MAX_HEIGHT>
760 class ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT>::Skipper {
761   typedef detail::SkipListNode<T> NodeType;
762   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
763   typedef typename SkipListType::Accessor Accessor;
764 
765  public:
766   typedef T value_type;
767   typedef T& reference;
768   typedef T* pointer;
769   typedef ptrdiff_t difference_type;
770 
Skipper(const std::shared_ptr<SkipListType> & skipList)771   Skipper(const std::shared_ptr<SkipListType>& skipList) : accessor_(skipList) {
772     init();
773   }
774 
Skipper(const Accessor & accessor)775   Skipper(const Accessor& accessor) : accessor_(accessor) {
776     init();
777   }
778 
init()779   void init() {
780     // need to cache the head node
781     NodeType* head_node = head();
782     headHeight_ = head_node->height();
783     for (int i = 0; i < headHeight_; ++i) {
784       preds_[i] = head_node;
785       succs_[i] = head_node->skip(i);
786     }
787     int max_layer = maxLayer();
788     for (int i = 0; i < max_layer; ++i) {
789       hints_[i] = uint8_t(i + 1);
790     }
791     hints_[max_layer] = max_layer;
792   }
793 
794   // advance to the next node in the list.
795   Skipper& operator++() {
796     preds_[0] = succs_[0];
797     succs_[0] = preds_[0]->skip(0);
798     int height = curHeight();
799     for (int i = 1; i < height && preds_[0] == succs_[i]; ++i) {
800       preds_[i] = succs_[i];
801       succs_[i] = preds_[i]->skip(i);
802     }
803     return *this;
804   }
805 
good()806   bool good() const {
807     return succs_[0] != nullptr;
808   }
809 
maxLayer()810   int maxLayer() const {
811     return headHeight_ - 1;
812   }
813 
curHeight()814   int curHeight() const {
815     // need to cap the height to the cached head height, as the current node
816     // might be some newly inserted node and also during the time period the
817     // head height may have grown.
818     return succs_[0] ? std::min(headHeight_, succs_[0]->height()) : 0;
819   }
820 
data()821   const value_type& data() const {
822     DCHECK(succs_[0] != nullptr);
823     return succs_[0]->data();
824   }
825 
826   value_type& operator*() const {
827     DCHECK(succs_[0] != nullptr);
828     return succs_[0]->data();
829   }
830 
831   value_type* operator->() {
832     DCHECK(succs_[0] != nullptr);
833     return &succs_[0]->data();
834   }
835 
836   /*
837    * Skip to the position whose data is no less than the parameter.
838    * (I.e. the lower_bound).
839    *
840    * Returns true if the data is found, false otherwise.
841    */
to(const value_type & data)842   bool to(const value_type& data) {
843     int layer = curHeight() - 1;
844     if (layer < 0) {
845       return false; // reaches the end of the list
846     }
847 
848     int lyr = hints_[layer];
849     int max_layer = maxLayer();
850     while (SkipListType::greater(data, succs_[lyr]) && lyr < max_layer) {
851       ++lyr;
852     }
853     hints_[layer] = lyr; // update the hint
854 
855     int foundLayer = SkipListType::findInsertionPoint(
856         preds_[lyr], lyr, data, preds_, succs_);
857     if (foundLayer < 0) {
858       return false;
859     }
860 
861     DCHECK(succs_[0] != nullptr)
862         << "lyr=" << lyr << "; max_layer=" << max_layer;
863     return !succs_[0]->markedForRemoval();
864   }
865 
866  private:
head()867   NodeType* head() const {
868     return accessor_.skiplist()->head_.load(std::memory_order_consume);
869   }
870 
871   Accessor accessor_;
872   int headHeight_;
873   NodeType *succs_[MAX_HEIGHT], *preds_[MAX_HEIGHT];
874   uint8_t hints_[MAX_HEIGHT];
875 };
876 
877 } // namespace folly
878