1 // Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "monitoring/thread_status_updater.h"
7 #include <memory>
8 #include "port/likely.h"
9 #include "rocksdb/env.h"
10 #include "util/mutexlock.h"
11 
12 namespace rocksdb {
13 
14 #ifdef ROCKSDB_USING_THREAD_STATUS
15 
16 __thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr;
17 
RegisterThread(ThreadStatus::ThreadType ttype,uint64_t thread_id)18 void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType ttype,
19                                          uint64_t thread_id) {
20   if (UNLIKELY(thread_status_data_ == nullptr)) {
21     thread_status_data_ = new ThreadStatusData();
22     thread_status_data_->thread_type = ttype;
23     thread_status_data_->thread_id = thread_id;
24     std::lock_guard<std::mutex> lck(thread_list_mutex_);
25     thread_data_set_.insert(thread_status_data_);
26   }
27 
28   ClearThreadOperationProperties();
29 }
30 
UnregisterThread()31 void ThreadStatusUpdater::UnregisterThread() {
32   if (thread_status_data_ != nullptr) {
33     std::lock_guard<std::mutex> lck(thread_list_mutex_);
34     thread_data_set_.erase(thread_status_data_);
35     delete thread_status_data_;
36     thread_status_data_ = nullptr;
37   }
38 }
39 
ResetThreadStatus()40 void ThreadStatusUpdater::ResetThreadStatus() {
41   ClearThreadState();
42   ClearThreadOperation();
43   SetColumnFamilyInfoKey(nullptr);
44 }
45 
SetColumnFamilyInfoKey(const void * cf_key)46 void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* cf_key) {
47   auto* data = Get();
48   if (data == nullptr) {
49     return;
50   }
51   // set the tracking flag based on whether cf_key is non-null or not.
52   // If enable_thread_tracking is set to false, the input cf_key
53   // would be nullptr.
54   data->enable_tracking = (cf_key != nullptr);
55   data->cf_key.store(const_cast<void*>(cf_key), std::memory_order_relaxed);
56 }
57 
GetColumnFamilyInfoKey()58 const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() {
59   auto* data = GetLocalThreadStatus();
60   if (data == nullptr) {
61     return nullptr;
62   }
63   return data->cf_key.load(std::memory_order_relaxed);
64 }
65 
SetThreadOperation(const ThreadStatus::OperationType type)66 void ThreadStatusUpdater::SetThreadOperation(
67     const ThreadStatus::OperationType type) {
68   auto* data = GetLocalThreadStatus();
69   if (data == nullptr) {
70     return;
71   }
72   // NOTE: Our practice here is to set all the thread operation properties
73   //       and stage before we set thread operation, and thread operation
74   //       will be set in std::memory_order_release.  This is to ensure
75   //       whenever a thread operation is not OP_UNKNOWN, we will always
76   //       have a consistent information on its properties.
77   data->operation_type.store(type, std::memory_order_release);
78   if (type == ThreadStatus::OP_UNKNOWN) {
79     data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
80                                 std::memory_order_relaxed);
81     ClearThreadOperationProperties();
82   }
83 }
84 
SetThreadOperationProperty(int i,uint64_t value)85 void ThreadStatusUpdater::SetThreadOperationProperty(int i, uint64_t value) {
86   auto* data = GetLocalThreadStatus();
87   if (data == nullptr) {
88     return;
89   }
90   data->op_properties[i].store(value, std::memory_order_relaxed);
91 }
92 
IncreaseThreadOperationProperty(int i,uint64_t delta)93 void ThreadStatusUpdater::IncreaseThreadOperationProperty(int i,
94                                                           uint64_t delta) {
95   auto* data = GetLocalThreadStatus();
96   if (data == nullptr) {
97     return;
98   }
99   data->op_properties[i].fetch_add(delta, std::memory_order_relaxed);
100 }
101 
SetOperationStartTime(const uint64_t start_time)102 void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time) {
103   auto* data = GetLocalThreadStatus();
104   if (data == nullptr) {
105     return;
106   }
107   data->op_start_time.store(start_time, std::memory_order_relaxed);
108 }
109 
ClearThreadOperation()110 void ThreadStatusUpdater::ClearThreadOperation() {
111   auto* data = GetLocalThreadStatus();
112   if (data == nullptr) {
113     return;
114   }
115   data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
116                               std::memory_order_relaxed);
117   data->operation_type.store(ThreadStatus::OP_UNKNOWN,
118                              std::memory_order_relaxed);
119   ClearThreadOperationProperties();
120 }
121 
ClearThreadOperationProperties()122 void ThreadStatusUpdater::ClearThreadOperationProperties() {
123   auto* data = GetLocalThreadStatus();
124   if (data == nullptr) {
125     return;
126   }
127   for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
128     data->op_properties[i].store(0, std::memory_order_relaxed);
129   }
130 }
131 
SetThreadOperationStage(ThreadStatus::OperationStage stage)132 ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage(
133     ThreadStatus::OperationStage stage) {
134   auto* data = GetLocalThreadStatus();
135   if (data == nullptr) {
136     return ThreadStatus::STAGE_UNKNOWN;
137   }
138   return data->operation_stage.exchange(stage, std::memory_order_relaxed);
139 }
140 
SetThreadState(const ThreadStatus::StateType type)141 void ThreadStatusUpdater::SetThreadState(const ThreadStatus::StateType type) {
142   auto* data = GetLocalThreadStatus();
143   if (data == nullptr) {
144     return;
145   }
146   data->state_type.store(type, std::memory_order_relaxed);
147 }
148 
ClearThreadState()149 void ThreadStatusUpdater::ClearThreadState() {
150   auto* data = GetLocalThreadStatus();
151   if (data == nullptr) {
152     return;
153   }
154   data->state_type.store(ThreadStatus::STATE_UNKNOWN,
155                          std::memory_order_relaxed);
156 }
157 
GetThreadList(std::vector<ThreadStatus> * thread_list)158 Status ThreadStatusUpdater::GetThreadList(
159     std::vector<ThreadStatus>* thread_list) {
160   thread_list->clear();
161   std::vector<std::shared_ptr<ThreadStatusData>> valid_list;
162   uint64_t now_micros = Env::Default()->NowMicros();
163 
164   std::lock_guard<std::mutex> lck(thread_list_mutex_);
165   for (auto* thread_data : thread_data_set_) {
166     assert(thread_data);
167     auto thread_id = thread_data->thread_id.load(std::memory_order_relaxed);
168     auto thread_type = thread_data->thread_type.load(std::memory_order_relaxed);
169     // Since any change to cf_info_map requires thread_list_mutex,
170     // which is currently held by GetThreadList(), here we can safely
171     // use "memory_order_relaxed" to load the cf_key.
172     auto cf_key = thread_data->cf_key.load(std::memory_order_relaxed);
173 
174     ThreadStatus::OperationType op_type = ThreadStatus::OP_UNKNOWN;
175     ThreadStatus::OperationStage op_stage = ThreadStatus::STAGE_UNKNOWN;
176     ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN;
177     uint64_t op_elapsed_micros = 0;
178     uint64_t op_props[ThreadStatus::kNumOperationProperties] = {0};
179 
180     auto iter = cf_info_map_.find(cf_key);
181     if (iter != cf_info_map_.end()) {
182       op_type = thread_data->operation_type.load(std::memory_order_acquire);
183       // display lower-level info only when higher-level info is available.
184       if (op_type != ThreadStatus::OP_UNKNOWN) {
185         op_elapsed_micros = now_micros - thread_data->op_start_time.load(
186                                              std::memory_order_relaxed);
187         op_stage = thread_data->operation_stage.load(std::memory_order_relaxed);
188         state_type = thread_data->state_type.load(std::memory_order_relaxed);
189         for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
190           op_props[i] =
191               thread_data->op_properties[i].load(std::memory_order_relaxed);
192         }
193       }
194     }
195 
196     thread_list->emplace_back(
197         thread_id, thread_type,
198         iter != cf_info_map_.end() ? iter->second.db_name : "",
199         iter != cf_info_map_.end() ? iter->second.cf_name : "", op_type,
200         op_elapsed_micros, op_stage, op_props, state_type);
201   }
202 
203   return Status::OK();
204 }
205 
GetLocalThreadStatus()206 ThreadStatusData* ThreadStatusUpdater::GetLocalThreadStatus() {
207   if (thread_status_data_ == nullptr) {
208     return nullptr;
209   }
210   if (!thread_status_data_->enable_tracking) {
211     assert(thread_status_data_->cf_key.load(std::memory_order_relaxed) ==
212            nullptr);
213     return nullptr;
214   }
215   return thread_status_data_;
216 }
217 
NewColumnFamilyInfo(const void * db_key,const std::string & db_name,const void * cf_key,const std::string & cf_name)218 void ThreadStatusUpdater::NewColumnFamilyInfo(const void* db_key,
219                                               const std::string& db_name,
220                                               const void* cf_key,
221                                               const std::string& cf_name) {
222   // Acquiring same lock as GetThreadList() to guarantee
223   // a consistent view of global column family table (cf_info_map).
224   std::lock_guard<std::mutex> lck(thread_list_mutex_);
225 
226   cf_info_map_.emplace(std::piecewise_construct, std::make_tuple(cf_key),
227                        std::make_tuple(db_key, db_name, cf_name));
228   db_key_map_[db_key].insert(cf_key);
229 }
230 
EraseColumnFamilyInfo(const void * cf_key)231 void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) {
232   // Acquiring same lock as GetThreadList() to guarantee
233   // a consistent view of global column family table (cf_info_map).
234   std::lock_guard<std::mutex> lck(thread_list_mutex_);
235 
236   auto cf_pair = cf_info_map_.find(cf_key);
237   if (cf_pair != cf_info_map_.end()) {
238     // Remove its entry from db_key_map_ by the following steps:
239     // 1. Obtain the entry in db_key_map_ whose set contains cf_key
240     // 2. Remove it from the set.
241     ConstantColumnFamilyInfo& cf_info = cf_pair->second;
242     auto db_pair = db_key_map_.find(cf_info.db_key);
243     assert(db_pair != db_key_map_.end());
244     size_t result __attribute__((__unused__));
245     result = db_pair->second.erase(cf_key);
246     assert(result);
247     cf_info_map_.erase(cf_pair);
248   }
249 }
250 
EraseDatabaseInfo(const void * db_key)251 void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) {
252   // Acquiring same lock as GetThreadList() to guarantee
253   // a consistent view of global column family table (cf_info_map).
254   std::lock_guard<std::mutex> lck(thread_list_mutex_);
255   auto db_pair = db_key_map_.find(db_key);
256   if (UNLIKELY(db_pair == db_key_map_.end())) {
257     // In some occasional cases such as DB::Open fails, we won't
258     // register ColumnFamilyInfo for a db.
259     return;
260   }
261 
262   for (auto cf_key : db_pair->second) {
263     auto cf_pair = cf_info_map_.find(cf_key);
264     if (cf_pair != cf_info_map_.end()) {
265       cf_info_map_.erase(cf_pair);
266     }
267   }
268   db_key_map_.erase(db_key);
269 }
270 
271 #else
272 
273 void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType /*ttype*/,
274                                          uint64_t /*thread_id*/) {}
275 
276 void ThreadStatusUpdater::UnregisterThread() {}
277 
278 void ThreadStatusUpdater::ResetThreadStatus() {}
279 
280 void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* /*cf_key*/) {}
281 
282 void ThreadStatusUpdater::SetThreadOperation(
283     const ThreadStatus::OperationType /*type*/) {}
284 
285 void ThreadStatusUpdater::ClearThreadOperation() {}
286 
287 void ThreadStatusUpdater::SetThreadState(
288     const ThreadStatus::StateType /*type*/) {}
289 
290 void ThreadStatusUpdater::ClearThreadState() {}
291 
292 Status ThreadStatusUpdater::GetThreadList(
293     std::vector<ThreadStatus>* /*thread_list*/) {
294   return Status::NotSupported(
295       "GetThreadList is not supported in the current running environment.");
296 }
297 
298 void ThreadStatusUpdater::NewColumnFamilyInfo(const void* /*db_key*/,
299                                               const std::string& /*db_name*/,
300                                               const void* /*cf_key*/,
301                                               const std::string& /*cf_name*/) {}
302 
303 void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* /*cf_key*/) {}
304 
305 void ThreadStatusUpdater::EraseDatabaseInfo(const void* /*db_key*/) {}
306 
307 void ThreadStatusUpdater::SetThreadOperationProperty(int /*i*/,
308                                                      uint64_t /*value*/) {}
309 
310 void ThreadStatusUpdater::IncreaseThreadOperationProperty(int /*i*/,
311                                                           uint64_t /*delta*/) {}
312 
313 #endif  // ROCKSDB_USING_THREAD_STATUS
314 }  // namespace rocksdb
315