1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "monitoring/thread_status_updater.h"
7 #include <memory>
8 #include "port/likely.h"
9 #include "rocksdb/env.h"
10 #include "util/mutexlock.h"
11
12 namespace rocksdb {
13
14 #ifdef ROCKSDB_USING_THREAD_STATUS
15
// Thread-local pointer to the calling thread's status record.  It stays
// nullptr until RegisterThread() allocates a record for the thread, and is
// reset to nullptr again by UnregisterThread().
__thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr;
17
RegisterThread(ThreadStatus::ThreadType ttype,uint64_t thread_id)18 void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType ttype,
19 uint64_t thread_id) {
20 if (UNLIKELY(thread_status_data_ == nullptr)) {
21 thread_status_data_ = new ThreadStatusData();
22 thread_status_data_->thread_type = ttype;
23 thread_status_data_->thread_id = thread_id;
24 std::lock_guard<std::mutex> lck(thread_list_mutex_);
25 thread_data_set_.insert(thread_status_data_);
26 }
27
28 ClearThreadOperationProperties();
29 }
30
UnregisterThread()31 void ThreadStatusUpdater::UnregisterThread() {
32 if (thread_status_data_ != nullptr) {
33 std::lock_guard<std::mutex> lck(thread_list_mutex_);
34 thread_data_set_.erase(thread_status_data_);
35 delete thread_status_data_;
36 thread_status_data_ = nullptr;
37 }
38 }
39
ResetThreadStatus()40 void ThreadStatusUpdater::ResetThreadStatus() {
41 ClearThreadState();
42 ClearThreadOperation();
43 SetColumnFamilyInfoKey(nullptr);
44 }
45
SetColumnFamilyInfoKey(const void * cf_key)46 void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* cf_key) {
47 auto* data = Get();
48 if (data == nullptr) {
49 return;
50 }
51 // set the tracking flag based on whether cf_key is non-null or not.
52 // If enable_thread_tracking is set to false, the input cf_key
53 // would be nullptr.
54 data->enable_tracking = (cf_key != nullptr);
55 data->cf_key.store(const_cast<void*>(cf_key), std::memory_order_relaxed);
56 }
57
GetColumnFamilyInfoKey()58 const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() {
59 auto* data = GetLocalThreadStatus();
60 if (data == nullptr) {
61 return nullptr;
62 }
63 return data->cf_key.load(std::memory_order_relaxed);
64 }
65
SetThreadOperation(const ThreadStatus::OperationType type)66 void ThreadStatusUpdater::SetThreadOperation(
67 const ThreadStatus::OperationType type) {
68 auto* data = GetLocalThreadStatus();
69 if (data == nullptr) {
70 return;
71 }
72 // NOTE: Our practice here is to set all the thread operation properties
73 // and stage before we set thread operation, and thread operation
74 // will be set in std::memory_order_release. This is to ensure
75 // whenever a thread operation is not OP_UNKNOWN, we will always
76 // have a consistent information on its properties.
77 data->operation_type.store(type, std::memory_order_release);
78 if (type == ThreadStatus::OP_UNKNOWN) {
79 data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
80 std::memory_order_relaxed);
81 ClearThreadOperationProperties();
82 }
83 }
84
SetThreadOperationProperty(int i,uint64_t value)85 void ThreadStatusUpdater::SetThreadOperationProperty(int i, uint64_t value) {
86 auto* data = GetLocalThreadStatus();
87 if (data == nullptr) {
88 return;
89 }
90 data->op_properties[i].store(value, std::memory_order_relaxed);
91 }
92
IncreaseThreadOperationProperty(int i,uint64_t delta)93 void ThreadStatusUpdater::IncreaseThreadOperationProperty(int i,
94 uint64_t delta) {
95 auto* data = GetLocalThreadStatus();
96 if (data == nullptr) {
97 return;
98 }
99 data->op_properties[i].fetch_add(delta, std::memory_order_relaxed);
100 }
101
SetOperationStartTime(const uint64_t start_time)102 void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time) {
103 auto* data = GetLocalThreadStatus();
104 if (data == nullptr) {
105 return;
106 }
107 data->op_start_time.store(start_time, std::memory_order_relaxed);
108 }
109
ClearThreadOperation()110 void ThreadStatusUpdater::ClearThreadOperation() {
111 auto* data = GetLocalThreadStatus();
112 if (data == nullptr) {
113 return;
114 }
115 data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
116 std::memory_order_relaxed);
117 data->operation_type.store(ThreadStatus::OP_UNKNOWN,
118 std::memory_order_relaxed);
119 ClearThreadOperationProperties();
120 }
121
ClearThreadOperationProperties()122 void ThreadStatusUpdater::ClearThreadOperationProperties() {
123 auto* data = GetLocalThreadStatus();
124 if (data == nullptr) {
125 return;
126 }
127 for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
128 data->op_properties[i].store(0, std::memory_order_relaxed);
129 }
130 }
131
SetThreadOperationStage(ThreadStatus::OperationStage stage)132 ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage(
133 ThreadStatus::OperationStage stage) {
134 auto* data = GetLocalThreadStatus();
135 if (data == nullptr) {
136 return ThreadStatus::STAGE_UNKNOWN;
137 }
138 return data->operation_stage.exchange(stage, std::memory_order_relaxed);
139 }
140
SetThreadState(const ThreadStatus::StateType type)141 void ThreadStatusUpdater::SetThreadState(const ThreadStatus::StateType type) {
142 auto* data = GetLocalThreadStatus();
143 if (data == nullptr) {
144 return;
145 }
146 data->state_type.store(type, std::memory_order_relaxed);
147 }
148
ClearThreadState()149 void ThreadStatusUpdater::ClearThreadState() {
150 auto* data = GetLocalThreadStatus();
151 if (data == nullptr) {
152 return;
153 }
154 data->state_type.store(ThreadStatus::STATE_UNKNOWN,
155 std::memory_order_relaxed);
156 }
157
GetThreadList(std::vector<ThreadStatus> * thread_list)158 Status ThreadStatusUpdater::GetThreadList(
159 std::vector<ThreadStatus>* thread_list) {
160 thread_list->clear();
161 std::vector<std::shared_ptr<ThreadStatusData>> valid_list;
162 uint64_t now_micros = Env::Default()->NowMicros();
163
164 std::lock_guard<std::mutex> lck(thread_list_mutex_);
165 for (auto* thread_data : thread_data_set_) {
166 assert(thread_data);
167 auto thread_id = thread_data->thread_id.load(std::memory_order_relaxed);
168 auto thread_type = thread_data->thread_type.load(std::memory_order_relaxed);
169 // Since any change to cf_info_map requires thread_list_mutex,
170 // which is currently held by GetThreadList(), here we can safely
171 // use "memory_order_relaxed" to load the cf_key.
172 auto cf_key = thread_data->cf_key.load(std::memory_order_relaxed);
173
174 ThreadStatus::OperationType op_type = ThreadStatus::OP_UNKNOWN;
175 ThreadStatus::OperationStage op_stage = ThreadStatus::STAGE_UNKNOWN;
176 ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN;
177 uint64_t op_elapsed_micros = 0;
178 uint64_t op_props[ThreadStatus::kNumOperationProperties] = {0};
179
180 auto iter = cf_info_map_.find(cf_key);
181 if (iter != cf_info_map_.end()) {
182 op_type = thread_data->operation_type.load(std::memory_order_acquire);
183 // display lower-level info only when higher-level info is available.
184 if (op_type != ThreadStatus::OP_UNKNOWN) {
185 op_elapsed_micros = now_micros - thread_data->op_start_time.load(
186 std::memory_order_relaxed);
187 op_stage = thread_data->operation_stage.load(std::memory_order_relaxed);
188 state_type = thread_data->state_type.load(std::memory_order_relaxed);
189 for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
190 op_props[i] =
191 thread_data->op_properties[i].load(std::memory_order_relaxed);
192 }
193 }
194 }
195
196 thread_list->emplace_back(
197 thread_id, thread_type,
198 iter != cf_info_map_.end() ? iter->second.db_name : "",
199 iter != cf_info_map_.end() ? iter->second.cf_name : "", op_type,
200 op_elapsed_micros, op_stage, op_props, state_type);
201 }
202
203 return Status::OK();
204 }
205
GetLocalThreadStatus()206 ThreadStatusData* ThreadStatusUpdater::GetLocalThreadStatus() {
207 if (thread_status_data_ == nullptr) {
208 return nullptr;
209 }
210 if (!thread_status_data_->enable_tracking) {
211 assert(thread_status_data_->cf_key.load(std::memory_order_relaxed) ==
212 nullptr);
213 return nullptr;
214 }
215 return thread_status_data_;
216 }
217
NewColumnFamilyInfo(const void * db_key,const std::string & db_name,const void * cf_key,const std::string & cf_name)218 void ThreadStatusUpdater::NewColumnFamilyInfo(const void* db_key,
219 const std::string& db_name,
220 const void* cf_key,
221 const std::string& cf_name) {
222 // Acquiring same lock as GetThreadList() to guarantee
223 // a consistent view of global column family table (cf_info_map).
224 std::lock_guard<std::mutex> lck(thread_list_mutex_);
225
226 cf_info_map_.emplace(std::piecewise_construct, std::make_tuple(cf_key),
227 std::make_tuple(db_key, db_name, cf_name));
228 db_key_map_[db_key].insert(cf_key);
229 }
230
EraseColumnFamilyInfo(const void * cf_key)231 void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) {
232 // Acquiring same lock as GetThreadList() to guarantee
233 // a consistent view of global column family table (cf_info_map).
234 std::lock_guard<std::mutex> lck(thread_list_mutex_);
235
236 auto cf_pair = cf_info_map_.find(cf_key);
237 if (cf_pair != cf_info_map_.end()) {
238 // Remove its entry from db_key_map_ by the following steps:
239 // 1. Obtain the entry in db_key_map_ whose set contains cf_key
240 // 2. Remove it from the set.
241 ConstantColumnFamilyInfo& cf_info = cf_pair->second;
242 auto db_pair = db_key_map_.find(cf_info.db_key);
243 assert(db_pair != db_key_map_.end());
244 size_t result __attribute__((__unused__));
245 result = db_pair->second.erase(cf_key);
246 assert(result);
247 cf_info_map_.erase(cf_pair);
248 }
249 }
250
EraseDatabaseInfo(const void * db_key)251 void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) {
252 // Acquiring same lock as GetThreadList() to guarantee
253 // a consistent view of global column family table (cf_info_map).
254 std::lock_guard<std::mutex> lck(thread_list_mutex_);
255 auto db_pair = db_key_map_.find(db_key);
256 if (UNLIKELY(db_pair == db_key_map_.end())) {
257 // In some occasional cases such as DB::Open fails, we won't
258 // register ColumnFamilyInfo for a db.
259 return;
260 }
261
262 for (auto cf_key : db_pair->second) {
263 auto cf_pair = cf_info_map_.find(cf_key);
264 if (cf_pair != cf_info_map_.end()) {
265 cf_info_map_.erase(cf_pair);
266 }
267 }
268 db_key_map_.erase(db_key);
269 }
270
271 #else
272
273 void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType /*ttype*/,
274 uint64_t /*thread_id*/) {}
275
276 void ThreadStatusUpdater::UnregisterThread() {}
277
278 void ThreadStatusUpdater::ResetThreadStatus() {}
279
280 void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* /*cf_key*/) {}
281
282 void ThreadStatusUpdater::SetThreadOperation(
283 const ThreadStatus::OperationType /*type*/) {}
284
285 void ThreadStatusUpdater::ClearThreadOperation() {}
286
287 void ThreadStatusUpdater::SetThreadState(
288 const ThreadStatus::StateType /*type*/) {}
289
290 void ThreadStatusUpdater::ClearThreadState() {}
291
292 Status ThreadStatusUpdater::GetThreadList(
293 std::vector<ThreadStatus>* /*thread_list*/) {
294 return Status::NotSupported(
295 "GetThreadList is not supported in the current running environment.");
296 }
297
298 void ThreadStatusUpdater::NewColumnFamilyInfo(const void* /*db_key*/,
299 const std::string& /*db_name*/,
300 const void* /*cf_key*/,
301 const std::string& /*cf_name*/) {}
302
303 void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* /*cf_key*/) {}
304
305 void ThreadStatusUpdater::EraseDatabaseInfo(const void* /*db_key*/) {}
306
307 void ThreadStatusUpdater::SetThreadOperationProperty(int /*i*/,
308 uint64_t /*value*/) {}
309
310 void ThreadStatusUpdater::IncreaseThreadOperationProperty(int /*i*/,
311 uint64_t /*delta*/) {}
312
313 #endif // ROCKSDB_USING_THREAD_STATUS
314 } // namespace rocksdb
315