1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "test_util/sync_point_impl.h"
7
8 #ifndef NDEBUG
9 namespace ROCKSDB_NAMESPACE {
GetInstance()10 KillPoint* KillPoint::GetInstance() {
11 static KillPoint kp;
12 return &kp;
13 }
14
TestKillRandom(std::string kill_point,int odds_weight,const std::string & srcfile,int srcline)15 void KillPoint::TestKillRandom(std::string kill_point, int odds_weight,
16 const std::string& srcfile, int srcline) {
17 if (rocksdb_kill_odds <= 0) {
18 return;
19 }
20 int odds = rocksdb_kill_odds * odds_weight;
21 for (auto& p : rocksdb_kill_exclude_prefixes) {
22 if (kill_point.substr(0, p.length()) == p) {
23 return;
24 }
25 }
26
27 assert(odds > 0);
28 if (odds % 7 == 0) {
29 // class Random uses multiplier 16807, which is 7^5. If odds are
30 // multiplier of 7, there might be limited values generated.
31 odds++;
32 }
33 auto* r = Random::GetTLSInstance();
34 bool crash = r->OneIn(odds);
35 if (crash) {
36 port::Crash(srcfile, srcline);
37 }
38 }
39
LoadDependency(const std::vector<SyncPointPair> & dependencies)40 void SyncPoint::Data::LoadDependency(const std::vector<SyncPointPair>& dependencies) {
41 std::lock_guard<std::mutex> lock(mutex_);
42 successors_.clear();
43 predecessors_.clear();
44 cleared_points_.clear();
45 for (const auto& dependency : dependencies) {
46 successors_[dependency.predecessor].push_back(dependency.successor);
47 predecessors_[dependency.successor].push_back(dependency.predecessor);
48 point_filter_.Add(dependency.successor);
49 point_filter_.Add(dependency.predecessor);
50 }
51 cv_.notify_all();
52 }
53
LoadDependencyAndMarkers(const std::vector<SyncPointPair> & dependencies,const std::vector<SyncPointPair> & markers)54 void SyncPoint::Data::LoadDependencyAndMarkers(
55 const std::vector<SyncPointPair>& dependencies,
56 const std::vector<SyncPointPair>& markers) {
57 std::lock_guard<std::mutex> lock(mutex_);
58 successors_.clear();
59 predecessors_.clear();
60 cleared_points_.clear();
61 markers_.clear();
62 marked_thread_id_.clear();
63 for (const auto& dependency : dependencies) {
64 successors_[dependency.predecessor].push_back(dependency.successor);
65 predecessors_[dependency.successor].push_back(dependency.predecessor);
66 point_filter_.Add(dependency.successor);
67 point_filter_.Add(dependency.predecessor);
68 }
69 for (const auto& marker : markers) {
70 successors_[marker.predecessor].push_back(marker.successor);
71 predecessors_[marker.successor].push_back(marker.predecessor);
72 markers_[marker.predecessor].push_back(marker.successor);
73 point_filter_.Add(marker.predecessor);
74 point_filter_.Add(marker.successor);
75 }
76 cv_.notify_all();
77 }
78
PredecessorsAllCleared(const std::string & point)79 bool SyncPoint::Data::PredecessorsAllCleared(const std::string& point) {
80 for (const auto& pred : predecessors_[point]) {
81 if (cleared_points_.count(pred) == 0) {
82 return false;
83 }
84 }
85 return true;
86 }
87
ClearCallBack(const std::string & point)88 void SyncPoint::Data::ClearCallBack(const std::string& point) {
89 std::unique_lock<std::mutex> lock(mutex_);
90 while (num_callbacks_running_ > 0) {
91 cv_.wait(lock);
92 }
93 callbacks_.erase(point);
94 }
95
ClearAllCallBacks()96 void SyncPoint::Data::ClearAllCallBacks() {
97 std::unique_lock<std::mutex> lock(mutex_);
98 while (num_callbacks_running_ > 0) {
99 cv_.wait(lock);
100 }
101 callbacks_.clear();
102 }
103
Process(const Slice & point,void * cb_arg)104 void SyncPoint::Data::Process(const Slice& point, void* cb_arg) {
105 if (!enabled_) {
106 return;
107 }
108
109 // Use a filter to prevent mutex lock if possible.
110 if (!point_filter_.MayContain(point)) {
111 return;
112 }
113
114 // Must convert to std::string for remaining work. Take
115 // heap hit.
116 std::string point_string(point.ToString());
117 std::unique_lock<std::mutex> lock(mutex_);
118 auto thread_id = std::this_thread::get_id();
119
120 auto marker_iter = markers_.find(point_string);
121 if (marker_iter != markers_.end()) {
122 for (auto& marked_point : marker_iter->second) {
123 marked_thread_id_.emplace(marked_point, thread_id);
124 point_filter_.Add(marked_point);
125 }
126 }
127
128 if (DisabledByMarker(point_string, thread_id)) {
129 return;
130 }
131
132 while (!PredecessorsAllCleared(point_string)) {
133 cv_.wait(lock);
134 if (DisabledByMarker(point_string, thread_id)) {
135 return;
136 }
137 }
138
139 auto callback_pair = callbacks_.find(point_string);
140 if (callback_pair != callbacks_.end()) {
141 num_callbacks_running_++;
142 mutex_.unlock();
143 callback_pair->second(cb_arg);
144 mutex_.lock();
145 num_callbacks_running_--;
146 }
147 cleared_points_.insert(point_string);
148 cv_.notify_all();
149 }
150 } // namespace ROCKSDB_NAMESPACE
151 #endif
152