//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.  Use of
// this source code is governed by a BSD-style license that can be found
// in the LICENSE file. See the AUTHORS file for names of contributors.
//
// InlineSkipList is derived from SkipList (skiplist.h), but it optimizes
// the memory layout by requiring that the key storage be allocated through
// the skip list instance.  For the common case of SkipList<const char*,
// Cmp> this saves 1 pointer per skip list node and gives better cache
// locality, at the expense of wasted padding from using AllocateAligned
// instead of Allocate for the keys.  The unused padding will be from
// 0 to sizeof(void*)-1 bytes, and the space savings are sizeof(void*)
// bytes, so despite the padding the space used is always less than
// SkipList<const char*, ..>.
//
// Thread safety -------------
//
// Writes via Insert require external synchronization, most likely a mutex.
// InsertConcurrently can be safely called concurrently with reads and
// with other concurrent inserts.  Reads require a guarantee that the
// InlineSkipList will not be destroyed while the read is in progress.
// Apart from that, reads progress without any internal locking or
// synchronization.
//
// Invariants:
//
// (1) Allocated nodes are never deleted until the InlineSkipList is
// destroyed.  This is trivially guaranteed by the code since we never
// delete any skip list nodes.
//
// (2) The contents of a Node except for the next/prev pointers are
// immutable after the Node has been linked into the InlineSkipList.
// Only Insert() modifies the list, and it is careful to initialize a
// node and use release-stores to publish the nodes in one or more lists.
//
// ... prev vs. next pointer ordering ...
//
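// Illustrative usage (a minimal sketch, not part of this header's API;
// "Cmp", "cmp", "arena", "encoded_key", and "encoded_len" are assumed for
// the example only): keys are raw byte buffers obtained from AllocateKey()
// and interpreted by the Comparator supplied at construction.
//
//   InlineSkipList<Cmp> list(cmp, &arena);
//   char* buf = list.AllocateKey(encoded_len);  // node + key storage
//   memcpy(buf, encoded_key, encoded_len);      // fill in the key bytes
//   list.Insert(buf);                           // single-writer insert
//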

#pragma once
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <atomic>
#include <type_traits>
#include <vector>
#include "memory/allocator.h"
#include "port/likely.h"
#include "port/port.h"
#include "rocksdb/slice.h"
#include "util/coding.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

template <class Comparator>
class InlineSkipList {
 private:
  struct Node;
  struct Splice;

 public:
  using DecodedKey =
      typename std::remove_reference<Comparator>::type::DecodedType;

  static const uint16_t kMaxPossibleHeight = 32;

  // Create a new InlineSkipList object that will use "cmp" for comparing
  // keys, and will allocate memory using "*allocator".  Objects allocated
  // in the allocator must remain allocated for the lifetime of the
  // skiplist object.
  explicit InlineSkipList(Comparator cmp, Allocator* allocator,
                          int32_t max_height = 12,
                          int32_t branching_factor = 4);
  // No copying allowed
  InlineSkipList(const InlineSkipList&) = delete;
  InlineSkipList& operator=(const InlineSkipList&) = delete;

  // Allocates a key and a skip-list node, returning a pointer to the key
  // portion of the node.  This method is thread-safe if the allocator
  // is thread-safe.
  char* AllocateKey(size_t key_size);

  // Allocate a splice using allocator.
  Splice* AllocateSplice();

  // Allocate a splice on heap.
  Splice* AllocateSpliceOnHeap();

  // Inserts a key allocated by AllocateKey, after the actual key value
  // has been filled in.
  //
  // REQUIRES: nothing that compares equal to key is currently in the list.
  // REQUIRES: no concurrent calls to any insert operation.
  bool Insert(const char* key);

  // Inserts a key allocated by AllocateKey with a hint of last insert
  // position in the skip-list. If hint points to nullptr, a new hint will be
  // populated, which can be used in subsequent calls.
  //
  // It can be used to optimize the workload where there are multiple groups
  // of keys, and each key is likely to be inserted at a location close to
  // the last inserted key in the same group. One example is sequential
  // inserts.
  //
  // REQUIRES: nothing that compares equal to key is currently in the list.
  // REQUIRES: no concurrent calls to any insert operation.
  bool InsertWithHint(const char* key, void** hint);
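  //
  // Illustrative use of the insert hint (a sketch; "list" and the key
  // buffers are assumed): keep one void* hint per group of nearby keys and
  // pass its address on every insert for that group.
  //
  //   void* hint = nullptr;               // populated on the first call
  //   for (const Slice& k : keys_for_one_group) {
  //     char* buf = list.AllocateKey(k.size());
  //     memcpy(buf, k.data(), k.size());
  //     list.InsertWithHint(buf, &hint);  // reuses the cached splice
  //   }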

  // Like InsertConcurrently, but with a hint
  //
  // REQUIRES: nothing that compares equal to key is currently in the list.
  // REQUIRES: no concurrent calls that use the same hint
  bool InsertWithHintConcurrently(const char* key, void** hint);

  // Like Insert, but external synchronization is not required.
  bool InsertConcurrently(const char* key);

  // Inserts a node into the skip list.  key must have been allocated by
  // AllocateKey and then filled in by the caller.  If UseCAS is true,
  // then external synchronization is not required, otherwise this method
  // may not be called concurrently with any other insertions.
  //
  // Regardless of whether UseCAS is true, the splice must be owned
  // exclusively by the current thread.  If allow_partial_splice_fix is
  // true, then the cost of insertion is amortized O(log D), where D is
  // the distance from the splice to the inserted key (measured as the
  // number of intervening nodes).  Note that this bound is very good for
  // sequential insertions!  If allow_partial_splice_fix is false then
  // the existing splice will be ignored unless the current key is being
  // inserted immediately after the splice.  allow_partial_splice_fix ==
  // false has worse running time for the non-sequential case O(log N),
  // but a better constant factor.
  template <bool UseCAS>
  bool Insert(const char* key, Splice* splice, bool allow_partial_splice_fix);

  // Returns true iff an entry that compares equal to key is in the list.
  bool Contains(const char* key) const;

  // Return estimated number of entries smaller than `key`.
  uint64_t EstimateCount(const char* key) const;

  // Validate correctness of the skip-list.
  void TEST_Validate() const;

  // Iteration over the contents of a skip list
  class Iterator {
   public:
    // Initialize an iterator over the specified list.
    // The returned iterator is not valid.
    explicit Iterator(const InlineSkipList* list);

    // Change the underlying skiplist used for this iterator.
    // This enables reusing the iterator without deallocating an old one
    // and then allocating a new one.
    void SetList(const InlineSkipList* list);

    // Returns true iff the iterator is positioned at a valid node.
    bool Valid() const;

    // Returns the key at the current position.
    // REQUIRES: Valid()
    const char* key() const;

    // Advances to the next position.
    // REQUIRES: Valid()
    void Next();

    // Advances to the previous position.
    // REQUIRES: Valid()
    void Prev();

    // Advance to the first entry with a key >= target
    void Seek(const char* target);

    // Retreat to the last entry with a key <= target
    void SeekForPrev(const char* target);

    // Advance to a random entry in the list.
    void RandomSeek();

    // Position at the first entry in list.
    // Final state of iterator is Valid() iff list is not empty.
    void SeekToFirst();

    // Position at the last entry in list.
    // Final state of iterator is Valid() iff list is not empty.
    void SeekToLast();

   private:
    const InlineSkipList* list_;
    Node* node_;
    // Intentionally copyable
  };
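
  // Illustrative iteration with the Iterator above (a sketch; "list" and
  // "encoded_target" are assumed): scan forward from the first entry at or
  // after a target key.
  //
  //   InlineSkipList<Cmp>::Iterator iter(&list);
  //   for (iter.Seek(encoded_target); iter.Valid(); iter.Next()) {
  //     const char* entry = iter.key();  // points at the stored key bytes
  //   }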

 private:
  const uint16_t kMaxHeight_;
  const uint16_t kBranching_;
  const uint32_t kScaledInverseBranching_;

  Allocator* const allocator_;  // Allocator used for allocations of nodes
  // Immutable after construction
  Comparator const compare_;
  Node* const head_;

  // Modified only by Insert().  Read racily by readers, but stale
  // values are ok.
  std::atomic<int> max_height_;  // Height of the entire list

  // seq_splice_ is a Splice used for insertions in the non-concurrent
  // case.  It caches the prev and next found during the most recent
  // non-concurrent insertion.
  Splice* seq_splice_;

  inline int GetMaxHeight() const {
    return max_height_.load(std::memory_order_relaxed);
  }

  int RandomHeight();

  Node* AllocateNode(size_t key_size, int height);

  bool Equal(const char* a, const char* b) const {
    return (compare_(a, b) == 0);
  }

  bool LessThan(const char* a, const char* b) const {
    return (compare_(a, b) < 0);
  }

  // Return true if key is greater than the data stored in "n".  Null n
  // is considered infinite.  n should not be head_.
  bool KeyIsAfterNode(const char* key, Node* n) const;
  bool KeyIsAfterNode(const DecodedKey& key, Node* n) const;

  // Returns the earliest node with a key >= key.
  // Return nullptr if there is no such node.
  Node* FindGreaterOrEqual(const char* key) const;

  // Return the latest node with a key < key.
  // Return head_ if there is no such node.
  // Fills prev[level] with pointer to previous node at "level" for every
  // level in [0..max_height_-1], if prev is non-null.
  Node* FindLessThan(const char* key, Node** prev = nullptr) const;

  // Return the latest node with a key < key on bottom_level. Start searching
  // from root node on the level below top_level.
  // Fills prev[level] with pointer to previous node at "level" for every
  // level in [bottom_level..top_level-1], if prev is non-null.
  Node* FindLessThan(const char* key, Node** prev, Node* root, int top_level,
                     int bottom_level) const;

  // Return the last node in the list.
  // Return head_ if list is empty.
  Node* FindLast() const;

  // Returns a random entry.
  Node* FindRandomEntry() const;

  // Traverses a single level of the list, setting *out_prev to the last
  // node before the key and *out_next to the first node after. Assumes
  // that the key is not present in the skip list. On entry, before should
  // point to a node that is before the key, and after should point to
  // a node that is after the key.  after should be nullptr if a good after
  // node isn't conveniently available.
  template <bool prefetch_before>
  void FindSpliceForLevel(const DecodedKey& key, Node* before, Node* after,
                          int level, Node** out_prev, Node** out_next);

  // Recomputes Splice levels from highest_level (inclusive) down to
  // lowest_level (inclusive).
  void RecomputeSpliceLevels(const DecodedKey& key, Splice* splice,
                             int recompute_level);
};

// Implementation details follow

template <class Comparator>
struct InlineSkipList<Comparator>::Splice {
  // The invariant of a Splice is that prev_[i+1].key <= prev_[i].key <
  // next_[i].key <= next_[i+1].key for all i.  That means that if a
  // key is bracketed by prev_[i] and next_[i] then it is bracketed by
  // all higher levels.  It is _not_ required that prev_[i]->Next(i) ==
  // next_[i] (it probably did at some point in the past, but intervening
  // or concurrent operations might have inserted nodes in between).
  int height_ = 0;
  Node** prev_;
  Node** next_;
};

// The Node data type is more of a pointer into custom-managed memory than
// a traditional C++ struct.  The key is stored in the bytes immediately
// after the struct, and the next_ pointers for nodes with height > 1 are
// stored immediately _before_ the struct.  This avoids the need to include
// any pointer or sizing data, which reduces per-node memory overheads.
template <class Comparator>
struct InlineSkipList<Comparator>::Node {
  // Stores the height of the node in the memory location normally used for
  // next_[0].  This is used for passing data from AllocateKey to Insert.
  void StashHeight(const int height) {
    assert(sizeof(int) <= sizeof(next_[0]));
    memcpy(static_cast<void*>(&next_[0]), &height, sizeof(int));
  }

  // Retrieves the value passed to StashHeight.  Undefined after a call
  // to SetNext or NoBarrier_SetNext.
  int UnstashHeight() const {
    int rv;
    memcpy(&rv, &next_[0], sizeof(int));
    return rv;
  }

  const char* Key() const { return reinterpret_cast<const char*>(&next_[1]); }

  // Accessors/mutators for links.  Wrapped in methods so we can add
  // the appropriate barriers as necessary, and perform the necessary
  // addressing trickery for storing links below the Node in memory.
  Node* Next(int n) {
    assert(n >= 0);
    // Use an 'acquire load' so that we observe a fully initialized
    // version of the returned Node.
    return ((&next_[0] - n)->load(std::memory_order_acquire));
  }

  void SetNext(int n, Node* x) {
    assert(n >= 0);
    // Use a 'release store' so that anybody who reads through this
    // pointer observes a fully initialized version of the inserted node.
    (&next_[0] - n)->store(x, std::memory_order_release);
  }

  bool CASNext(int n, Node* expected, Node* x) {
    assert(n >= 0);
    return (&next_[0] - n)->compare_exchange_strong(expected, x);
  }

  // No-barrier variants that can be safely used in a few locations.
  Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return (&next_[0] - n)->load(std::memory_order_relaxed);
  }

  void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);
    (&next_[0] - n)->store(x, std::memory_order_relaxed);
  }

  // Insert node after prev on specific level.
  void InsertAfter(Node* prev, int level) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "this" in prev.
    NoBarrier_SetNext(level, prev->NoBarrier_Next(level));
    prev->SetNext(level, this);
  }

 private:
  // next_[0] is the lowest level link (level 0).  Higher levels are
  // stored _earlier_, so level 1 is at next_[-1].
  std::atomic<Node*> next_[1];
};
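
// Memory layout of a height-3 node, as implied by the negative indexing
// above and by AllocateNode() below (a sketch assuming 8-byte pointers):
//
//   raw --> [ next_[-2] | next_[-1] | next_[0] | key bytes ... ]
//             level 2     level 1     level 0    Key() == &next_[1]
//
// The Node* that is passed around points at next_[0]; Next(n) reads the
// atomic at (&next_[0] - n), so no per-node height or key-length field is
// needed once the node has been linked into the list.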

template <class Comparator>
inline InlineSkipList<Comparator>::Iterator::Iterator(
    const InlineSkipList* list) {
  SetList(list);
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::SetList(
    const InlineSkipList* list) {
  list_ = list;
  node_ = nullptr;
}

template <class Comparator>
inline bool InlineSkipList<Comparator>::Iterator::Valid() const {
  return node_ != nullptr;
}

template <class Comparator>
inline const char* InlineSkipList<Comparator>::Iterator::key() const {
  assert(Valid());
  return node_->Key();
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::Next() {
  assert(Valid());
  node_ = node_->Next(0);
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::Prev() {
  // Instead of using explicit "prev" links, we just search for the
  // last node that falls before key.
  assert(Valid());
  node_ = list_->FindLessThan(node_->Key());
  if (node_ == list_->head_) {
    node_ = nullptr;
  }
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::Seek(const char* target) {
  node_ = list_->FindGreaterOrEqual(target);
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::SeekForPrev(
    const char* target) {
  Seek(target);
  if (!Valid()) {
    SeekToLast();
  }
  while (Valid() && list_->LessThan(target, key())) {
    Prev();
  }
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::RandomSeek() {
  node_ = list_->FindRandomEntry();
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::SeekToFirst() {
  node_ = list_->head_->Next(0);
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::SeekToLast() {
  node_ = list_->FindLast();
  if (node_ == list_->head_) {
    node_ = nullptr;
  }
}

template <class Comparator>
int InlineSkipList<Comparator>::RandomHeight() {
  auto rnd = Random::GetTLSInstance();

  // Increase height with probability 1 in kBranching
  int height = 1;
  while (height < kMaxHeight_ && height < kMaxPossibleHeight &&
         rnd->Next() < kScaledInverseBranching_) {
    height++;
  }
  assert(height > 0);
  assert(height <= kMaxHeight_);
  assert(height <= kMaxPossibleHeight);
  return height;
}
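
// Rough height distribution implied by the loop above (informal; assumes
// the default branching_factor of 4 and ignores the kMaxHeight_ cap):
// P(height >= h) = (1/4)^(h-1), so about 1/4 of nodes reach level 2, 1/16
// reach level 3, and the expected number of next-pointers per node is
// 1 / (1 - 1/4) = 4/3.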

template <class Comparator>
bool InlineSkipList<Comparator>::KeyIsAfterNode(const char* key,
                                                Node* n) const {
  // nullptr n is considered infinite
  assert(n != head_);
  return (n != nullptr) && (compare_(n->Key(), key) < 0);
}

template <class Comparator>
bool InlineSkipList<Comparator>::KeyIsAfterNode(const DecodedKey& key,
                                                Node* n) const {
  // nullptr n is considered infinite
  assert(n != head_);
  return (n != nullptr) && (compare_(n->Key(), key) < 0);
}

template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindGreaterOrEqual(const char* key) const {
  // Note: It looks like we could reduce duplication by implementing
  // this function as FindLessThan(key)->Next(0), but we wouldn't be able
  // to exit early on equality and the result wouldn't even be correct.
  // A concurrent insert might occur after FindLessThan(key) but before
  // we get a chance to call Next(0).
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  Node* last_bigger = nullptr;
  const DecodedKey key_decoded = compare_.decode_key(key);
  while (true) {
    Node* next = x->Next(level);
    if (next != nullptr) {
      PREFETCH(next->Next(level), 0, 1);
    }
    // Make sure the lists are sorted
    assert(x == head_ || next == nullptr || KeyIsAfterNode(next->Key(), x));
    // Make sure we haven't overshot during our search
    assert(x == head_ || KeyIsAfterNode(key_decoded, x));
    int cmp = (next == nullptr || next == last_bigger)
                  ? 1
                  : compare_(next->Key(), key_decoded);
    if (cmp == 0 || (cmp > 0 && level == 0)) {
      return next;
    } else if (cmp < 0) {
      // Keep searching in this list
      x = next;
    } else {
      // Switch to next list, reuse compare_() result
      last_bigger = next;
      level--;
    }
  }
}

template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindLessThan(const char* key, Node** prev) const {
  return FindLessThan(key, prev, head_, GetMaxHeight(), 0);
}

template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindLessThan(const char* key, Node** prev,
                                         Node* root, int top_level,
                                         int bottom_level) const {
  assert(top_level > bottom_level);
  int level = top_level - 1;
  Node* x = root;
  // KeyIsAfter(key, last_not_after) is definitely false
  Node* last_not_after = nullptr;
  const DecodedKey key_decoded = compare_.decode_key(key);
  while (true) {
    assert(x != nullptr);
    Node* next = x->Next(level);
    if (next != nullptr) {
      PREFETCH(next->Next(level), 0, 1);
    }
    assert(x == head_ || next == nullptr || KeyIsAfterNode(next->Key(), x));
    assert(x == head_ || KeyIsAfterNode(key_decoded, x));
    if (next != last_not_after && KeyIsAfterNode(key_decoded, next)) {
      // Keep searching in this list
      assert(next != nullptr);
      x = next;
    } else {
      if (prev != nullptr) {
        prev[level] = x;
      }
      if (level == bottom_level) {
        return x;
      } else {
        // Switch to next list, reuse KeyIsAfterNode() result
        last_not_after = next;
        level--;
      }
    }
  }
}

template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindLast() const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  while (true) {
    Node* next = x->Next(level);
    if (next == nullptr) {
      if (level == 0) {
        return x;
      } else {
        // Switch to next list
        level--;
      }
    } else {
      x = next;
    }
  }
}

template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindRandomEntry() const {
  // TODO(bjlemaire): consider adding PREFETCH calls.
  Node *x = head_, *scan_node = nullptr, *limit_node = nullptr;

  // We start at the max level.
  // For each level, we look at all the nodes at the level, and
  // we randomly pick one of them. Then decrement the level
  // and reiterate the process.
  // eg: assume GetMaxHeight()=5, and there are #100 elements (nodes).
  // level 4 nodes: lvl_nodes={#1, #15, #67, #84}. Randomly pick #15.
  // We will consider all the nodes between #15 (inclusive) and #67
  // (exclusive). #67 is called 'limit_node' here.
  // level 3 nodes: lvl_nodes={#15, #21, #45, #51}. Randomly choose
  // #51. #67 remains 'limit_node'.
  // [...]
  // level 0 nodes: lvl_nodes={#56,#57,#58,#59}. Randomly pick #57.
  // Return Node #57.
  std::vector<Node*> lvl_nodes;
  Random* rnd = Random::GetTLSInstance();
  int level = GetMaxHeight() - 1;

  while (level >= 0) {
    lvl_nodes.clear();
    scan_node = x;
    while (scan_node != limit_node) {
      lvl_nodes.push_back(scan_node);
      scan_node = scan_node->Next(level);
    }
    uint32_t rnd_idx = rnd->Next() % lvl_nodes.size();
    x = lvl_nodes[rnd_idx];
    if (rnd_idx + 1 < lvl_nodes.size()) {
      limit_node = lvl_nodes[rnd_idx + 1];
    }
    level--;
  }
  // There is a special case where x could still be the head_
  // (note that the head_ contains no key).
  return x == head_ ? head_->Next(0) : x;
}

template <class Comparator>
uint64_t InlineSkipList<Comparator>::EstimateCount(const char* key) const {
  uint64_t count = 0;

  Node* x = head_;
  int level = GetMaxHeight() - 1;
  const DecodedKey key_decoded = compare_.decode_key(key);
  while (true) {
    assert(x == head_ || compare_(x->Key(), key_decoded) < 0);
    Node* next = x->Next(level);
    if (next != nullptr) {
      PREFETCH(next->Next(level), 0, 1);
    }
    if (next == nullptr || compare_(next->Key(), key_decoded) >= 0) {
      if (level == 0) {
        return count;
      } else {
        // Switch to next list
        count *= kBranching_;
        level--;
      }
    } else {
      x = next;
      count++;
    }
  }
}

template <class Comparator>
InlineSkipList<Comparator>::InlineSkipList(const Comparator cmp,
                                           Allocator* allocator,
                                           int32_t max_height,
                                           int32_t branching_factor)
    : kMaxHeight_(static_cast<uint16_t>(max_height)),
      kBranching_(static_cast<uint16_t>(branching_factor)),
      kScaledInverseBranching_((Random::kMaxNext + 1) / kBranching_),
      allocator_(allocator),
      compare_(cmp),
      head_(AllocateNode(0, max_height)),
      max_height_(1),
      seq_splice_(AllocateSplice()) {
  assert(max_height > 0 && kMaxHeight_ == static_cast<uint32_t>(max_height));
  assert(branching_factor > 1 &&
         kBranching_ == static_cast<uint32_t>(branching_factor));
  assert(kScaledInverseBranching_ > 0);

  for (int i = 0; i < kMaxHeight_; ++i) {
    head_->SetNext(i, nullptr);
  }
}

template <class Comparator>
char* InlineSkipList<Comparator>::AllocateKey(size_t key_size) {
  return const_cast<char*>(AllocateNode(key_size, RandomHeight())->Key());
}

template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::AllocateNode(size_t key_size, int height) {
  auto prefix = sizeof(std::atomic<Node*>) * (height - 1);

  // prefix is space for the height - 1 pointers that we store before
  // the Node instance (next_[-(height - 1) .. -1]).  Node starts at
  // raw + prefix, and holds the bottom-most (level 0) skip list pointer
  // next_[0].  key_size is the bytes for the key, which comes just after
  // the Node.
  char* raw = allocator_->AllocateAligned(prefix + sizeof(Node) + key_size);
  Node* x = reinterpret_cast<Node*>(raw + prefix);

  // Once we've linked the node into the skip list we don't actually need
  // to know its height, because we can implicitly use the fact that we
  // traversed into a node at level h to know that h is a valid level
  // for that node.  We need to convey the height to the Insert step,
  // however, so that it can perform the proper links.  Since we're not
  // using the pointers at the moment, StashHeight temporarily borrows
  // storage from next_[0] for that purpose.
  x->StashHeight(height);
  return x;
}
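
// Worked example of the layout produced above (informal; assumes 8-byte
// pointers): for height = 3 and key_size = 20, prefix = 2 * 8 = 16, so
// AllocateAligned(16 + 8 + 20) returns raw, next_[-2] and next_[-1] occupy
// raw..raw+15, the returned Node sits at raw + 16 (its next_[0]), and
// Key() points at raw + 24, immediately after the Node.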

template <class Comparator>
typename InlineSkipList<Comparator>::Splice*
InlineSkipList<Comparator>::AllocateSplice() {
  // size of prev_ and next_
  size_t array_size = sizeof(Node*) * (kMaxHeight_ + 1);
  char* raw = allocator_->AllocateAligned(sizeof(Splice) + array_size * 2);
  Splice* splice = reinterpret_cast<Splice*>(raw);
  splice->height_ = 0;
  splice->prev_ = reinterpret_cast<Node**>(raw + sizeof(Splice));
  splice->next_ = reinterpret_cast<Node**>(raw + sizeof(Splice) + array_size);
  return splice;
}
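
// The single allocation above is laid out as (a sketch):
//
//   [ Splice | prev_[0..kMaxHeight_] | next_[0..kMaxHeight_] ]
//
// Each array has kMaxHeight_ + 1 slots so that prev_[max_height] and
// next_[max_height] can hold the head_ / nullptr sentinels that Insert()
// installs when it (re)initializes a splice.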

template <class Comparator>
typename InlineSkipList<Comparator>::Splice*
InlineSkipList<Comparator>::AllocateSpliceOnHeap() {
  size_t array_size = sizeof(Node*) * (kMaxHeight_ + 1);
  char* raw = new char[sizeof(Splice) + array_size * 2];
  Splice* splice = reinterpret_cast<Splice*>(raw);
  splice->height_ = 0;
  splice->prev_ = reinterpret_cast<Node**>(raw + sizeof(Splice));
  splice->next_ = reinterpret_cast<Node**>(raw + sizeof(Splice) + array_size);
  return splice;
}

template <class Comparator>
bool InlineSkipList<Comparator>::Insert(const char* key) {
  return Insert<false>(key, seq_splice_, false);
}

template <class Comparator>
bool InlineSkipList<Comparator>::InsertConcurrently(const char* key) {
  Node* prev[kMaxPossibleHeight];
  Node* next[kMaxPossibleHeight];
  Splice splice;
  splice.prev_ = prev;
  splice.next_ = next;
  return Insert<true>(key, &splice, false);
}

template <class Comparator>
bool InlineSkipList<Comparator>::InsertWithHint(const char* key, void** hint) {
  assert(hint != nullptr);
  Splice* splice = reinterpret_cast<Splice*>(*hint);
  if (splice == nullptr) {
    splice = AllocateSplice();
    *hint = reinterpret_cast<void*>(splice);
  }
  return Insert<false>(key, splice, true);
}

template <class Comparator>
bool InlineSkipList<Comparator>::InsertWithHintConcurrently(const char* key,
                                                            void** hint) {
  assert(hint != nullptr);
  Splice* splice = reinterpret_cast<Splice*>(*hint);
  if (splice == nullptr) {
    splice = AllocateSpliceOnHeap();
    *hint = reinterpret_cast<void*>(splice);
  }
  return Insert<true>(key, splice, true);
}

template <class Comparator>
template <bool prefetch_before>
void InlineSkipList<Comparator>::FindSpliceForLevel(const DecodedKey& key,
                                                    Node* before, Node* after,
                                                    int level, Node** out_prev,
                                                    Node** out_next) {
  while (true) {
    Node* next = before->Next(level);
    if (next != nullptr) {
      PREFETCH(next->Next(level), 0, 1);
    }
    if (prefetch_before == true) {
      if (next != nullptr && level > 0) {
        PREFETCH(next->Next(level - 1), 0, 1);
      }
    }
    assert(before == head_ || next == nullptr ||
           KeyIsAfterNode(next->Key(), before));
    assert(before == head_ || KeyIsAfterNode(key, before));
    if (next == after || !KeyIsAfterNode(key, next)) {
      // found it
      *out_prev = before;
      *out_next = next;
      return;
    }
    before = next;
  }
}

template <class Comparator>
void InlineSkipList<Comparator>::RecomputeSpliceLevels(const DecodedKey& key,
                                                       Splice* splice,
                                                       int recompute_level) {
  assert(recompute_level > 0);
  assert(recompute_level <= splice->height_);
  for (int i = recompute_level - 1; i >= 0; --i) {
    FindSpliceForLevel<true>(key, splice->prev_[i + 1], splice->next_[i + 1],
                             i, &splice->prev_[i], &splice->next_[i]);
  }
}

template <class Comparator>
template <bool UseCAS>
bool InlineSkipList<Comparator>::Insert(const char* key, Splice* splice,
                                        bool allow_partial_splice_fix) {
  Node* x = reinterpret_cast<Node*>(const_cast<char*>(key)) - 1;
  const DecodedKey key_decoded = compare_.decode_key(key);
  int height = x->UnstashHeight();
  assert(height >= 1 && height <= kMaxHeight_);

  int max_height = max_height_.load(std::memory_order_relaxed);
  while (height > max_height) {
    if (max_height_.compare_exchange_weak(max_height, height)) {
      // successfully updated it
      max_height = height;
      break;
    }
    // else retry, possibly exiting the loop because somebody else
    // increased it
  }
  assert(max_height <= kMaxPossibleHeight);

  int recompute_height = 0;
  if (splice->height_ < max_height) {
    // Either splice has never been used or max_height has grown since
    // last use.  We could potentially fix it in the latter case, but
    // that is tricky.
    splice->prev_[max_height] = head_;
    splice->next_[max_height] = nullptr;
    splice->height_ = max_height;
    recompute_height = max_height;
  } else {
    // Splice is a valid proper-height splice that brackets some
    // key, but does it bracket this one?  We need to validate it and
    // recompute a portion of the splice (levels 0..recompute_height-1)
    // that is a superset of all levels that don't bracket the new key.
    // Several choices are reasonable, because we have to balance the work
    // saved against the extra comparisons required to validate the Splice.
    //
    // One strategy is just to recompute all of orig_splice_height if the
    // bottom level isn't bracketing.  This pessimistically assumes that
    // we will either get a perfect Splice hit (increasing sequential
    // inserts) or have no locality.
    //
    // Another strategy is to walk up the Splice's levels until we find
    // a level that brackets the key.  This strategy lets the Splice
    // hint help for other cases: it turns insertion from O(log N) into
    // O(log D), where D is the number of nodes in between the key that
    // produced the Splice and the current insert (insertion is aided
    // whether the new key is before or after the splice).  If you have
    // a way of using a prefix of the key to map directly to the closest
    // Splice out of O(sqrt(N)) Splices and we make it so that splices
    // can also be used as hints during read, then we end up with Oshman's
    // and Shavit's SkipTrie, which has O(log log N) lookup and insertion
    // (compare to O(log N) for skip list).
    //
    // We control the pessimistic strategy with allow_partial_splice_fix.
    // A good strategy is probably to be pessimistic for seq_splice_,
    // optimistic if the caller actually went to the work of providing
    // a Splice.
    while (recompute_height < max_height) {
      if (splice->prev_[recompute_height]->Next(recompute_height) !=
          splice->next_[recompute_height]) {
        // splice isn't tight at this level, there must have been some
        // inserts to this location that didn't update the splice.  We
        // might only be a little stale, but if the splice is very stale
        // it would be O(N) to fix it.  We haven't used up any of our
        // budget of comparisons, so always move up even if we are
        // pessimistic about our chances of success.
        ++recompute_height;
      } else if (splice->prev_[recompute_height] != head_ &&
                 !KeyIsAfterNode(key_decoded,
                                 splice->prev_[recompute_height])) {
        // key is from before splice
        if (allow_partial_splice_fix) {
          // skip all levels with the same node without more comparisons
          Node* bad = splice->prev_[recompute_height];
          while (splice->prev_[recompute_height] == bad) {
            ++recompute_height;
          }
        } else {
          // we're pessimistic, recompute everything
          recompute_height = max_height;
        }
      } else if (KeyIsAfterNode(key_decoded,
                                splice->next_[recompute_height])) {
        // key is from after splice
        if (allow_partial_splice_fix) {
          Node* bad = splice->next_[recompute_height];
          while (splice->next_[recompute_height] == bad) {
            ++recompute_height;
          }
        } else {
          recompute_height = max_height;
        }
      } else {
        // this level brackets the key, we won!
        break;
      }
    }
  }
  assert(recompute_height <= max_height);
  if (recompute_height > 0) {
    RecomputeSpliceLevels(key_decoded, splice, recompute_height);
  }

  bool splice_is_valid = true;
  if (UseCAS) {
    for (int i = 0; i < height; ++i) {
      while (true) {
        // Checking for duplicate keys on the level 0 is sufficient
        if (UNLIKELY(i == 0 && splice->next_[i] != nullptr &&
                     compare_(x->Key(), splice->next_[i]->Key()) >= 0)) {
          // duplicate key
          return false;
        }
        if (UNLIKELY(i == 0 && splice->prev_[i] != head_ &&
                     compare_(splice->prev_[i]->Key(), x->Key()) >= 0)) {
          // duplicate key
          return false;
        }
        assert(splice->next_[i] == nullptr ||
               compare_(x->Key(), splice->next_[i]->Key()) < 0);
        assert(splice->prev_[i] == head_ ||
               compare_(splice->prev_[i]->Key(), x->Key()) < 0);
        x->NoBarrier_SetNext(i, splice->next_[i]);
        if (splice->prev_[i]->CASNext(i, splice->next_[i], x)) {
          // success
          break;
        }
        // CAS failed, we need to recompute prev and next. It is unlikely
        // to be helpful to try to use a different level as we redo the
        // search, because it should be unlikely that lots of nodes have
        // been inserted between prev[i] and next[i]. No point in using
        // next[i] as the after hint, because we know it is stale.
        FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i,
                                  &splice->prev_[i], &splice->next_[i]);

        // Since we've narrowed the bracket for level i, we might have
        // violated the Splice constraint between i and i-1.  Make sure
        // we recompute the whole thing next time.
        if (i > 0) {
          splice_is_valid = false;
        }
      }
    }
  } else {
    for (int i = 0; i < height; ++i) {
      if (i >= recompute_height &&
          splice->prev_[i]->Next(i) != splice->next_[i]) {
        FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i,
                                  &splice->prev_[i], &splice->next_[i]);
      }
      // Checking for duplicate keys on the level 0 is sufficient
      if (UNLIKELY(i == 0 && splice->next_[i] != nullptr &&
                   compare_(x->Key(), splice->next_[i]->Key()) >= 0)) {
        // duplicate key
        return false;
      }
      if (UNLIKELY(i == 0 && splice->prev_[i] != head_ &&
                   compare_(splice->prev_[i]->Key(), x->Key()) >= 0)) {
        // duplicate key
        return false;
      }
      assert(splice->next_[i] == nullptr ||
             compare_(x->Key(), splice->next_[i]->Key()) < 0);
      assert(splice->prev_[i] == head_ ||
             compare_(splice->prev_[i]->Key(), x->Key()) < 0);
      assert(splice->prev_[i]->Next(i) == splice->next_[i]);
      x->NoBarrier_SetNext(i, splice->next_[i]);
      splice->prev_[i]->SetNext(i, x);
    }
  }
  if (splice_is_valid) {
    for (int i = 0; i < height; ++i) {
      splice->prev_[i] = x;
    }
    assert(splice->prev_[splice->height_] == head_);
    assert(splice->next_[splice->height_] == nullptr);
    for (int i = 0; i < splice->height_; ++i) {
      assert(splice->next_[i] == nullptr ||
             compare_(key, splice->next_[i]->Key()) < 0);
      assert(splice->prev_[i] == head_ ||
             compare_(splice->prev_[i]->Key(), key) <= 0);
      assert(splice->prev_[i + 1] == splice->prev_[i] ||
             splice->prev_[i + 1] == head_ ||
             compare_(splice->prev_[i + 1]->Key(), splice->prev_[i]->Key()) <
                 0);
      assert(splice->next_[i + 1] == splice->next_[i] ||
             splice->next_[i + 1] == nullptr ||
             compare_(splice->next_[i]->Key(), splice->next_[i + 1]->Key()) <
                 0);
    }
  } else {
    splice->height_ = 0;
  }
  return true;
}

template <class Comparator>
bool InlineSkipList<Comparator>::Contains(const char* key) const {
  Node* x = FindGreaterOrEqual(key);
  if (x != nullptr && Equal(key, x->Key())) {
    return true;
  } else {
    return false;
  }
}

template <class Comparator>
void InlineSkipList<Comparator>::TEST_Validate() const {
  // Iterate over all levels at the same time, and verify that nodes appear
  // in the right order, and that nodes appearing in an upper level also
  // appear in lower levels.
  Node* nodes[kMaxPossibleHeight];
  int max_height = GetMaxHeight();
  assert(max_height > 0);
  for (int i = 0; i < max_height; i++) {
    nodes[i] = head_;
  }
  while (nodes[0] != nullptr) {
    Node* l0_next = nodes[0]->Next(0);
    if (l0_next == nullptr) {
      break;
    }
    assert(nodes[0] == head_ || compare_(nodes[0]->Key(), l0_next->Key()) < 0);
    nodes[0] = l0_next;

    int i = 1;
    while (i < max_height) {
      Node* next = nodes[i]->Next(i);
      if (next == nullptr) {
        break;
      }
      auto cmp = compare_(nodes[0]->Key(), next->Key());
      assert(cmp <= 0);
      if (cmp == 0) {
        assert(next == nodes[0]);
        nodes[i] = next;
      } else {
        break;
      }
      i++;
    }
  }
  for (int i = 1; i < max_height; i++) {
    assert(nodes[i] != nullptr && nodes[i]->Next(i) == nullptr);
  }
}

}  // namespace ROCKSDB_NAMESPACE