1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "test_util/sync_point_impl.h"
7 
8 #ifndef NDEBUG
9 namespace ROCKSDB_NAMESPACE {
10 
TestKillRandom(std::string kill_point,int odds,const std::string & srcfile,int srcline)11 void TestKillRandom(std::string kill_point, int odds,
12                     const std::string& srcfile, int srcline) {
13   for (auto& p : rocksdb_kill_prefix_blacklist) {
14     if (kill_point.substr(0, p.length()) == p) {
15       return;
16     }
17   }
18 
19   assert(odds > 0);
20   if (odds % 7 == 0) {
21     // class Random uses multiplier 16807, which is 7^5. If odds are
22     // multiplier of 7, there might be limited values generated.
23     odds++;
24   }
25   auto* r = Random::GetTLSInstance();
26   bool crash = r->OneIn(odds);
27   if (crash) {
28     port::Crash(srcfile, srcline);
29   }
30 }
31 
32 
LoadDependency(const std::vector<SyncPointPair> & dependencies)33 void SyncPoint::Data::LoadDependency(const std::vector<SyncPointPair>& dependencies) {
34   std::lock_guard<std::mutex> lock(mutex_);
35   successors_.clear();
36   predecessors_.clear();
37   cleared_points_.clear();
38   for (const auto& dependency : dependencies) {
39     successors_[dependency.predecessor].push_back(dependency.successor);
40     predecessors_[dependency.successor].push_back(dependency.predecessor);
41   }
42   cv_.notify_all();
43 }
44 
LoadDependencyAndMarkers(const std::vector<SyncPointPair> & dependencies,const std::vector<SyncPointPair> & markers)45 void SyncPoint::Data::LoadDependencyAndMarkers(
46   const std::vector<SyncPointPair>& dependencies,
47   const std::vector<SyncPointPair>& markers) {
48   std::lock_guard<std::mutex> lock(mutex_);
49   successors_.clear();
50   predecessors_.clear();
51   cleared_points_.clear();
52   markers_.clear();
53   marked_thread_id_.clear();
54   for (const auto& dependency : dependencies) {
55     successors_[dependency.predecessor].push_back(dependency.successor);
56     predecessors_[dependency.successor].push_back(dependency.predecessor);
57   }
58   for (const auto& marker : markers) {
59     successors_[marker.predecessor].push_back(marker.successor);
60     predecessors_[marker.successor].push_back(marker.predecessor);
61     markers_[marker.predecessor].push_back(marker.successor);
62   }
63   cv_.notify_all();
64 }
65 
PredecessorsAllCleared(const std::string & point)66 bool SyncPoint::Data::PredecessorsAllCleared(const std::string& point) {
67   for (const auto& pred : predecessors_[point]) {
68     if (cleared_points_.count(pred) == 0) {
69       return false;
70     }
71   }
72   return true;
73 }
74 
ClearCallBack(const std::string & point)75 void SyncPoint::Data::ClearCallBack(const std::string& point) {
76   std::unique_lock<std::mutex> lock(mutex_);
77   while (num_callbacks_running_ > 0) {
78     cv_.wait(lock);
79   }
80   callbacks_.erase(point);
81 }
82 
ClearAllCallBacks()83 void SyncPoint::Data::ClearAllCallBacks() {
84   std::unique_lock<std::mutex> lock(mutex_);
85   while (num_callbacks_running_ > 0) {
86     cv_.wait(lock);
87   }
88   callbacks_.clear();
89 }
90 
Process(const std::string & point,void * cb_arg)91 void SyncPoint::Data::Process(const std::string& point, void* cb_arg) {
92   if (!enabled_) {
93     return;
94   }
95 
96   std::unique_lock<std::mutex> lock(mutex_);
97   auto thread_id = std::this_thread::get_id();
98 
99   auto marker_iter = markers_.find(point);
100   if (marker_iter != markers_.end()) {
101     for (auto& marked_point : marker_iter->second) {
102       marked_thread_id_.emplace(marked_point, thread_id);
103     }
104   }
105 
106   if (DisabledByMarker(point, thread_id)) {
107     return;
108   }
109 
110   while (!PredecessorsAllCleared(point)) {
111     cv_.wait(lock);
112     if (DisabledByMarker(point, thread_id)) {
113       return;
114     }
115   }
116 
117   auto callback_pair = callbacks_.find(point);
118   if (callback_pair != callbacks_.end()) {
119     num_callbacks_running_++;
120     mutex_.unlock();
121     callback_pair->second(cb_arg);
122     mutex_.lock();
123     num_callbacks_running_--;
124   }
125   cleared_points_.insert(point);
126   cv_.notify_all();
127 }
128 }  // namespace ROCKSDB_NAMESPACE
129 #endif
130