1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.  Use of
7 // this source code is governed by a BSD-style license that can be found
8 // in the LICENSE file. See the AUTHORS file for names of contributors.
9 //
10 // InlineSkipList is derived from SkipList (skiplist.h), but it optimizes
11 // the memory layout by requiring that the key storage be allocated through
12 // the skip list instance.  For the common case of SkipList<const char*,
13 // Cmp> this saves 1 pointer per skip list node and gives better cache
14 // locality, at the expense of wasted padding from using AllocateAligned
15 // instead of Allocate for the keys.  The unused padding will be from
16 // 0 to sizeof(void*)-1 bytes, and the space savings are sizeof(void*)
17 // bytes, so despite the padding the space used is always less than
18 // SkipList<const char*, ..>.
19 //
20 // Thread safety -------------
21 //
22 // Writes via Insert require external synchronization, most likely a mutex.
23 // InsertConcurrently can be safely called concurrently with reads and
24 // with other concurrent inserts.  Reads require a guarantee that the
25 // InlineSkipList will not be destroyed while the read is in progress.
26 // Apart from that, reads progress without any internal locking or
27 // synchronization.
28 //
29 // Invariants:
30 //
31 // (1) Allocated nodes are never deleted until the InlineSkipList is
32 // destroyed.  This is trivially guaranteed by the code since we never
33 // delete any skip list nodes.
34 //
35 // (2) The contents of a Node except for the next/prev pointers are
36 // immutable after the Node has been linked into the InlineSkipList.
37 // Only Insert() modifies the list, and it is careful to initialize a
38 // node and use release-stores to publish the nodes in one or more lists.
39 //
40 // ... prev vs. next pointer ordering ...
41 //
42 
43 #pragma once
44 #include <assert.h>
45 #include <stdlib.h>
46 #include <algorithm>
47 #include <atomic>
48 #include <type_traits>
49 #include "memory/allocator.h"
50 #include "port/likely.h"
51 #include "port/port.h"
52 #include "rocksdb/slice.h"
53 #include "util/coding.h"
54 #include "util/random.h"
55 
56 namespace ROCKSDB_NAMESPACE {
57 
58 template <class Comparator>
59 class InlineSkipList {
60  private:
61   struct Node;
62   struct Splice;
63 
64  public:
65   using DecodedKey = \
66     typename std::remove_reference<Comparator>::type::DecodedType;
67 
68   static const uint16_t kMaxPossibleHeight = 32;
69 
70   // Create a new InlineSkipList object that will use "cmp" for comparing
71   // keys, and will allocate memory using "*allocator".  Objects allocated
72   // in the allocator must remain allocated for the lifetime of the
73   // skiplist object.
74   explicit InlineSkipList(Comparator cmp, Allocator* allocator,
75                           int32_t max_height = 12,
76                           int32_t branching_factor = 4);
77   // No copying allowed
78   InlineSkipList(const InlineSkipList&) = delete;
79   InlineSkipList& operator=(const InlineSkipList&) = delete;
80 
81   // Allocates a key and a skip-list node, returning a pointer to the key
82   // portion of the node.  This method is thread-safe if the allocator
83   // is thread-safe.
84   char* AllocateKey(size_t key_size);
85 
86   // Allocate a splice using allocator.
87   Splice* AllocateSplice();
88 
89   // Allocate a splice on heap.
90   Splice* AllocateSpliceOnHeap();
91 
92   // Inserts a key allocated by AllocateKey, after the actual key value
93   // has been filled in.
94   //
95   // REQUIRES: nothing that compares equal to key is currently in the list.
96   // REQUIRES: no concurrent calls to any of inserts.
97   bool Insert(const char* key);
98 
99   // Inserts a key allocated by AllocateKey with a hint of last insert
100   // position in the skip-list. If hint points to nullptr, a new hint will be
101   // populated, which can be used in subsequent calls.
102   //
103   // It can be used to optimize the workload where there are multiple groups
104   // of keys, and each key is likely to insert to a location close to the last
105   // inserted key in the same group. One example is sequential inserts.
106   //
107   // REQUIRES: nothing that compares equal to key is currently in the list.
108   // REQUIRES: no concurrent calls to any of inserts.
109   bool InsertWithHint(const char* key, void** hint);
110 
111   // Like InsertConcurrently, but with a hint
112   //
113   // REQUIRES: nothing that compares equal to key is currently in the list.
114   // REQUIRES: no concurrent calls that use same hint
115   bool InsertWithHintConcurrently(const char* key, void** hint);
116 
117   // Like Insert, but external synchronization is not required.
118   bool InsertConcurrently(const char* key);
119 
120   // Inserts a node into the skip list.  key must have been allocated by
121   // AllocateKey and then filled in by the caller.  If UseCAS is true,
122   // then external synchronization is not required, otherwise this method
123   // may not be called concurrently with any other insertions.
124   //
125   // Regardless of whether UseCAS is true, the splice must be owned
126   // exclusively by the current thread.  If allow_partial_splice_fix is
127   // true, then the cost of insertion is amortized O(log D), where D is
128   // the distance from the splice to the inserted key (measured as the
129   // number of intervening nodes).  Note that this bound is very good for
130   // sequential insertions!  If allow_partial_splice_fix is false then
131   // the existing splice will be ignored unless the current key is being
132   // inserted immediately after the splice.  allow_partial_splice_fix ==
133   // false has worse running time for the non-sequential case O(log N),
134   // but a better constant factor.
135   template <bool UseCAS>
136   bool Insert(const char* key, Splice* splice, bool allow_partial_splice_fix);
137 
138   // Returns true iff an entry that compares equal to key is in the list.
139   bool Contains(const char* key) const;
140 
141   // Return estimated number of entries smaller than `key`.
142   uint64_t EstimateCount(const char* key) const;
143 
144   // Validate correctness of the skip-list.
145   void TEST_Validate() const;
146 
147   // Iteration over the contents of a skip list
148   class Iterator {
149    public:
150     // Initialize an iterator over the specified list.
151     // The returned iterator is not valid.
152     explicit Iterator(const InlineSkipList* list);
153 
154     // Change the underlying skiplist used for this iterator
155     // This enables us not changing the iterator without deallocating
156     // an old one and then allocating a new one
157     void SetList(const InlineSkipList* list);
158 
159     // Returns true iff the iterator is positioned at a valid node.
160     bool Valid() const;
161 
162     // Returns the key at the current position.
163     // REQUIRES: Valid()
164     const char* key() const;
165 
166     // Advances to the next position.
167     // REQUIRES: Valid()
168     void Next();
169 
170     // Advances to the previous position.
171     // REQUIRES: Valid()
172     void Prev();
173 
174     // Advance to the first entry with a key >= target
175     void Seek(const char* target);
176 
177     // Retreat to the last entry with a key <= target
178     void SeekForPrev(const char* target);
179 
180     // Position at the first entry in list.
181     // Final state of iterator is Valid() iff list is not empty.
182     void SeekToFirst();
183 
184     // Position at the last entry in list.
185     // Final state of iterator is Valid() iff list is not empty.
186     void SeekToLast();
187 
188    private:
189     const InlineSkipList* list_;
190     Node* node_;
191     // Intentionally copyable
192   };
193 
194  private:
195   const uint16_t kMaxHeight_;
196   const uint16_t kBranching_;
197   const uint32_t kScaledInverseBranching_;
198 
199   Allocator* const allocator_;  // Allocator used for allocations of nodes
200   // Immutable after construction
201   Comparator const compare_;
202   Node* const head_;
203 
204   // Modified only by Insert().  Read racily by readers, but stale
205   // values are ok.
206   std::atomic<int> max_height_;  // Height of the entire list
207 
208   // seq_splice_ is a Splice used for insertions in the non-concurrent
209   // case.  It caches the prev and next found during the most recent
210   // non-concurrent insertion.
211   Splice* seq_splice_;
212 
GetMaxHeight()213   inline int GetMaxHeight() const {
214     return max_height_.load(std::memory_order_relaxed);
215   }
216 
217   int RandomHeight();
218 
219   Node* AllocateNode(size_t key_size, int height);
220 
Equal(const char * a,const char * b)221   bool Equal(const char* a, const char* b) const {
222     return (compare_(a, b) == 0);
223   }
224 
LessThan(const char * a,const char * b)225   bool LessThan(const char* a, const char* b) const {
226     return (compare_(a, b) < 0);
227   }
228 
229   // Return true if key is greater than the data stored in "n".  Null n
230   // is considered infinite.  n should not be head_.
231   bool KeyIsAfterNode(const char* key, Node* n) const;
232   bool KeyIsAfterNode(const DecodedKey& key, Node* n) const;
233 
234   // Returns the earliest node with a key >= key.
235   // Return nullptr if there is no such node.
236   Node* FindGreaterOrEqual(const char* key) const;
237 
238   // Return the latest node with a key < key.
239   // Return head_ if there is no such node.
240   // Fills prev[level] with pointer to previous node at "level" for every
241   // level in [0..max_height_-1], if prev is non-null.
242   Node* FindLessThan(const char* key, Node** prev = nullptr) const;
243 
244   // Return the latest node with a key < key on bottom_level. Start searching
245   // from root node on the level below top_level.
246   // Fills prev[level] with pointer to previous node at "level" for every
247   // level in [bottom_level..top_level-1], if prev is non-null.
248   Node* FindLessThan(const char* key, Node** prev, Node* root, int top_level,
249                      int bottom_level) const;
250 
251   // Return the last node in the list.
252   // Return head_ if list is empty.
253   Node* FindLast() const;
254 
255   // Traverses a single level of the list, setting *out_prev to the last
256   // node before the key and *out_next to the first node after. Assumes
257   // that the key is not present in the skip list. On entry, before should
258   // point to a node that is before the key, and after should point to
259   // a node that is after the key.  after should be nullptr if a good after
260   // node isn't conveniently available.
261   template<bool prefetch_before>
262   void FindSpliceForLevel(const DecodedKey& key, Node* before, Node* after, int level,
263                           Node** out_prev, Node** out_next);
264 
265   // Recomputes Splice levels from highest_level (inclusive) down to
266   // lowest_level (inclusive).
267   void RecomputeSpliceLevels(const DecodedKey& key, Splice* splice,
268                              int recompute_level);
269 };
270 
271 // Implementation details follow
272 
template <class Comparator>
struct InlineSkipList<Comparator>::Splice {
  // The invariant of a Splice is that prev_[i+1].key <= prev_[i].key <
  // next_[i].key <= next_[i+1].key for all i.  That means that if a
  // key is bracketed by prev_[i] and next_[i] then it is bracketed by
  // all higher levels.  It is _not_ required that prev_[i]->Next(i) ==
  // next_[i] (it probably did at some point in the past, but intervening
  // or concurrent operations might have inserted nodes in between).
  //
  // Number of levels (starting from 0) for which prev_/next_ hold valid
  // bracketing nodes.  0 means the splice has never been populated.
  int height_ = 0;
  // prev_[i] / next_[i] bracket some key at level i.  Both arrays are
  // externally allocated (see AllocateSplice / AllocateSpliceOnHeap,
  // or the stack arrays in InsertConcurrently).
  Node** prev_;
  Node** next_;
};
285 
286 // The Node data type is more of a pointer into custom-managed memory than
287 // a traditional C++ struct.  The key is stored in the bytes immediately
288 // after the struct, and the next_ pointers for nodes with height > 1 are
289 // stored immediately _before_ the struct.  This avoids the need to include
290 // any pointer or sizing data, which reduces per-node memory overheads.
template <class Comparator>
struct InlineSkipList<Comparator>::Node {
  // Stores the height of the node in the memory location normally used for
  // next_[0].  This is used for passing data from AllocateKey to Insert.
  // Uses memcpy because next_[0] is a std::atomic<Node*>, not an int.
  void StashHeight(const int height) {
    assert(sizeof(int) <= sizeof(next_[0]));
    memcpy(static_cast<void*>(&next_[0]), &height, sizeof(int));
  }

  // Retrieves the value passed to StashHeight.  Undefined after a call
  // to SetNext or NoBarrier_SetNext.
  int UnstashHeight() const {
    int rv;
    memcpy(&rv, &next_[0], sizeof(int));
    return rv;
  }

  // The key bytes are stored contiguously, immediately after next_[0]
  // (i.e. immediately after this struct).
  const char* Key() const { return reinterpret_cast<const char*>(&next_[1]); }

  // Accessors/mutators for links.  Wrapped in methods so we can add
  // the appropriate barriers as necessary, and perform the necessary
  // addressing trickery for storing links below the Node in memory.
  Node* Next(int n) {
    assert(n >= 0);
    // Use an 'acquire load' so that we observe a fully initialized
    // version of the returned Node.
    return ((&next_[0] - n)->load(std::memory_order_acquire));
  }

  void SetNext(int n, Node* x) {
    assert(n >= 0);
    // Use a 'release store' so that anybody who reads through this
    // pointer observes a fully initialized version of the inserted node.
    (&next_[0] - n)->store(x, std::memory_order_release);
  }

  // Atomically replaces the level-n link iff it still equals `expected`.
  // Uses the default (sequentially consistent) memory order.
  bool CASNext(int n, Node* expected, Node* x) {
    assert(n >= 0);
    return (&next_[0] - n)->compare_exchange_strong(expected, x);
  }

  // No-barrier variants that can be safely used in a few locations.
  Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return (&next_[0] - n)->load(std::memory_order_relaxed);
  }

  void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);
    (&next_[0] - n)->store(x, std::memory_order_relaxed);
  }

  // Insert node after prev on specific level.
  void InsertAfter(Node* prev, int level) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "this" in prev.
    NoBarrier_SetNext(level, prev->NoBarrier_Next(level));
    prev->SetNext(level, this);
  }

 private:
  // next_[0] is the lowest level link (level 0).  Higher levels are
  // stored _earlier_, so level 1 is at next_[-1].  The extra pointers
  // live in memory allocated just before the Node (see AllocateNode).
  std::atomic<Node*> next_[1];
};
356 
357 template <class Comparator>
358 inline InlineSkipList<Comparator>::Iterator::Iterator(
359     const InlineSkipList* list) {
360   SetList(list);
361 }
362 
// Rebind the iterator to a (possibly different) skip list and invalidate
// the current position.
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::SetList(
    const InlineSkipList* list) {
  list_ = list;
  node_ = nullptr;
}
369 
// An iterator is valid iff it is positioned on a real node (not past the
// end and not freshly constructed/rebound).
template <class Comparator>
inline bool InlineSkipList<Comparator>::Iterator::Valid() const {
  return node_ != nullptr;
}
374 
// Returns the key stored at the current node.
// REQUIRES: Valid()
template <class Comparator>
inline const char* InlineSkipList<Comparator>::Iterator::key() const {
  assert(Valid());
  return node_->Key();
}
380 
// Advance to the successor by following the level-0 link.
// REQUIRES: Valid()
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::Next() {
  assert(Valid());
  node_ = node_->Next(0);
}
386 
387 template <class Comparator>
388 inline void InlineSkipList<Comparator>::Iterator::Prev() {
389   // Instead of using explicit "prev" links, we just search for the
390   // last node that falls before key.
391   assert(Valid());
392   node_ = list_->FindLessThan(node_->Key());
393   if (node_ == list_->head_) {
394     node_ = nullptr;
395   }
396 }
397 
// Position on the first entry with key >= target (nullptr/invalid if no
// such entry exists).
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::Seek(const char* target) {
  node_ = list_->FindGreaterOrEqual(target);
}
402 
403 template <class Comparator>
404 inline void InlineSkipList<Comparator>::Iterator::SeekForPrev(
405     const char* target) {
406   Seek(target);
407   if (!Valid()) {
408     SeekToLast();
409   }
410   while (Valid() && list_->LessThan(target, key())) {
411     Prev();
412   }
413 }
414 
// Position on the first real entry (the node after the sentinel head);
// invalid iff the list is empty.
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::SeekToFirst() {
  node_ = list_->head_->Next(0);
}
419 
420 template <class Comparator>
421 inline void InlineSkipList<Comparator>::Iterator::SeekToLast() {
422   node_ = list_->FindLast();
423   if (node_ == list_->head_) {
424     node_ = nullptr;
425   }
426 }
427 
428 template <class Comparator>
429 int InlineSkipList<Comparator>::RandomHeight() {
430   auto rnd = Random::GetTLSInstance();
431 
432   // Increase height with probability 1 in kBranching
433   int height = 1;
434   while (height < kMaxHeight_ && height < kMaxPossibleHeight &&
435          rnd->Next() < kScaledInverseBranching_) {
436     height++;
437   }
438   assert(height > 0);
439   assert(height <= kMaxHeight_);
440   assert(height <= kMaxPossibleHeight);
441   return height;
442 }
443 
444 template <class Comparator>
445 bool InlineSkipList<Comparator>::KeyIsAfterNode(const char* key,
446                                                 Node* n) const {
447   // nullptr n is considered infinite
448   assert(n != head_);
449   return (n != nullptr) && (compare_(n->Key(), key) < 0);
450 }
451 
452 template <class Comparator>
453 bool InlineSkipList<Comparator>::KeyIsAfterNode(const DecodedKey& key,
454                                                 Node* n) const {
455   // nullptr n is considered infinite
456   assert(n != head_);
457   return (n != nullptr) && (compare_(n->Key(), key) < 0);
458 }
459 
template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindGreaterOrEqual(const char* key) const {
  // Note: It looks like we could reduce duplication by implementing
  // this function as FindLessThan(key)->Next(0), but we wouldn't be able
  // to exit early on equality and the result wouldn't even be correct.
  // A concurrent insert might occur after FindLessThan(key) but before
  // we get a chance to call Next(0).
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  // Caches a node already known to compare > key so that after dropping a
  // level we can skip re-running the comparator against the same node.
  Node* last_bigger = nullptr;
  const DecodedKey key_decoded = compare_.decode_key(key);
  while (true) {
    Node* next = x->Next(level);
    if (next != nullptr) {
      PREFETCH(next->Next(level), 0, 1);
    }
    // Make sure the lists are sorted
    assert(x == head_ || next == nullptr || KeyIsAfterNode(next->Key(), x));
    // Make sure we haven't overshot during our search
    assert(x == head_ || KeyIsAfterNode(key_decoded, x));
    // Treat end-of-level and the cached bigger node as "greater" without
    // invoking the comparator again.
    int cmp = (next == nullptr || next == last_bigger)
                  ? 1
                  : compare_(next->Key(), key_decoded);
    if (cmp == 0 || (cmp > 0 && level == 0)) {
      // Exact match, or bottom level reached: next is the answer
      // (possibly nullptr when every key is smaller).
      return next;
    } else if (cmp < 0) {
      // Keep searching in this list
      x = next;
    } else {
      // Switch to next list, reuse compare_() result
      last_bigger = next;
      level--;
    }
  }
}
496 
// Convenience overload: search the whole structure, from head_ across all
// currently-used levels down to level 0.
template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindLessThan(const char* key, Node** prev) const {
  return FindLessThan(key, prev, head_, GetMaxHeight(), 0);
}
502 
template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindLessThan(const char* key, Node** prev,
                                         Node* root, int top_level,
                                         int bottom_level) const {
  assert(top_level > bottom_level);
  int level = top_level - 1;
  Node* x = root;
  // KeyIsAfter(key, last_not_after) is definitely false
  // (caches the node that stopped the scan one level up, so we can skip
  // re-running the comparator on it after descending).
  Node* last_not_after = nullptr;
  const DecodedKey key_decoded = compare_.decode_key(key);
  while (true) {
    assert(x != nullptr);
    Node* next = x->Next(level);
    if (next != nullptr) {
      PREFETCH(next->Next(level), 0, 1);
    }
    // Sanity: the level is sorted and we haven't walked past the key.
    assert(x == head_ || next == nullptr || KeyIsAfterNode(next->Key(), x));
    assert(x == head_ || KeyIsAfterNode(key_decoded, x));
    if (next != last_not_after && KeyIsAfterNode(key_decoded, next)) {
      // Keep searching in this list
      assert(next != nullptr);
      x = next;
    } else {
      // x is the last node < key at this level; record it if requested.
      if (prev != nullptr) {
        prev[level] = x;
      }
      if (level == bottom_level) {
        return x;
      } else {
        // Switch to next list, reuse KeyIsAfterNode() result
        last_not_after = next;
        level--;
      }
    }
  }
}
540 
541 template <class Comparator>
542 typename InlineSkipList<Comparator>::Node*
543 InlineSkipList<Comparator>::FindLast() const {
544   Node* x = head_;
545   int level = GetMaxHeight() - 1;
546   while (true) {
547     Node* next = x->Next(level);
548     if (next == nullptr) {
549       if (level == 0) {
550         return x;
551       } else {
552         // Switch to next list
553         level--;
554       }
555     } else {
556       x = next;
557     }
558   }
559 }
560 
561 template <class Comparator>
562 uint64_t InlineSkipList<Comparator>::EstimateCount(const char* key) const {
563   uint64_t count = 0;
564 
565   Node* x = head_;
566   int level = GetMaxHeight() - 1;
567   const DecodedKey key_decoded = compare_.decode_key(key);
568   while (true) {
569     assert(x == head_ || compare_(x->Key(), key_decoded) < 0);
570     Node* next = x->Next(level);
571     if (next != nullptr) {
572       PREFETCH(next->Next(level), 0, 1);
573     }
574     if (next == nullptr || compare_(next->Key(), key_decoded) >= 0) {
575       if (level == 0) {
576         return count;
577       } else {
578         // Switch to next list
579         count *= kBranching_;
580         level--;
581       }
582     } else {
583       x = next;
584       count++;
585     }
586   }
587 }
588 
template <class Comparator>
InlineSkipList<Comparator>::InlineSkipList(const Comparator cmp,
                                           Allocator* allocator,
                                           int32_t max_height,
                                           int32_t branching_factor)
    : kMaxHeight_(static_cast<uint16_t>(max_height)),
      kBranching_(static_cast<uint16_t>(branching_factor)),
      // Threshold for RandomHeight(): promote when Random::Next() falls in
      // the lowest 1/kBranching_ fraction of its range.
      kScaledInverseBranching_((Random::kMaxNext + 1) / kBranching_),
      allocator_(allocator),
      compare_(cmp),
      // head_ is a key-less sentinel with the maximum height.  Note the
      // member-initialization order: allocator_ must precede head_.
      head_(AllocateNode(0, max_height)),
      max_height_(1),
      seq_splice_(AllocateSplice()) {
  // The casts above must not have truncated the configured values.
  assert(max_height > 0 && kMaxHeight_ == static_cast<uint32_t>(max_height));
  assert(branching_factor > 1 &&
         kBranching_ == static_cast<uint32_t>(branching_factor));
  assert(kScaledInverseBranching_ > 0);

  // Start with an empty list: every head_ link points past the end.
  for (int i = 0; i < kMaxHeight_; ++i) {
    head_->SetNext(i, nullptr);
  }
}
611 
// Allocates a node with a randomly drawn height and hands the caller the
// writable key buffer, which lives immediately after the Node header.
template <class Comparator>
char* InlineSkipList<Comparator>::AllocateKey(size_t key_size) {
  return const_cast<char*>(AllocateNode(key_size, RandomHeight())->Key());
}
616 
template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::AllocateNode(size_t key_size, int height) {
  auto prefix = sizeof(std::atomic<Node*>) * (height - 1);

  // prefix is space for the height - 1 pointers that we store before
  // the Node instance (next_[-(height - 1) .. -1]).  Node starts at
  // raw + prefix, and holds the bottom-most (level 0) skip list pointer
  // next_[0].  key_size is the bytes for the key, which comes just after
  // the Node.
  char* raw = allocator_->AllocateAligned(prefix + sizeof(Node) + key_size);
  Node* x = reinterpret_cast<Node*>(raw + prefix);

  // Once we've linked the node into the skip list we don't actually need
  // to know its height, because we can implicitly use the fact that we
  // traversed into a node at level h to know that h is a valid level
  // for that node.  We need to convey the height to the Insert step,
  // however, so that it can perform the proper links.  Since we're not
  // using the pointers at the moment, StashHeight temporarily borrows
  // storage from next_[0] for that purpose.
  x->StashHeight(height);
  return x;
}
640 
641 template <class Comparator>
642 typename InlineSkipList<Comparator>::Splice*
643 InlineSkipList<Comparator>::AllocateSplice() {
644   // size of prev_ and next_
645   size_t array_size = sizeof(Node*) * (kMaxHeight_ + 1);
646   char* raw = allocator_->AllocateAligned(sizeof(Splice) + array_size * 2);
647   Splice* splice = reinterpret_cast<Splice*>(raw);
648   splice->height_ = 0;
649   splice->prev_ = reinterpret_cast<Node**>(raw + sizeof(Splice));
650   splice->next_ = reinterpret_cast<Node**>(raw + sizeof(Splice) + array_size);
651   return splice;
652 }
653 
654 template <class Comparator>
655 typename InlineSkipList<Comparator>::Splice*
656 InlineSkipList<Comparator>::AllocateSpliceOnHeap() {
657   size_t array_size = sizeof(Node*) * (kMaxHeight_ + 1);
658   char* raw = new char[sizeof(Splice) + array_size * 2];
659   Splice* splice = reinterpret_cast<Splice*>(raw);
660   splice->height_ = 0;
661   splice->prev_ = reinterpret_cast<Node**>(raw + sizeof(Splice));
662   splice->next_ = reinterpret_cast<Node**>(raw + sizeof(Splice) + array_size);
663   return splice;
664 }
665 
// Non-concurrent insert: reuses the shared seq_splice_ cache with
// allow_partial_splice_fix == false (pessimistic full recompute when the
// splice is stale).
template <class Comparator>
bool InlineSkipList<Comparator>::Insert(const char* key) {
  return Insert<false>(key, seq_splice_, false);
}
670 
671 template <class Comparator>
672 bool InlineSkipList<Comparator>::InsertConcurrently(const char* key) {
673   Node* prev[kMaxPossibleHeight];
674   Node* next[kMaxPossibleHeight];
675   Splice splice;
676   splice.prev_ = prev;
677   splice.next_ = next;
678   return Insert<true>(key, &splice, false);
679 }
680 
681 template <class Comparator>
682 bool InlineSkipList<Comparator>::InsertWithHint(const char* key, void** hint) {
683   assert(hint != nullptr);
684   Splice* splice = reinterpret_cast<Splice*>(*hint);
685   if (splice == nullptr) {
686     splice = AllocateSplice();
687     *hint = reinterpret_cast<void*>(splice);
688   }
689   return Insert<false>(key, splice, true);
690 }
691 
692 template <class Comparator>
693 bool InlineSkipList<Comparator>::InsertWithHintConcurrently(const char* key,
694                                                             void** hint) {
695   assert(hint != nullptr);
696   Splice* splice = reinterpret_cast<Splice*>(*hint);
697   if (splice == nullptr) {
698     splice = AllocateSpliceOnHeap();
699     *hint = reinterpret_cast<void*>(splice);
700   }
701   return Insert<true>(key, splice, true);
702 }
703 
704 template <class Comparator>
705 template <bool prefetch_before>
706 void InlineSkipList<Comparator>::FindSpliceForLevel(const DecodedKey& key,
707                                                     Node* before, Node* after,
708                                                     int level, Node** out_prev,
709                                                     Node** out_next) {
710   while (true) {
711     Node* next = before->Next(level);
712     if (next != nullptr) {
713       PREFETCH(next->Next(level), 0, 1);
714     }
715     if (prefetch_before == true) {
716       if (next != nullptr && level>0) {
717         PREFETCH(next->Next(level-1), 0, 1);
718       }
719     }
720     assert(before == head_ || next == nullptr ||
721            KeyIsAfterNode(next->Key(), before));
722     assert(before == head_ || KeyIsAfterNode(key, before));
723     if (next == after || !KeyIsAfterNode(key, next)) {
724       // found it
725       *out_prev = before;
726       *out_next = next;
727       return;
728     }
729     before = next;
730   }
731 }
732 
733 template <class Comparator>
734 void InlineSkipList<Comparator>::RecomputeSpliceLevels(const DecodedKey& key,
735                                                        Splice* splice,
736                                                        int recompute_level) {
737   assert(recompute_level > 0);
738   assert(recompute_level <= splice->height_);
739   for (int i = recompute_level - 1; i >= 0; --i) {
740     FindSpliceForLevel<true>(key, splice->prev_[i + 1], splice->next_[i + 1], i,
741                        &splice->prev_[i], &splice->next_[i]);
742   }
743 }
744 
745 template <class Comparator>
746 template <bool UseCAS>
747 bool InlineSkipList<Comparator>::Insert(const char* key, Splice* splice,
748                                         bool allow_partial_splice_fix) {
749   Node* x = reinterpret_cast<Node*>(const_cast<char*>(key)) - 1;
750   const DecodedKey key_decoded = compare_.decode_key(key);
751   int height = x->UnstashHeight();
752   assert(height >= 1 && height <= kMaxHeight_);
753 
754   int max_height = max_height_.load(std::memory_order_relaxed);
755   while (height > max_height) {
756     if (max_height_.compare_exchange_weak(max_height, height)) {
757       // successfully updated it
758       max_height = height;
759       break;
760     }
761     // else retry, possibly exiting the loop because somebody else
762     // increased it
763   }
764   assert(max_height <= kMaxPossibleHeight);
765 
766   int recompute_height = 0;
767   if (splice->height_ < max_height) {
768     // Either splice has never been used or max_height has grown since
769     // last use.  We could potentially fix it in the latter case, but
770     // that is tricky.
771     splice->prev_[max_height] = head_;
772     splice->next_[max_height] = nullptr;
773     splice->height_ = max_height;
774     recompute_height = max_height;
775   } else {
776     // Splice is a valid proper-height splice that brackets some
777     // key, but does it bracket this one?  We need to validate it and
778     // recompute a portion of the splice (levels 0..recompute_height-1)
779     // that is a superset of all levels that don't bracket the new key.
780     // Several choices are reasonable, because we have to balance the work
781     // saved against the extra comparisons required to validate the Splice.
782     //
783     // One strategy is just to recompute all of orig_splice_height if the
784     // bottom level isn't bracketing.  This pessimistically assumes that
785     // we will either get a perfect Splice hit (increasing sequential
786     // inserts) or have no locality.
787     //
788     // Another strategy is to walk up the Splice's levels until we find
789     // a level that brackets the key.  This strategy lets the Splice
790     // hint help for other cases: it turns insertion from O(log N) into
791     // O(log D), where D is the number of nodes in between the key that
792     // produced the Splice and the current insert (insertion is aided
793     // whether the new key is before or after the splice).  If you have
794     // a way of using a prefix of the key to map directly to the closest
795     // Splice out of O(sqrt(N)) Splices and we make it so that splices
796     // can also be used as hints during read, then we end up with Oshman's
797     // and Shavit's SkipTrie, which has O(log log N) lookup and insertion
798     // (compare to O(log N) for skip list).
799     //
800     // We control the pessimistic strategy with allow_partial_splice_fix.
801     // A good strategy is probably to be pessimistic for seq_splice_,
802     // optimistic if the caller actually went to the work of providing
803     // a Splice.
804     while (recompute_height < max_height) {
805       if (splice->prev_[recompute_height]->Next(recompute_height) !=
806           splice->next_[recompute_height]) {
807         // splice isn't tight at this level, there must have been some inserts
808         // to this
809         // location that didn't update the splice.  We might only be a little
810         // stale, but if
811         // the splice is very stale it would be O(N) to fix it.  We haven't used
812         // up any of
813         // our budget of comparisons, so always move up even if we are
814         // pessimistic about
815         // our chances of success.
816         ++recompute_height;
817       } else if (splice->prev_[recompute_height] != head_ &&
818                  !KeyIsAfterNode(key_decoded,
819                                  splice->prev_[recompute_height])) {
820         // key is from before splice
821         if (allow_partial_splice_fix) {
822           // skip all levels with the same node without more comparisons
823           Node* bad = splice->prev_[recompute_height];
824           while (splice->prev_[recompute_height] == bad) {
825             ++recompute_height;
826           }
827         } else {
828           // we're pessimistic, recompute everything
829           recompute_height = max_height;
830         }
831       } else if (KeyIsAfterNode(key_decoded,
832                                 splice->next_[recompute_height])) {
833         // key is from after splice
834         if (allow_partial_splice_fix) {
835           Node* bad = splice->next_[recompute_height];
836           while (splice->next_[recompute_height] == bad) {
837             ++recompute_height;
838           }
839         } else {
840           recompute_height = max_height;
841         }
842       } else {
843         // this level brackets the key, we won!
844         break;
845       }
846     }
847   }
848   assert(recompute_height <= max_height);
849   if (recompute_height > 0) {
850     RecomputeSpliceLevels(key_decoded, splice, recompute_height);
851   }
852 
853   bool splice_is_valid = true;
854   if (UseCAS) {
855     for (int i = 0; i < height; ++i) {
856       while (true) {
857         // Checking for duplicate keys on the level 0 is sufficient
858         if (UNLIKELY(i == 0 && splice->next_[i] != nullptr &&
859                      compare_(x->Key(), splice->next_[i]->Key()) >= 0)) {
860           // duplicate key
861           return false;
862         }
863         if (UNLIKELY(i == 0 && splice->prev_[i] != head_ &&
864                      compare_(splice->prev_[i]->Key(), x->Key()) >= 0)) {
865           // duplicate key
866           return false;
867         }
868         assert(splice->next_[i] == nullptr ||
869                compare_(x->Key(), splice->next_[i]->Key()) < 0);
870         assert(splice->prev_[i] == head_ ||
871                compare_(splice->prev_[i]->Key(), x->Key()) < 0);
872         x->NoBarrier_SetNext(i, splice->next_[i]);
873         if (splice->prev_[i]->CASNext(i, splice->next_[i], x)) {
874           // success
875           break;
876         }
877         // CAS failed, we need to recompute prev and next. It is unlikely
878         // to be helpful to try to use a different level as we redo the
879         // search, because it should be unlikely that lots of nodes have
880         // been inserted between prev[i] and next[i]. No point in using
881         // next[i] as the after hint, because we know it is stale.
882         FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i,
883                                   &splice->prev_[i], &splice->next_[i]);
884 
885         // Since we've narrowed the bracket for level i, we might have
886         // violated the Splice constraint between i and i-1.  Make sure
887         // we recompute the whole thing next time.
888         if (i > 0) {
889           splice_is_valid = false;
890         }
891       }
892     }
893   } else {
894     for (int i = 0; i < height; ++i) {
895       if (i >= recompute_height &&
896           splice->prev_[i]->Next(i) != splice->next_[i]) {
897         FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i,
898                                   &splice->prev_[i], &splice->next_[i]);
899       }
900       // Checking for duplicate keys on the level 0 is sufficient
901       if (UNLIKELY(i == 0 && splice->next_[i] != nullptr &&
902                    compare_(x->Key(), splice->next_[i]->Key()) >= 0)) {
903         // duplicate key
904         return false;
905       }
906       if (UNLIKELY(i == 0 && splice->prev_[i] != head_ &&
907                    compare_(splice->prev_[i]->Key(), x->Key()) >= 0)) {
908         // duplicate key
909         return false;
910       }
911       assert(splice->next_[i] == nullptr ||
912              compare_(x->Key(), splice->next_[i]->Key()) < 0);
913       assert(splice->prev_[i] == head_ ||
914              compare_(splice->prev_[i]->Key(), x->Key()) < 0);
915       assert(splice->prev_[i]->Next(i) == splice->next_[i]);
916       x->NoBarrier_SetNext(i, splice->next_[i]);
917       splice->prev_[i]->SetNext(i, x);
918     }
919   }
920   if (splice_is_valid) {
921     for (int i = 0; i < height; ++i) {
922       splice->prev_[i] = x;
923     }
924     assert(splice->prev_[splice->height_] == head_);
925     assert(splice->next_[splice->height_] == nullptr);
926     for (int i = 0; i < splice->height_; ++i) {
927       assert(splice->next_[i] == nullptr ||
928              compare_(key, splice->next_[i]->Key()) < 0);
929       assert(splice->prev_[i] == head_ ||
930              compare_(splice->prev_[i]->Key(), key) <= 0);
931       assert(splice->prev_[i + 1] == splice->prev_[i] ||
932              splice->prev_[i + 1] == head_ ||
933              compare_(splice->prev_[i + 1]->Key(), splice->prev_[i]->Key()) <
934                  0);
935       assert(splice->next_[i + 1] == splice->next_[i] ||
936              splice->next_[i + 1] == nullptr ||
937              compare_(splice->next_[i]->Key(), splice->next_[i + 1]->Key()) <
938                  0);
939     }
940   } else {
941     splice->height_ = 0;
942   }
943   return true;
944 }
945 
946 template <class Comparator>
947 bool InlineSkipList<Comparator>::Contains(const char* key) const {
948   Node* x = FindGreaterOrEqual(key);
949   if (x != nullptr && Equal(key, x->Key())) {
950     return true;
951   } else {
952     return false;
953   }
954 }
955 
956 template <class Comparator>
957 void InlineSkipList<Comparator>::TEST_Validate() const {
958   // Interate over all levels at the same time, and verify nodes appear in
959   // the right order, and nodes appear in upper level also appear in lower
960   // levels.
961   Node* nodes[kMaxPossibleHeight];
962   int max_height = GetMaxHeight();
963   assert(max_height > 0);
964   for (int i = 0; i < max_height; i++) {
965     nodes[i] = head_;
966   }
967   while (nodes[0] != nullptr) {
968     Node* l0_next = nodes[0]->Next(0);
969     if (l0_next == nullptr) {
970       break;
971     }
972     assert(nodes[0] == head_ || compare_(nodes[0]->Key(), l0_next->Key()) < 0);
973     nodes[0] = l0_next;
974 
975     int i = 1;
976     while (i < max_height) {
977       Node* next = nodes[i]->Next(i);
978       if (next == nullptr) {
979         break;
980       }
981       auto cmp = compare_(nodes[0]->Key(), next->Key());
982       assert(cmp <= 0);
983       if (cmp == 0) {
984         assert(next == nodes[0]);
985         nodes[i] = next;
986       } else {
987         break;
988       }
989       i++;
990     }
991   }
992   for (int i = 1; i < max_height; i++) {
993     assert(nodes[i] != nullptr && nodes[i]->Next(i) == nullptr);
994   }
995 }
996 
997 }  // namespace ROCKSDB_NAMESPACE
998