// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved. Use of
// this source code is governed by a BSD-style license that can be found
// in the LICENSE file. See the AUTHORS file for names of contributors.
//
// InlineSkipList is derived from SkipList (skiplist.h), but it optimizes
// the memory layout by requiring that the key storage be allocated through
// the skip list instance.  For the common case of SkipList<const char*,
// Cmp> this saves 1 pointer per skip list node and gives better cache
// locality, at the expense of wasted padding from using AllocateAligned
// instead of Allocate for the keys.  The unused padding will be from
// 0 to sizeof(void*)-1 bytes, and the space savings are sizeof(void*)
// bytes, so despite the padding the space used is always less than
// SkipList<const char*, ..>.
//
// Thread safety -------------
//
// Writes via Insert require external synchronization, most likely a mutex.
// InsertConcurrently can be safely called concurrently with reads and
// with other concurrent inserts.  Reads require a guarantee that the
// InlineSkipList will not be destroyed while the read is in progress.
// Apart from that, reads progress without any internal locking or
// synchronization.
//
// Invariants:
//
// (1) Allocated nodes are never deleted until the InlineSkipList is
// destroyed.  This is trivially guaranteed by the code since we never
// delete any skip list nodes.
//
// (2) The contents of a Node except for the next/prev pointers are
// immutable after the Node has been linked into the InlineSkipList.
// Only Insert() modifies the list, and it is careful to initialize a
// node and use release-stores to publish the nodes in one or more lists.
//
// ... prev vs. next pointer ordering ...
//

#pragma once
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <atomic>
#include <type_traits>
#include <vector>

#include "memory/allocator.h"
#include "port/likely.h"
#include "port/port.h"
#include "rocksdb/slice.h"
#include "util/coding.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

template <class Comparator>
class InlineSkipList {
 private:
  struct Node;
  struct Splice;

 public:
  using DecodedKey =
      typename std::remove_reference<Comparator>::type::DecodedType;

  static const uint16_t kMaxPossibleHeight = 32;

  // Create a new InlineSkipList object that will use "cmp" for comparing
  // keys, and will allocate memory using "*allocator".  Objects allocated
  // in the allocator must remain allocated for the lifetime of the
  // skiplist object.
  explicit InlineSkipList(Comparator cmp, Allocator* allocator,
                          int32_t max_height = 12,
                          int32_t branching_factor = 4);
  // No copying allowed
  InlineSkipList(const InlineSkipList&) = delete;
  InlineSkipList& operator=(const InlineSkipList&) = delete;

  // Allocates a key and a skip-list node, returning a pointer to the key
  // portion of the node.  This method is thread-safe if the allocator
  // is thread-safe.
  char* AllocateKey(size_t key_size);
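
  // A minimal usage sketch (illustrative only; `encoded` stands for a
  // caller-prepared buffer already holding a key in the comparator's
  // encoded format):
  //
  //   char* buf = list.AllocateKey(encoded.size());
  //   memcpy(buf, encoded.data(), encoded.size());
  //   list.Insert(buf);  // or list.InsertConcurrently(buf)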

  // Allocate a splice using allocator.
  Splice* AllocateSplice();

  // Allocate a splice on the heap.
  Splice* AllocateSpliceOnHeap();

  // Inserts a key allocated by AllocateKey, after the actual key value
  // has been filled in.
  //
  // REQUIRES: nothing that compares equal to key is currently in the list.
  // REQUIRES: no concurrent calls to any insert operation.
  bool Insert(const char* key);

  // Inserts a key allocated by AllocateKey with a hint of the last insert
  // position in the skip-list.  If hint points to nullptr, a new hint will
  // be populated, which can be used in subsequent calls.
  //
  // It can be used to optimize workloads where there are multiple groups
  // of keys, and each key is likely to be inserted at a location close to
  // the last inserted key in the same group.  One example is sequential
  // inserts.
  //
  // REQUIRES: nothing that compares equal to key is currently in the list.
  // REQUIRES: no concurrent calls to any insert operation.
  bool InsertWithHint(const char* key, void** hint);

  // Like InsertConcurrently, but with a hint.
  //
  // REQUIRES: nothing that compares equal to key is currently in the list.
  // REQUIRES: no concurrent calls that use the same hint.
  bool InsertWithHintConcurrently(const char* key, void** hint);

  // Like Insert, but external synchronization is not required.
  bool InsertConcurrently(const char* key);

  // Inserts a node into the skip list.  key must have been allocated by
  // AllocateKey and then filled in by the caller.  If UseCAS is true,
  // then external synchronization is not required, otherwise this method
  // may not be called concurrently with any other insertions.
  //
  // Regardless of whether UseCAS is true, the splice must be owned
  // exclusively by the current thread.  If allow_partial_splice_fix is
  // true, then the cost of insertion is amortized O(log D), where D is
  // the distance from the splice to the inserted key (measured as the
  // number of intervening nodes).  Note that this bound is very good for
  // sequential insertions!  If allow_partial_splice_fix is false then
  // the existing splice will be ignored unless the current key is being
  // inserted immediately after the splice.  allow_partial_splice_fix ==
  // false has a worse running time for the non-sequential case, O(log N),
  // but a better constant factor.
  template <bool UseCAS>
  bool Insert(const char* key, Splice* splice, bool allow_partial_splice_fix);

  // Returns true iff an entry that compares equal to key is in the list.
  bool Contains(const char* key) const;

  // Return estimated number of entries smaller than `key`.
  uint64_t EstimateCount(const char* key) const;

  // Validate correctness of the skip-list.
  void TEST_Validate() const;

  // Iteration over the contents of a skip list
  class Iterator {
   public:
    // Initialize an iterator over the specified list.
    // The returned iterator is not valid.
    explicit Iterator(const InlineSkipList* list);

    // Change the underlying skiplist used for this iterator.
    // This enables reusing the iterator object without deallocating
    // the old one and then allocating a new one.
    void SetList(const InlineSkipList* list);

    // Returns true iff the iterator is positioned at a valid node.
    bool Valid() const;

    // Returns the key at the current position.
    // REQUIRES: Valid()
    const char* key() const;

    // Advances to the next position.
    // REQUIRES: Valid()
    void Next();

    // Advances to the previous position.
    // REQUIRES: Valid()
    void Prev();

    // Advance to the first entry with a key >= target
    void Seek(const char* target);

    // Retreat to the last entry with a key <= target
    void SeekForPrev(const char* target);

    // Advance to a random entry in the list.
    void RandomSeek();

    // Position at the first entry in list.
    // Final state of iterator is Valid() iff list is not empty.
    void SeekToFirst();

    // Position at the last entry in list.
    // Final state of iterator is Valid() iff list is not empty.
    void SeekToLast();

   private:
    const InlineSkipList* list_;
    Node* node_;
    // Intentionally copyable
  };
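
  // A minimal iteration sketch (illustrative only; Cmp stands for the
  // caller's comparator type, and `list` for an InlineSkipList<Cmp>):
  //
  //   InlineSkipList<Cmp>::Iterator iter(&list);
  //   for (iter.SeekToFirst(); iter.Valid(); iter.Next()) {
  //     // iter.key() stays valid for as long as the list is alive.
  //   }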

 private:
  const uint16_t kMaxHeight_;
  const uint16_t kBranching_;
  const uint32_t kScaledInverseBranching_;

  Allocator* const allocator_;  // Allocator used for allocations of nodes
  // Immutable after construction
  Comparator const compare_;
  Node* const head_;

  // Modified only by Insert().  Read racily by readers, but stale
  // values are ok.
  std::atomic<int> max_height_;  // Height of the entire list

  // seq_splice_ is a Splice used for insertions in the non-concurrent
  // case.  It caches the prev and next found during the most recent
  // non-concurrent insertion.
  Splice* seq_splice_;

  inline int GetMaxHeight() const {
    return max_height_.load(std::memory_order_relaxed);
  }

  int RandomHeight();

  Node* AllocateNode(size_t key_size, int height);

  bool Equal(const char* a, const char* b) const {
    return (compare_(a, b) == 0);
  }

  bool LessThan(const char* a, const char* b) const {
    return (compare_(a, b) < 0);
  }

  // Return true if key is greater than the data stored in "n".  Null n
  // is considered infinite.  n should not be head_.
  bool KeyIsAfterNode(const char* key, Node* n) const;
  bool KeyIsAfterNode(const DecodedKey& key, Node* n) const;

  // Returns the earliest node with a key >= key.
  // Return nullptr if there is no such node.
  Node* FindGreaterOrEqual(const char* key) const;

  // Return the latest node with a key < key.
  // Return head_ if there is no such node.
  // Fills prev[level] with pointer to previous node at "level" for every
  // level in [0..max_height_-1], if prev is non-null.
  Node* FindLessThan(const char* key, Node** prev = nullptr) const;

  // Return the latest node with a key < key on bottom_level.  Start
  // searching from the root node on the level below top_level.
  // Fills prev[level] with pointer to previous node at "level" for every
  // level in [bottom_level..top_level-1], if prev is non-null.
  Node* FindLessThan(const char* key, Node** prev, Node* root, int top_level,
                     int bottom_level) const;

  // Return the last node in the list.
  // Return head_ if list is empty.
  Node* FindLast() const;

  // Returns a random entry.
  Node* FindRandomEntry() const;

  // Traverses a single level of the list, setting *out_prev to the last
  // node before the key and *out_next to the first node after.  Assumes
  // that the key is not present in the skip list.  On entry, before should
  // point to a node that is before the key, and after should point to
  // a node that is after the key.  after should be nullptr if a good after
  // node isn't conveniently available.
  template <bool prefetch_before>
  void FindSpliceForLevel(const DecodedKey& key, Node* before, Node* after,
                          int level, Node** out_prev, Node** out_next);

  // Recomputes Splice levels from highest_level (inclusive) down to
  // lowest_level (inclusive).
  void RecomputeSpliceLevels(const DecodedKey& key, Splice* splice,
                             int recompute_level);
};

// Implementation details follow

template <class Comparator>
struct InlineSkipList<Comparator>::Splice {
  // The invariant of a Splice is that prev_[i+1].key <= prev_[i].key <
  // next_[i].key <= next_[i+1].key for all i.  That means that if a
  // key is bracketed by prev_[i] and next_[i] then it is bracketed by
  // all higher levels.  It is _not_ required that prev_[i]->Next(i) ==
  // next_[i] (it probably did at some point in the past, but intervening
  // or concurrent operations might have inserted nodes in between).
  int height_ = 0;
  Node** prev_;
  Node** next_;
};
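
// An illustrative instance of the Splice invariant above (hypothetical
// keys): for a splice that brackets a not-yet-inserted key 20 in a list
// containing 10 and 30, a valid state is
//   prev_[0] = node(10), next_[0] = node(30)
//   prev_[1] = head_,    next_[1] = nullptr
// where head_ and nullptr act as -infinity and +infinity.  Because 20 is
// bracketed at level 0, the invariant guarantees it is bracketed at every
// higher level as well.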

// The Node data type is more of a pointer into custom-managed memory than
// a traditional C++ struct.  The key is stored in the bytes immediately
// after the struct, and the next_ pointers for nodes with height > 1 are
// stored immediately _before_ the struct.  This avoids the need to include
// any pointer or sizing data, which reduces per-node memory overheads.
template <class Comparator>
struct InlineSkipList<Comparator>::Node {
  // Stores the height of the node in the memory location normally used for
  // next_[0].  This is used for passing data from AllocateKey to Insert.
  void StashHeight(const int height) {
    assert(sizeof(int) <= sizeof(next_[0]));
    memcpy(static_cast<void*>(&next_[0]), &height, sizeof(int));
  }

  // Retrieves the value passed to StashHeight.  Undefined after a call
  // to SetNext or NoBarrier_SetNext.
  int UnstashHeight() const {
    int rv;
    memcpy(&rv, &next_[0], sizeof(int));
    return rv;
  }

  const char* Key() const { return reinterpret_cast<const char*>(&next_[1]); }

  // Accessors/mutators for links.  Wrapped in methods so we can add
  // the appropriate barriers as necessary, and perform the necessary
  // addressing trickery for storing links below the Node in memory.
  Node* Next(int n) {
    assert(n >= 0);
    // Use an 'acquire load' so that we observe a fully initialized
    // version of the returned Node.
    return ((&next_[0] - n)->load(std::memory_order_acquire));
  }

  void SetNext(int n, Node* x) {
    assert(n >= 0);
    // Use a 'release store' so that anybody who reads through this
    // pointer observes a fully initialized version of the inserted node.
    (&next_[0] - n)->store(x, std::memory_order_release);
  }

  bool CASNext(int n, Node* expected, Node* x) {
    assert(n >= 0);
    return (&next_[0] - n)->compare_exchange_strong(expected, x);
  }

  // No-barrier variants that can be safely used in a few locations.
  Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return (&next_[0] - n)->load(std::memory_order_relaxed);
  }

  void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);
    (&next_[0] - n)->store(x, std::memory_order_relaxed);
  }

  // Insert node after prev on specific level.
  void InsertAfter(Node* prev, int level) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "this" in prev.
    NoBarrier_SetNext(level, prev->NoBarrier_Next(level));
    prev->SetNext(level, this);
  }

 private:
  // next_[0] is the lowest level link (level 0).  Higher levels are
  // stored _earlier_, so level 1 is at next_[-1].
  std::atomic<Node*> next_[1];
};
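
// An illustrative memory layout for a node of height 3 (a sketch of the
// scheme described above, not normative):
//
//   [ next_[-2] ][ next_[-1] ][ next_[0] ][ key bytes ... ]
//   ^-- allocation starts here            ^-- Key() points here
//                              ^-- the Node* points here
//
// Next(1) reads next_[-1] and Next(2) reads next_[-2], while Key() returns
// the address just past next_[0].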

template <class Comparator>
inline InlineSkipList<Comparator>::Iterator::Iterator(
    const InlineSkipList* list) {
  SetList(list);
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::SetList(
    const InlineSkipList* list) {
  list_ = list;
  node_ = nullptr;
}

template <class Comparator>
inline bool InlineSkipList<Comparator>::Iterator::Valid() const {
  return node_ != nullptr;
}

template <class Comparator>
inline const char* InlineSkipList<Comparator>::Iterator::key() const {
  assert(Valid());
  return node_->Key();
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::Next() {
  assert(Valid());
  node_ = node_->Next(0);
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::Prev() {
  // Instead of using explicit "prev" links, we just search for the
  // last node that falls before key.
  assert(Valid());
  node_ = list_->FindLessThan(node_->Key());
  if (node_ == list_->head_) {
    node_ = nullptr;
  }
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::Seek(const char* target) {
  node_ = list_->FindGreaterOrEqual(target);
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::SeekForPrev(
    const char* target) {
  Seek(target);
  if (!Valid()) {
    SeekToLast();
  }
  while (Valid() && list_->LessThan(target, key())) {
    Prev();
  }
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::RandomSeek() {
  node_ = list_->FindRandomEntry();
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::SeekToFirst() {
  node_ = list_->head_->Next(0);
}

template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::SeekToLast() {
  node_ = list_->FindLast();
  if (node_ == list_->head_) {
    node_ = nullptr;
  }
}

template <class Comparator>
int InlineSkipList<Comparator>::RandomHeight() {
  auto rnd = Random::GetTLSInstance();

  // Increase height with probability 1 in kBranching
  int height = 1;
  while (height < kMaxHeight_ && height < kMaxPossibleHeight &&
         rnd->Next() < kScaledInverseBranching_) {
    height++;
  }
  assert(height > 0);
  assert(height <= kMaxHeight_);
  assert(height <= kMaxPossibleHeight);
  return height;
}
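
// A note on the resulting height distribution (a consequence of the loop
// above, stated here for clarity): each trial succeeds with probability
// 1/kBranching_, so ignoring the kMaxHeight_ cap,
// P(height >= h) = (1/kBranching_)^(h-1).  With the default branching
// factor of 4, about 1/4 of nodes have height >= 2, 1/16 have height >= 3,
// and so on.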

template <class Comparator>
bool InlineSkipList<Comparator>::KeyIsAfterNode(const char* key,
                                                Node* n) const {
  // nullptr n is considered infinite
  assert(n != head_);
  return (n != nullptr) && (compare_(n->Key(), key) < 0);
}

template <class Comparator>
bool InlineSkipList<Comparator>::KeyIsAfterNode(const DecodedKey& key,
                                                Node* n) const {
  // nullptr n is considered infinite
  assert(n != head_);
  return (n != nullptr) && (compare_(n->Key(), key) < 0);
}

template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindGreaterOrEqual(const char* key) const {
  // Note: It looks like we could reduce duplication by implementing
  // this function as FindLessThan(key)->Next(0), but we wouldn't be able
  // to exit early on equality and the result wouldn't even be correct.
  // A concurrent insert might occur after FindLessThan(key) but before
  // we get a chance to call Next(0).
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  Node* last_bigger = nullptr;
  const DecodedKey key_decoded = compare_.decode_key(key);
  while (true) {
    Node* next = x->Next(level);
    if (next != nullptr) {
      PREFETCH(next->Next(level), 0, 1);
    }
    // Make sure the lists are sorted
    assert(x == head_ || next == nullptr || KeyIsAfterNode(next->Key(), x));
    // Make sure we haven't overshot during our search
    assert(x == head_ || KeyIsAfterNode(key_decoded, x));
    int cmp = (next == nullptr || next == last_bigger)
                  ? 1
                  : compare_(next->Key(), key_decoded);
    if (cmp == 0 || (cmp > 0 && level == 0)) {
      return next;
    } else if (cmp < 0) {
      // Keep searching in this list
      x = next;
    } else {
      // Switch to next list, reuse compare_() result
      last_bigger = next;
      level--;
    }
  }
}
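
// A note on last_bigger above (restating the "reuse compare_() result"
// comment): after descending a level, the first candidate at the lower
// level can be the very node that was already found to be >= key at the
// level above.  Treating next == last_bigger as cmp > 0 skips that
// redundant key comparison.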

template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindLessThan(const char* key, Node** prev) const {
  return FindLessThan(key, prev, head_, GetMaxHeight(), 0);
}

template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindLessThan(const char* key, Node** prev,
                                         Node* root, int top_level,
                                         int bottom_level) const {
  assert(top_level > bottom_level);
  int level = top_level - 1;
  Node* x = root;
  // KeyIsAfter(key, last_not_after) is definitely false
  Node* last_not_after = nullptr;
  const DecodedKey key_decoded = compare_.decode_key(key);
  while (true) {
    assert(x != nullptr);
    Node* next = x->Next(level);
    if (next != nullptr) {
      PREFETCH(next->Next(level), 0, 1);
    }
    assert(x == head_ || next == nullptr || KeyIsAfterNode(next->Key(), x));
    assert(x == head_ || KeyIsAfterNode(key_decoded, x));
    if (next != last_not_after && KeyIsAfterNode(key_decoded, next)) {
      // Keep searching in this list
      assert(next != nullptr);
      x = next;
    } else {
      if (prev != nullptr) {
        prev[level] = x;
      }
      if (level == bottom_level) {
        return x;
      } else {
        // Switch to next list, reuse KeyIsAfterNode() result
        last_not_after = next;
        level--;
      }
    }
  }
}

template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindLast() const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  while (true) {
    Node* next = x->Next(level);
    if (next == nullptr) {
      if (level == 0) {
        return x;
      } else {
        // Switch to next list
        level--;
      }
    } else {
      x = next;
    }
  }
}

template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindRandomEntry() const {
  // TODO(bjlemaire): consider adding PREFETCH calls.
  Node *x = head_, *scan_node = nullptr, *limit_node = nullptr;

  // We start at the max level.
  // For each level, we look at all the nodes at that level, and
  // we randomly pick one of them.  Then we decrement the level
  // and reiterate the process.
  // E.g.: assume GetMaxHeight()=5, and there are 100 elements (nodes).
  // Level 4 nodes: lvl_nodes={#1, #15, #67, #84}.  Randomly pick #15.
  // We will consider all the nodes between #15 (inclusive) and #67
  // (exclusive).  #67 is called 'limit_node' here.
  // Level 3 nodes: lvl_nodes={#15, #21, #45, #51}.  Randomly choose
  // #51.  #67 remains 'limit_node'.
  // [...]
  // Level 0 nodes: lvl_nodes={#56,#57,#58,#59}.  Randomly pick #57.
  // Return node #57.
  std::vector<Node*> lvl_nodes;
  Random* rnd = Random::GetTLSInstance();
  int level = GetMaxHeight() - 1;

  while (level >= 0) {
    lvl_nodes.clear();
    scan_node = x;
    while (scan_node != limit_node) {
      lvl_nodes.push_back(scan_node);
      scan_node = scan_node->Next(level);
    }
    uint32_t rnd_idx = rnd->Next() % lvl_nodes.size();
    x = lvl_nodes[rnd_idx];
    if (rnd_idx + 1 < lvl_nodes.size()) {
      limit_node = lvl_nodes[rnd_idx + 1];
    }
    level--;
  }
  // There is a special case where x could still be the head_
  // (note that the head_ contains no key).
  return x == head_ ? head_->Next(0) : x;
}

template <class Comparator>
uint64_t InlineSkipList<Comparator>::EstimateCount(const char* key) const {
  uint64_t count = 0;

  Node* x = head_;
  int level = GetMaxHeight() - 1;
  const DecodedKey key_decoded = compare_.decode_key(key);
  while (true) {
    assert(x == head_ || compare_(x->Key(), key_decoded) < 0);
    Node* next = x->Next(level);
    if (next != nullptr) {
      PREFETCH(next->Next(level), 0, 1);
    }
    if (next == nullptr || compare_(next->Key(), key_decoded) >= 0) {
      if (level == 0) {
        return count;
      } else {
        // Switch to next list
        count *= kBranching_;
        level--;
      }
    } else {
      x = next;
      count++;
    }
  }
}
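
// A worked example of the scaling in EstimateCount (illustrative numbers,
// kBranching_ = 4): each step taken at level l stands for roughly
// kBranching_ steps at level l-1.  Suppose the search starts at level 2
// and takes 2 steps there, then 1 step at level 1, then 2 steps at
// level 0; the estimate is ((2 * 4) + 1) * 4 + 2 = 38.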

template <class Comparator>
InlineSkipList<Comparator>::InlineSkipList(const Comparator cmp,
                                           Allocator* allocator,
                                           int32_t max_height,
                                           int32_t branching_factor)
    : kMaxHeight_(static_cast<uint16_t>(max_height)),
      kBranching_(static_cast<uint16_t>(branching_factor)),
      kScaledInverseBranching_((Random::kMaxNext + 1) / kBranching_),
      allocator_(allocator),
      compare_(cmp),
      head_(AllocateNode(0, max_height)),
      max_height_(1),
      seq_splice_(AllocateSplice()) {
  assert(max_height > 0 && kMaxHeight_ == static_cast<uint32_t>(max_height));
  assert(branching_factor > 1 &&
         kBranching_ == static_cast<uint32_t>(branching_factor));
  assert(kScaledInverseBranching_ > 0);

  for (int i = 0; i < kMaxHeight_; ++i) {
    head_->SetNext(i, nullptr);
  }
}

template <class Comparator>
char* InlineSkipList<Comparator>::AllocateKey(size_t key_size) {
  return const_cast<char*>(AllocateNode(key_size, RandomHeight())->Key());
}

template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::AllocateNode(size_t key_size, int height) {
  auto prefix = sizeof(std::atomic<Node*>) * (height - 1);

  // prefix is space for the height - 1 pointers that we store before
  // the Node instance (next_[-(height - 1) .. -1]).  Node starts at
  // raw + prefix, and holds the bottom-most (level 0) skip list pointer
  // next_[0].  key_size is the bytes for the key, which comes just after
  // the Node.
  char* raw = allocator_->AllocateAligned(prefix + sizeof(Node) + key_size);
  Node* x = reinterpret_cast<Node*>(raw + prefix);

  // Once we've linked the node into the skip list we don't actually need
  // to know its height, because we can implicitly use the fact that we
  // traversed into a node at level h to know that h is a valid level
  // for that node.  We need to convey the height to the Insert step,
  // however, so that it can perform the proper links.  Since we're not
  // using the pointers at the moment, StashHeight temporarily borrows
  // storage from next_[0] for that purpose.
  x->StashHeight(height);
  return x;
}

template <class Comparator>
typename InlineSkipList<Comparator>::Splice*
InlineSkipList<Comparator>::AllocateSplice() {
  // size of prev_ and next_
  size_t array_size = sizeof(Node*) * (kMaxHeight_ + 1);
  char* raw = allocator_->AllocateAligned(sizeof(Splice) + array_size * 2);
  Splice* splice = reinterpret_cast<Splice*>(raw);
  splice->height_ = 0;
  splice->prev_ = reinterpret_cast<Node**>(raw + sizeof(Splice));
  splice->next_ = reinterpret_cast<Node**>(raw + sizeof(Splice) + array_size);
  return splice;
}

template <class Comparator>
typename InlineSkipList<Comparator>::Splice*
InlineSkipList<Comparator>::AllocateSpliceOnHeap() {
  size_t array_size = sizeof(Node*) * (kMaxHeight_ + 1);
  char* raw = new char[sizeof(Splice) + array_size * 2];
  Splice* splice = reinterpret_cast<Splice*>(raw);
  splice->height_ = 0;
  splice->prev_ = reinterpret_cast<Node**>(raw + sizeof(Splice));
  splice->next_ = reinterpret_cast<Node**>(raw + sizeof(Splice) + array_size);
  return splice;
}

template <class Comparator>
bool InlineSkipList<Comparator>::Insert(const char* key) {
  return Insert<false>(key, seq_splice_, false);
}

template <class Comparator>
bool InlineSkipList<Comparator>::InsertConcurrently(const char* key) {
  Node* prev[kMaxPossibleHeight];
  Node* next[kMaxPossibleHeight];
  Splice splice;
  splice.prev_ = prev;
  splice.next_ = next;
  return Insert<true>(key, &splice, false);
}

template <class Comparator>
bool InlineSkipList<Comparator>::InsertWithHint(const char* key, void** hint) {
  assert(hint != nullptr);
  Splice* splice = reinterpret_cast<Splice*>(*hint);
  if (splice == nullptr) {
    splice = AllocateSplice();
    *hint = reinterpret_cast<void*>(splice);
  }
  return Insert<false>(key, splice, true);
}

template <class Comparator>
bool InlineSkipList<Comparator>::InsertWithHintConcurrently(const char* key,
                                                            void** hint) {
  assert(hint != nullptr);
  Splice* splice = reinterpret_cast<Splice*>(*hint);
  if (splice == nullptr) {
    splice = AllocateSpliceOnHeap();
    *hint = reinterpret_cast<void*>(splice);
  }
  return Insert<true>(key, splice, true);
}
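
// A minimal hinted-insert sketch (illustrative only; `keys` stands for a
// caller-owned collection of buffers returned by AllocateKey and already
// filled in):
//
//   void* hint = nullptr;
//   for (char* key : keys) {
//     list.InsertWithHint(key, &hint);  // hint is populated on first call
//   }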

template <class Comparator>
template <bool prefetch_before>
void InlineSkipList<Comparator>::FindSpliceForLevel(const DecodedKey& key,
                                                    Node* before, Node* after,
                                                    int level, Node** out_prev,
                                                    Node** out_next) {
  while (true) {
    Node* next = before->Next(level);
    if (next != nullptr) {
      PREFETCH(next->Next(level), 0, 1);
    }
    if (prefetch_before) {
      if (next != nullptr && level > 0) {
        PREFETCH(next->Next(level - 1), 0, 1);
      }
    }
    assert(before == head_ || next == nullptr ||
           KeyIsAfterNode(next->Key(), before));
    assert(before == head_ || KeyIsAfterNode(key, before));
    if (next == after || !KeyIsAfterNode(key, next)) {
      // found it
      *out_prev = before;
      *out_next = next;
      return;
    }
    before = next;
  }
}

template <class Comparator>
void InlineSkipList<Comparator>::RecomputeSpliceLevels(const DecodedKey& key,
                                                       Splice* splice,
                                                       int recompute_level) {
  assert(recompute_level > 0);
  assert(recompute_level <= splice->height_);
  for (int i = recompute_level - 1; i >= 0; --i) {
    FindSpliceForLevel<true>(key, splice->prev_[i + 1], splice->next_[i + 1],
                             i, &splice->prev_[i], &splice->next_[i]);
  }
}
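
// Note on the order of recomputation above: levels are fixed top-down so
// that each level's search is seeded with the already-recomputed
// prev_/next_ bracket from the level above.  E.g. with
// recompute_level == 3 the loop recomputes level 2, then 1, then 0.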

template <class Comparator>
template <bool UseCAS>
bool InlineSkipList<Comparator>::Insert(const char* key, Splice* splice,
                                        bool allow_partial_splice_fix) {
  Node* x = reinterpret_cast<Node*>(const_cast<char*>(key)) - 1;
  const DecodedKey key_decoded = compare_.decode_key(key);
  int height = x->UnstashHeight();
  assert(height >= 1 && height <= kMaxHeight_);

  int max_height = max_height_.load(std::memory_order_relaxed);
  while (height > max_height) {
    if (max_height_.compare_exchange_weak(max_height, height)) {
      // successfully updated it
      max_height = height;
      break;
    }
    // else retry, possibly exiting the loop because somebody else
    // increased it
  }
  assert(max_height <= kMaxPossibleHeight);

  int recompute_height = 0;
  if (splice->height_ < max_height) {
    // Either splice has never been used or max_height has grown since
    // last use.  We could potentially fix it in the latter case, but
    // that is tricky.
    splice->prev_[max_height] = head_;
    splice->next_[max_height] = nullptr;
    splice->height_ = max_height;
    recompute_height = max_height;
  } else {
    // Splice is a valid proper-height splice that brackets some
    // key, but does it bracket this one?  We need to validate it and
    // recompute a portion of the splice (levels 0..recompute_height-1)
    // that is a superset of all levels that don't bracket the new key.
    // Several choices are reasonable, because we have to balance the work
    // saved against the extra comparisons required to validate the Splice.
    //
    // One strategy is just to recompute all of orig_splice_height if the
    // bottom level isn't bracketing.  This pessimistically assumes that
    // we will either get a perfect Splice hit (increasing sequential
    // inserts) or have no locality.
    //
    // Another strategy is to walk up the Splice's levels until we find
    // a level that brackets the key.  This strategy lets the Splice
    // hint help for other cases: it turns insertion from O(log N) into
    // O(log D), where D is the number of nodes in between the key that
    // produced the Splice and the current insert (insertion is aided
    // whether the new key is before or after the splice).  If you have
    // a way of using a prefix of the key to map directly to the closest
    // Splice out of O(sqrt(N)) Splices and we make it so that splices
    // can also be used as hints during read, then we end up with Oshman's
    // and Shavit's SkipTrie, which has O(log log N) lookup and insertion
    // (compare to O(log N) for skip list).
    //
    // We control the pessimistic strategy with allow_partial_splice_fix.
    // A good strategy is probably to be pessimistic for seq_splice_,
    // optimistic if the caller actually went to the work of providing
    // a Splice.
    while (recompute_height < max_height) {
      if (splice->prev_[recompute_height]->Next(recompute_height) !=
          splice->next_[recompute_height]) {
        // splice isn't tight at this level, there must have been some
        // inserts to this location that didn't update the splice.  We
        // might only be a little stale, but if the splice is very stale
        // it would be O(N) to fix it.  We haven't used up any of our
        // budget of comparisons, so always move up even if we are
        // pessimistic about our chances of success.
        ++recompute_height;
      } else if (splice->prev_[recompute_height] != head_ &&
                 !KeyIsAfterNode(key_decoded,
                                 splice->prev_[recompute_height])) {
        // key is from before splice
        if (allow_partial_splice_fix) {
          // skip all levels with the same node without more comparisons
          Node* bad = splice->prev_[recompute_height];
          while (splice->prev_[recompute_height] == bad) {
            ++recompute_height;
          }
        } else {
          // we're pessimistic, recompute everything
          recompute_height = max_height;
        }
      } else if (KeyIsAfterNode(key_decoded,
                                splice->next_[recompute_height])) {
        // key is from after splice
        if (allow_partial_splice_fix) {
          Node* bad = splice->next_[recompute_height];
          while (splice->next_[recompute_height] == bad) {
            ++recompute_height;
          }
        } else {
          recompute_height = max_height;
        }
      } else {
        // this level brackets the key, we won!
        break;
      }
    }
  }
  assert(recompute_height <= max_height);
  if (recompute_height > 0) {
    RecomputeSpliceLevels(key_decoded, splice, recompute_height);
  }

  bool splice_is_valid = true;
  if (UseCAS) {
    for (int i = 0; i < height; ++i) {
      while (true) {
        // Checking for duplicate keys on level 0 is sufficient
        if (UNLIKELY(i == 0 && splice->next_[i] != nullptr &&
                     compare_(x->Key(), splice->next_[i]->Key()) >= 0)) {
          // duplicate key
          return false;
        }
        if (UNLIKELY(i == 0 && splice->prev_[i] != head_ &&
                     compare_(splice->prev_[i]->Key(), x->Key()) >= 0)) {
          // duplicate key
          return false;
        }
        assert(splice->next_[i] == nullptr ||
               compare_(x->Key(), splice->next_[i]->Key()) < 0);
        assert(splice->prev_[i] == head_ ||
               compare_(splice->prev_[i]->Key(), x->Key()) < 0);
        x->NoBarrier_SetNext(i, splice->next_[i]);
        if (splice->prev_[i]->CASNext(i, splice->next_[i], x)) {
          // success
          break;
        }
        // CAS failed, we need to recompute prev and next.  It is unlikely
        // to be helpful to try to use a different level as we redo the
        // search, because it should be unlikely that lots of nodes have
        // been inserted between prev[i] and next[i].  No point in using
        // next[i] as the after hint, because we know it is stale.
        FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i,
                                  &splice->prev_[i], &splice->next_[i]);

        // Since we've narrowed the bracket for level i, we might have
        // violated the Splice constraint between i and i-1.  Make sure
        // we recompute the whole thing next time.
        if (i > 0) {
          splice_is_valid = false;
        }
      }
    }
  } else {
    for (int i = 0; i < height; ++i) {
      if (i >= recompute_height &&
          splice->prev_[i]->Next(i) != splice->next_[i]) {
        FindSpliceForLevel<false>(key_decoded, splice->prev_[i], nullptr, i,
                                  &splice->prev_[i], &splice->next_[i]);
      }
      // Checking for duplicate keys on level 0 is sufficient
      if (UNLIKELY(i == 0 && splice->next_[i] != nullptr &&
                   compare_(x->Key(), splice->next_[i]->Key()) >= 0)) {
        // duplicate key
        return false;
      }
      if (UNLIKELY(i == 0 && splice->prev_[i] != head_ &&
                   compare_(splice->prev_[i]->Key(), x->Key()) >= 0)) {
        // duplicate key
        return false;
      }
      assert(splice->next_[i] == nullptr ||
             compare_(x->Key(), splice->next_[i]->Key()) < 0);
      assert(splice->prev_[i] == head_ ||
             compare_(splice->prev_[i]->Key(), x->Key()) < 0);
      assert(splice->prev_[i]->Next(i) == splice->next_[i]);
      x->NoBarrier_SetNext(i, splice->next_[i]);
      splice->prev_[i]->SetNext(i, x);
    }
  }
  if (splice_is_valid) {
    for (int i = 0; i < height; ++i) {
      splice->prev_[i] = x;
    }
    assert(splice->prev_[splice->height_] == head_);
    assert(splice->next_[splice->height_] == nullptr);
    for (int i = 0; i < splice->height_; ++i) {
      assert(splice->next_[i] == nullptr ||
             compare_(key, splice->next_[i]->Key()) < 0);
      assert(splice->prev_[i] == head_ ||
             compare_(splice->prev_[i]->Key(), key) <= 0);
      assert(splice->prev_[i + 1] == splice->prev_[i] ||
             splice->prev_[i + 1] == head_ ||
             compare_(splice->prev_[i + 1]->Key(), splice->prev_[i]->Key()) <
                 0);
      assert(splice->next_[i + 1] == splice->next_[i] ||
             splice->next_[i + 1] == nullptr ||
             compare_(splice->next_[i]->Key(), splice->next_[i + 1]->Key()) <
                 0);
    }
  } else {
    splice->height_ = 0;
  }
  return true;
}

template <class Comparator>
bool InlineSkipList<Comparator>::Contains(const char* key) const {
  Node* x = FindGreaterOrEqual(key);
  if (x != nullptr && Equal(key, x->Key())) {
    return true;
  } else {
    return false;
  }
}

template <class Comparator>
void InlineSkipList<Comparator>::TEST_Validate() const {
  // Iterate over all levels at the same time, and verify that nodes appear
  // in the right order, and that nodes appearing in an upper level also
  // appear in lower levels.
  Node* nodes[kMaxPossibleHeight];
  int max_height = GetMaxHeight();
  assert(max_height > 0);
  for (int i = 0; i < max_height; i++) {
    nodes[i] = head_;
  }
  while (nodes[0] != nullptr) {
    Node* l0_next = nodes[0]->Next(0);
    if (l0_next == nullptr) {
      break;
    }
    assert(nodes[0] == head_ || compare_(nodes[0]->Key(), l0_next->Key()) < 0);
    nodes[0] = l0_next;

    int i = 1;
    while (i < max_height) {
      Node* next = nodes[i]->Next(i);
      if (next == nullptr) {
        break;
      }
      auto cmp = compare_(nodes[0]->Key(), next->Key());
      assert(cmp <= 0);
      if (cmp == 0) {
        assert(next == nodes[0]);
        nodes[i] = next;
      } else {
        break;
      }
      i++;
    }
  }
  for (int i = 1; i < max_height; i++) {
    assert(nodes[i] != nullptr && nodes[i]->Next(i) == nullptr);
  }
}

}  // namespace ROCKSDB_NAMESPACE