1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "db/flush_scheduler.h"
7 
8 #include <cassert>
9 
10 #include "db/column_family.h"
11 
12 namespace ROCKSDB_NAMESPACE {
13 
// Push `cfd` onto the lock-free intrusive stack of column families awaiting
// flush. Takes a reference on `cfd` (via Ref()) that is dropped by whoever
// dequeues the entry, so the ColumnFamilyData stays alive while queued.
void FlushScheduler::ScheduleWork(ColumnFamilyData* cfd) {
#ifndef NDEBUG
  {
    // Debug-only bookkeeping: assert that a column family is scheduled at
    // most once at any given time.
    std::lock_guard<std::mutex> lock(checking_mutex_);
    assert(checking_set_.count(cfd) == 0);
    checking_set_.insert(cfd);
  }
#endif  // NDEBUG
  cfd->Ref();  // queued entry holds a ref; released on dequeue/Clear
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
  // Classic lock-free stack push: snapshot head into node->next, then CAS
  // head from that snapshot to the new node.
  Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)};
  while (!head_.compare_exchange_strong(
      node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) {
    // failing CAS updates the first param, so we are already set for
    // retry.  TakeNextColumnFamily won't happen until after another
    // inter-thread synchronization, so we don't even need release
    // semantics for this CAS
  }
#endif  // __clang_analyzer__
}
35 
// Pop the next scheduled column family, skipping (and unref-ing) any that
// have been dropped since they were scheduled. Returns nullptr when the
// queue is empty. The returned ColumnFamilyData carries the reference taken
// in ScheduleWork(); the caller is responsible for releasing it.
//
// NOTE(review): the plain load/store pair used to dequeue is not a CAS, so
// this presumes a single consumer at a time (consistent with the "another
// inter-thread synchronization" remark in ScheduleWork) — confirm that all
// callers serialize access, e.g. under the DB mutex.
ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
  while (true) {
    if (head_.load(std::memory_order_relaxed) == nullptr) {
      return nullptr;  // queue drained
    }

    // dequeue the head
    Node* node = head_.load(std::memory_order_relaxed);
    head_.store(node->next, std::memory_order_relaxed);
    ColumnFamilyData* cfd = node->column_family;
    delete node;

#ifndef NDEBUG
    {
      // Debug-only bookkeeping mirroring ScheduleWork(): the dequeued entry
      // must be present in the checking set exactly once.
      std::lock_guard<std::mutex> lock(checking_mutex_);
      auto iter = checking_set_.find(cfd);
      assert(iter != checking_set_.end());
      checking_set_.erase(iter);
    }
#endif  // NDEBUG

    if (!cfd->IsDropped()) {
      // success — hand the still-live column family (and its ref) to the
      // caller
      return cfd;
    }

    // no longer relevant, retry: drop the queue's reference and try the
    // next entry
    cfd->UnrefAndTryDelete();
  }
}
66 
67 bool FlushScheduler::Empty() {
68   auto rv = head_.load(std::memory_order_relaxed) == nullptr;
69 #ifndef NDEBUG
70   std::lock_guard<std::mutex> lock(checking_mutex_);
71   // Empty is allowed to be called concurrnetly with ScheduleFlush. It would
72   // only miss the recent schedules.
73   assert((rv == checking_set_.empty()) || rv);
74 #endif  // NDEBUG
75   return rv;
76 }
77 
78 void FlushScheduler::Clear() {
79   ColumnFamilyData* cfd;
80   while ((cfd = TakeNextColumnFamily()) != nullptr) {
81     cfd->UnrefAndTryDelete();
82   }
83   assert(head_.load(std::memory_order_relaxed) == nullptr);
84 }
85 
86 }  // namespace ROCKSDB_NAMESPACE
87