1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 6 #include "test_util/sync_point_impl.h" 7 8 #ifndef NDEBUG 9 namespace ROCKSDB_NAMESPACE { 10 11 void TestKillRandom(std::string kill_point, int odds, 12 const std::string& srcfile, int srcline) { 13 for (auto& p : rocksdb_kill_prefix_blacklist) { 14 if (kill_point.substr(0, p.length()) == p) { 15 return; 16 } 17 } 18 19 assert(odds > 0); 20 if (odds % 7 == 0) { 21 // class Random uses multiplier 16807, which is 7^5. If odds are 22 // multiplier of 7, there might be limited values generated. 23 odds++; 24 } 25 auto* r = Random::GetTLSInstance(); 26 bool crash = r->OneIn(odds); 27 if (crash) { 28 port::Crash(srcfile, srcline); 29 } 30 } 31 32 33 void SyncPoint::Data::LoadDependency(const std::vector<SyncPointPair>& dependencies) { 34 std::lock_guard<std::mutex> lock(mutex_); 35 successors_.clear(); 36 predecessors_.clear(); 37 cleared_points_.clear(); 38 for (const auto& dependency : dependencies) { 39 successors_[dependency.predecessor].push_back(dependency.successor); 40 predecessors_[dependency.successor].push_back(dependency.predecessor); 41 } 42 cv_.notify_all(); 43 } 44 45 void SyncPoint::Data::LoadDependencyAndMarkers( 46 const std::vector<SyncPointPair>& dependencies, 47 const std::vector<SyncPointPair>& markers) { 48 std::lock_guard<std::mutex> lock(mutex_); 49 successors_.clear(); 50 predecessors_.clear(); 51 cleared_points_.clear(); 52 markers_.clear(); 53 marked_thread_id_.clear(); 54 for (const auto& dependency : dependencies) { 55 successors_[dependency.predecessor].push_back(dependency.successor); 56 predecessors_[dependency.successor].push_back(dependency.predecessor); 57 } 58 for (const auto& marker : markers) { 59 successors_[marker.predecessor].push_back(marker.successor); 60 predecessors_[marker.successor].push_back(marker.predecessor); 61 markers_[marker.predecessor].push_back(marker.successor); 62 } 63 cv_.notify_all(); 64 } 65 66 bool SyncPoint::Data::PredecessorsAllCleared(const std::string& point) { 67 for (const auto& pred : predecessors_[point]) { 68 if (cleared_points_.count(pred) == 0) { 69 return false; 70 } 71 } 72 return true; 73 } 74 75 void SyncPoint::Data::ClearCallBack(const std::string& point) { 76 std::unique_lock<std::mutex> lock(mutex_); 77 while (num_callbacks_running_ > 0) { 78 cv_.wait(lock); 79 } 80 callbacks_.erase(point); 81 } 82 83 void SyncPoint::Data::ClearAllCallBacks() { 84 std::unique_lock<std::mutex> lock(mutex_); 85 while (num_callbacks_running_ > 0) { 86 cv_.wait(lock); 87 } 88 callbacks_.clear(); 89 } 90 91 void SyncPoint::Data::Process(const std::string& point, void* cb_arg) { 92 if (!enabled_) { 93 return; 94 } 95 96 std::unique_lock<std::mutex> lock(mutex_); 97 auto thread_id = std::this_thread::get_id(); 98 99 auto marker_iter = markers_.find(point); 100 if (marker_iter != markers_.end()) { 101 for (auto& marked_point : marker_iter->second) { 102 marked_thread_id_.emplace(marked_point, thread_id); 103 } 104 } 105 106 if (DisabledByMarker(point, thread_id)) { 107 return; 108 } 109 110 while (!PredecessorsAllCleared(point)) { 111 cv_.wait(lock); 112 if (DisabledByMarker(point, thread_id)) { 113 return; 114 } 115 } 116 117 auto callback_pair = callbacks_.find(point); 118 if (callback_pair != callbacks_.end()) { 119 num_callbacks_running_++; 120 mutex_.unlock(); 121 callback_pair->second(cb_arg); 122 mutex_.lock(); 123 num_callbacks_running_--; 124 } 125 cleared_points_.insert(point); 126 cv_.notify_all(); 127 } 128 } // namespace ROCKSDB_NAMESPACE 129 #endif 130