1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 6 #include "db/flush_scheduler.h" 7 8 #include <cassert> 9 10 #include "db/column_family.h" 11 12 namespace ROCKSDB_NAMESPACE { 13 14 void FlushScheduler::ScheduleWork(ColumnFamilyData* cfd) { 15 #ifndef NDEBUG 16 { 17 std::lock_guard<std::mutex> lock(checking_mutex_); 18 assert(checking_set_.count(cfd) == 0); 19 checking_set_.insert(cfd); 20 } 21 #endif // NDEBUG 22 cfd->Ref(); 23 // Suppress false positive clang analyzer warnings. 24 #ifndef __clang_analyzer__ 25 Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)}; 26 while (!head_.compare_exchange_strong( 27 node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) { 28 // failing CAS updates the first param, so we are already set for 29 // retry. TakeNextColumnFamily won't happen until after another 30 // inter-thread synchronization, so we don't even need release 31 // semantics for this CAS 32 } 33 #endif // __clang_analyzer__ 34 } 35 36 ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() { 37 while (true) { 38 if (head_.load(std::memory_order_relaxed) == nullptr) { 39 return nullptr; 40 } 41 42 // dequeue the head 43 Node* node = head_.load(std::memory_order_relaxed); 44 head_.store(node->next, std::memory_order_relaxed); 45 ColumnFamilyData* cfd = node->column_family; 46 delete node; 47 48 #ifndef NDEBUG 49 { 50 std::lock_guard<std::mutex> lock(checking_mutex_); 51 auto iter = checking_set_.find(cfd); 52 assert(iter != checking_set_.end()); 53 checking_set_.erase(iter); 54 } 55 #endif // NDEBUG 56 57 if (!cfd->IsDropped()) { 58 // success 59 return cfd; 60 } 61 62 // no longer relevant, retry 63 cfd->UnrefAndTryDelete(); 64 } 65 } 66 67 bool FlushScheduler::Empty() { 68 auto rv = head_.load(std::memory_order_relaxed) == nullptr; 69 #ifndef NDEBUG 70 std::lock_guard<std::mutex> lock(checking_mutex_); 71 // Empty is allowed to be called concurrnetly with ScheduleFlush. It would 72 // only miss the recent schedules. 73 assert((rv == checking_set_.empty()) || rv); 74 #endif // NDEBUG 75 return rv; 76 } 77 78 void FlushScheduler::Clear() { 79 ColumnFamilyData* cfd; 80 while ((cfd = TakeNextColumnFamily()) != nullptr) { 81 cfd->UnrefAndTryDelete(); 82 } 83 assert(head_.load(std::memory_order_relaxed) == nullptr); 84 } 85 86 } // namespace ROCKSDB_NAMESPACE 87