1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "test_util/sync_point_impl.h"
7 
8 #ifndef NDEBUG
9 namespace ROCKSDB_NAMESPACE {
GetInstance()10 KillPoint* KillPoint::GetInstance() {
11   static KillPoint kp;
12   return &kp;
13 }
14 
TestKillRandom(std::string kill_point,int odds_weight,const std::string & srcfile,int srcline)15 void KillPoint::TestKillRandom(std::string kill_point, int odds_weight,
16                                const std::string& srcfile, int srcline) {
17   if (rocksdb_kill_odds <= 0) {
18     return;
19   }
20   int odds = rocksdb_kill_odds * odds_weight;
21   for (auto& p : rocksdb_kill_exclude_prefixes) {
22     if (kill_point.substr(0, p.length()) == p) {
23       return;
24     }
25   }
26 
27   assert(odds > 0);
28   if (odds % 7 == 0) {
29     // class Random uses multiplier 16807, which is 7^5. If odds are
30     // multiplier of 7, there might be limited values generated.
31     odds++;
32   }
33   auto* r = Random::GetTLSInstance();
34   bool crash = r->OneIn(odds);
35   if (crash) {
36     port::Crash(srcfile, srcline);
37   }
38 }
39 
LoadDependency(const std::vector<SyncPointPair> & dependencies)40 void SyncPoint::Data::LoadDependency(const std::vector<SyncPointPair>& dependencies) {
41   std::lock_guard<std::mutex> lock(mutex_);
42   successors_.clear();
43   predecessors_.clear();
44   cleared_points_.clear();
45   for (const auto& dependency : dependencies) {
46     successors_[dependency.predecessor].push_back(dependency.successor);
47     predecessors_[dependency.successor].push_back(dependency.predecessor);
48     point_filter_.Add(dependency.successor);
49     point_filter_.Add(dependency.predecessor);
50   }
51   cv_.notify_all();
52 }
53 
LoadDependencyAndMarkers(const std::vector<SyncPointPair> & dependencies,const std::vector<SyncPointPair> & markers)54 void SyncPoint::Data::LoadDependencyAndMarkers(
55   const std::vector<SyncPointPair>& dependencies,
56   const std::vector<SyncPointPair>& markers) {
57   std::lock_guard<std::mutex> lock(mutex_);
58   successors_.clear();
59   predecessors_.clear();
60   cleared_points_.clear();
61   markers_.clear();
62   marked_thread_id_.clear();
63   for (const auto& dependency : dependencies) {
64     successors_[dependency.predecessor].push_back(dependency.successor);
65     predecessors_[dependency.successor].push_back(dependency.predecessor);
66     point_filter_.Add(dependency.successor);
67     point_filter_.Add(dependency.predecessor);
68   }
69   for (const auto& marker : markers) {
70     successors_[marker.predecessor].push_back(marker.successor);
71     predecessors_[marker.successor].push_back(marker.predecessor);
72     markers_[marker.predecessor].push_back(marker.successor);
73     point_filter_.Add(marker.predecessor);
74     point_filter_.Add(marker.successor);
75   }
76   cv_.notify_all();
77 }
78 
PredecessorsAllCleared(const std::string & point)79 bool SyncPoint::Data::PredecessorsAllCleared(const std::string& point) {
80   for (const auto& pred : predecessors_[point]) {
81     if (cleared_points_.count(pred) == 0) {
82       return false;
83     }
84   }
85   return true;
86 }
87 
ClearCallBack(const std::string & point)88 void SyncPoint::Data::ClearCallBack(const std::string& point) {
89   std::unique_lock<std::mutex> lock(mutex_);
90   while (num_callbacks_running_ > 0) {
91     cv_.wait(lock);
92   }
93   callbacks_.erase(point);
94 }
95 
ClearAllCallBacks()96 void SyncPoint::Data::ClearAllCallBacks() {
97   std::unique_lock<std::mutex> lock(mutex_);
98   while (num_callbacks_running_ > 0) {
99     cv_.wait(lock);
100   }
101   callbacks_.clear();
102 }
103 
Process(const Slice & point,void * cb_arg)104 void SyncPoint::Data::Process(const Slice& point, void* cb_arg) {
105   if (!enabled_) {
106     return;
107   }
108 
109   // Use a filter to prevent mutex lock if possible.
110   if (!point_filter_.MayContain(point)) {
111     return;
112   }
113 
114   // Must convert to std::string for remaining work.  Take
115   //  heap hit.
116   std::string point_string(point.ToString());
117   std::unique_lock<std::mutex> lock(mutex_);
118   auto thread_id = std::this_thread::get_id();
119 
120   auto marker_iter = markers_.find(point_string);
121   if (marker_iter != markers_.end()) {
122     for (auto& marked_point : marker_iter->second) {
123       marked_thread_id_.emplace(marked_point, thread_id);
124       point_filter_.Add(marked_point);
125     }
126   }
127 
128   if (DisabledByMarker(point_string, thread_id)) {
129     return;
130   }
131 
132   while (!PredecessorsAllCleared(point_string)) {
133     cv_.wait(lock);
134     if (DisabledByMarker(point_string, thread_id)) {
135       return;
136     }
137   }
138 
139   auto callback_pair = callbacks_.find(point_string);
140   if (callback_pair != callbacks_.end()) {
141     num_callbacks_running_++;
142     mutex_.unlock();
143     callback_pair->second(cb_arg);
144     mutex_.lock();
145     num_callbacks_running_--;
146   }
147   cleared_points_.insert(point_string);
148   cv_.notify_all();
149 }
150 }  // namespace ROCKSDB_NAMESPACE
151 #endif
152