1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include <mutex>
7 #include <condition_variable>
8 
9 #include "monitoring/thread_status_updater.h"
10 #include "rocksdb/db.h"
11 #include "test_util/testharness.h"
12 
13 #ifdef ROCKSDB_USING_THREAD_STATUS
14 
15 namespace ROCKSDB_NAMESPACE {
16 
17 class SimulatedBackgroundTask {
18  public:
SimulatedBackgroundTask(const void * db_key,const std::string & db_name,const void * cf_key,const std::string & cf_name,const ThreadStatus::OperationType operation_type=ThreadStatus::OP_UNKNOWN,const ThreadStatus::StateType state_type=ThreadStatus::STATE_UNKNOWN)19   SimulatedBackgroundTask(
20       const void* db_key, const std::string& db_name,
21       const void* cf_key, const std::string& cf_name,
22       const ThreadStatus::OperationType operation_type =
23           ThreadStatus::OP_UNKNOWN,
24       const ThreadStatus::StateType state_type =
25           ThreadStatus::STATE_UNKNOWN)
26       : db_key_(db_key), db_name_(db_name),
27         cf_key_(cf_key), cf_name_(cf_name),
28         operation_type_(operation_type), state_type_(state_type),
29         should_run_(true), running_count_(0) {
30     Env::Default()->GetThreadStatusUpdater()->NewColumnFamilyInfo(
31         db_key_, db_name_, cf_key_, cf_name_);
32   }
33 
~SimulatedBackgroundTask()34   ~SimulatedBackgroundTask() {
35     Env::Default()->GetThreadStatusUpdater()->EraseDatabaseInfo(db_key_);
36   }
37 
Run()38   void Run() {
39     std::unique_lock<std::mutex> l(mutex_);
40     running_count_++;
41     Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(cf_key_);
42     Env::Default()->GetThreadStatusUpdater()->SetThreadOperation(
43         operation_type_);
44     Env::Default()->GetThreadStatusUpdater()->SetThreadState(state_type_);
45     while (should_run_) {
46       bg_cv_.wait(l);
47     }
48     Env::Default()->GetThreadStatusUpdater()->ClearThreadState();
49     Env::Default()->GetThreadStatusUpdater()->ClearThreadOperation();
50     Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(nullptr);
51     running_count_--;
52     bg_cv_.notify_all();
53   }
54 
FinishAllTasks()55   void FinishAllTasks() {
56     std::unique_lock<std::mutex> l(mutex_);
57     should_run_ = false;
58     bg_cv_.notify_all();
59   }
60 
WaitUntilScheduled(int job_count,Env * env)61   void WaitUntilScheduled(int job_count, Env* env) {
62     while (running_count_ < job_count) {
63       env->SleepForMicroseconds(1000);
64     }
65   }
66 
WaitUntilDone()67   void WaitUntilDone() {
68     std::unique_lock<std::mutex> l(mutex_);
69     while (running_count_ > 0) {
70       bg_cv_.wait(l);
71     }
72   }
73 
DoSimulatedTask(void * arg)74   static void DoSimulatedTask(void* arg) {
75     reinterpret_cast<SimulatedBackgroundTask*>(arg)->Run();
76   }
77 
78  private:
79   const void* db_key_;
80   const std::string db_name_;
81   const void* cf_key_;
82   const std::string cf_name_;
83   const ThreadStatus::OperationType operation_type_;
84   const ThreadStatus::StateType state_type_;
85   std::mutex mutex_;
86   std::condition_variable bg_cv_;
87   bool should_run_;
88   std::atomic<int> running_count_;
89 };
90 
91 class ThreadListTest : public testing::Test {
92  public:
ThreadListTest()93   ThreadListTest() {
94   }
95 };
96 
TEST_F(ThreadListTest,GlobalTables)97 TEST_F(ThreadListTest, GlobalTables) {
98   // verify the global tables for operations and states are properly indexed.
99   for (int type = 0; type != ThreadStatus::NUM_OP_TYPES; ++type) {
100     ASSERT_EQ(global_operation_table[type].type, type);
101     ASSERT_EQ(global_operation_table[type].name,
102               ThreadStatus::GetOperationName(
103                   ThreadStatus::OperationType(type)));
104   }
105 
106   for (int type = 0; type != ThreadStatus::NUM_STATE_TYPES; ++type) {
107     ASSERT_EQ(global_state_table[type].type, type);
108     ASSERT_EQ(global_state_table[type].name,
109               ThreadStatus::GetStateName(
110                   ThreadStatus::StateType(type)));
111   }
112 
113   for (int stage = 0; stage != ThreadStatus::NUM_OP_STAGES; ++stage) {
114     ASSERT_EQ(global_op_stage_table[stage].stage, stage);
115     ASSERT_EQ(global_op_stage_table[stage].name,
116               ThreadStatus::GetOperationStageName(
117                   ThreadStatus::OperationStage(stage)));
118   }
119 }
120 
TEST_F(ThreadListTest,SimpleColumnFamilyInfoTest)121 TEST_F(ThreadListTest, SimpleColumnFamilyInfoTest) {
122   Env* env = Env::Default();
123   const int kHighPriorityThreads = 3;
124   const int kLowPriorityThreads = 5;
125   const int kSimulatedHighPriThreads = kHighPriorityThreads - 1;
126   const int kSimulatedLowPriThreads = kLowPriorityThreads / 3;
127   env->SetBackgroundThreads(kHighPriorityThreads, Env::HIGH);
128   env->SetBackgroundThreads(kLowPriorityThreads, Env::LOW);
129 
130   SimulatedBackgroundTask running_task(
131       reinterpret_cast<void*>(1234), "running",
132       reinterpret_cast<void*>(5678), "pikachu");
133 
134   for (int test = 0; test < kSimulatedHighPriThreads; ++test) {
135     env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
136         &running_task, Env::Priority::HIGH);
137   }
138   for (int test = 0; test < kSimulatedLowPriThreads; ++test) {
139     env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
140         &running_task, Env::Priority::LOW);
141   }
142   running_task.WaitUntilScheduled(
143       kSimulatedHighPriThreads + kSimulatedLowPriThreads, env);
144 
145   std::vector<ThreadStatus> thread_list;
146 
147   // Verify the number of running threads in each pool.
148   env->GetThreadList(&thread_list);
149   int running_count[ThreadStatus::NUM_THREAD_TYPES] = {0};
150   for (auto thread_status : thread_list) {
151     if (thread_status.cf_name == "pikachu" &&
152         thread_status.db_name == "running") {
153       running_count[thread_status.thread_type]++;
154     }
155   }
156   ASSERT_EQ(
157       running_count[ThreadStatus::HIGH_PRIORITY],
158       kSimulatedHighPriThreads);
159   ASSERT_EQ(
160       running_count[ThreadStatus::LOW_PRIORITY],
161       kSimulatedLowPriThreads);
162   ASSERT_EQ(
163       running_count[ThreadStatus::USER], 0);
164 
165   running_task.FinishAllTasks();
166   running_task.WaitUntilDone();
167 
168   // Verify none of the threads are running
169   env->GetThreadList(&thread_list);
170 
171   for (int i = 0; i < ThreadStatus::NUM_THREAD_TYPES; ++i) {
172     running_count[i] = 0;
173   }
174   for (auto thread_status : thread_list) {
175     if (thread_status.cf_name == "pikachu" &&
176         thread_status.db_name == "running") {
177       running_count[thread_status.thread_type]++;
178     }
179   }
180 
181   ASSERT_EQ(
182       running_count[ThreadStatus::HIGH_PRIORITY], 0);
183   ASSERT_EQ(
184       running_count[ThreadStatus::LOW_PRIORITY], 0);
185   ASSERT_EQ(
186       running_count[ThreadStatus::USER], 0);
187 }
188 
189 namespace {
UpdateStatusCounts(const std::vector<ThreadStatus> & thread_list,int operation_counts[],int state_counts[])190   void UpdateStatusCounts(
191       const std::vector<ThreadStatus>& thread_list,
192       int operation_counts[], int state_counts[]) {
193     for (auto thread_status : thread_list) {
194       operation_counts[thread_status.operation_type]++;
195       state_counts[thread_status.state_type]++;
196     }
197   }
198 
VerifyAndResetCounts(const int correct_counts[],int collected_counts[],int size)199   void VerifyAndResetCounts(
200       const int correct_counts[], int collected_counts[], int size) {
201     for (int i = 0; i < size; ++i) {
202       ASSERT_EQ(collected_counts[i], correct_counts[i]);
203       collected_counts[i] = 0;
204     }
205   }
206 
UpdateCount(int operation_counts[],int from_event,int to_event,int amount)207   void UpdateCount(
208       int operation_counts[], int from_event, int to_event, int amount) {
209     operation_counts[from_event] -= amount;
210     operation_counts[to_event] += amount;
211   }
212 }  // namespace
213 
TEST_F(ThreadListTest,SimpleEventTest)214 TEST_F(ThreadListTest, SimpleEventTest) {
215   Env* env = Env::Default();
216 
217   // simulated tasks
218   const int kFlushWriteTasks = 3;
219   SimulatedBackgroundTask flush_write_task(
220       reinterpret_cast<void*>(1234), "running",
221       reinterpret_cast<void*>(5678), "pikachu",
222       ThreadStatus::OP_FLUSH);
223 
224   const int kCompactionWriteTasks = 4;
225   SimulatedBackgroundTask compaction_write_task(
226       reinterpret_cast<void*>(1234), "running",
227       reinterpret_cast<void*>(5678), "pikachu",
228       ThreadStatus::OP_COMPACTION);
229 
230   const int kCompactionReadTasks = 5;
231   SimulatedBackgroundTask compaction_read_task(
232       reinterpret_cast<void*>(1234), "running",
233       reinterpret_cast<void*>(5678), "pikachu",
234       ThreadStatus::OP_COMPACTION);
235 
236   const int kCompactionWaitTasks = 6;
237   SimulatedBackgroundTask compaction_wait_task(
238       reinterpret_cast<void*>(1234), "running",
239       reinterpret_cast<void*>(5678), "pikachu",
240       ThreadStatus::OP_COMPACTION);
241 
242   // setup right answers
243   int correct_operation_counts[ThreadStatus::NUM_OP_TYPES] = {0};
244   correct_operation_counts[ThreadStatus::OP_FLUSH] =
245       kFlushWriteTasks;
246   correct_operation_counts[ThreadStatus::OP_COMPACTION] =
247       kCompactionWriteTasks + kCompactionReadTasks + kCompactionWaitTasks;
248 
249   env->SetBackgroundThreads(
250       correct_operation_counts[ThreadStatus::OP_FLUSH], Env::HIGH);
251   env->SetBackgroundThreads(
252       correct_operation_counts[ThreadStatus::OP_COMPACTION], Env::LOW);
253 
254   // schedule the simulated tasks
255   for (int t = 0; t < kFlushWriteTasks; ++t) {
256     env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
257         &flush_write_task, Env::Priority::HIGH);
258   }
259   flush_write_task.WaitUntilScheduled(kFlushWriteTasks, env);
260 
261   for (int t = 0; t < kCompactionWriteTasks; ++t) {
262     env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
263         &compaction_write_task, Env::Priority::LOW);
264   }
265   compaction_write_task.WaitUntilScheduled(kCompactionWriteTasks, env);
266 
267   for (int t = 0; t < kCompactionReadTasks; ++t) {
268     env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
269         &compaction_read_task, Env::Priority::LOW);
270   }
271   compaction_read_task.WaitUntilScheduled(kCompactionReadTasks, env);
272 
273   for (int t = 0; t < kCompactionWaitTasks; ++t) {
274     env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
275         &compaction_wait_task, Env::Priority::LOW);
276   }
277   compaction_wait_task.WaitUntilScheduled(kCompactionWaitTasks, env);
278 
279   // verify the thread-status
280   int operation_counts[ThreadStatus::NUM_OP_TYPES] = {0};
281   int state_counts[ThreadStatus::NUM_STATE_TYPES] = {0};
282 
283   std::vector<ThreadStatus> thread_list;
284   env->GetThreadList(&thread_list);
285   UpdateStatusCounts(thread_list, operation_counts, state_counts);
286   VerifyAndResetCounts(correct_operation_counts, operation_counts,
287                        ThreadStatus::NUM_OP_TYPES);
288 
289   // terminate compaction-wait tasks and see if the thread-status
290   // reflects this update
291   compaction_wait_task.FinishAllTasks();
292   compaction_wait_task.WaitUntilDone();
293   UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION,
294               ThreadStatus::OP_UNKNOWN, kCompactionWaitTasks);
295 
296   env->GetThreadList(&thread_list);
297   UpdateStatusCounts(thread_list, operation_counts, state_counts);
298   VerifyAndResetCounts(correct_operation_counts, operation_counts,
299                        ThreadStatus::NUM_OP_TYPES);
300 
301   // terminate flush-write tasks and see if the thread-status
302   // reflects this update
303   flush_write_task.FinishAllTasks();
304   flush_write_task.WaitUntilDone();
305   UpdateCount(correct_operation_counts, ThreadStatus::OP_FLUSH,
306               ThreadStatus::OP_UNKNOWN, kFlushWriteTasks);
307 
308   env->GetThreadList(&thread_list);
309   UpdateStatusCounts(thread_list, operation_counts, state_counts);
310   VerifyAndResetCounts(correct_operation_counts, operation_counts,
311                        ThreadStatus::NUM_OP_TYPES);
312 
313   // terminate compaction-write tasks and see if the thread-status
314   // reflects this update
315   compaction_write_task.FinishAllTasks();
316   compaction_write_task.WaitUntilDone();
317   UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION,
318               ThreadStatus::OP_UNKNOWN, kCompactionWriteTasks);
319 
320   env->GetThreadList(&thread_list);
321   UpdateStatusCounts(thread_list, operation_counts, state_counts);
322   VerifyAndResetCounts(correct_operation_counts, operation_counts,
323                        ThreadStatus::NUM_OP_TYPES);
324 
325   // terminate compaction-write tasks and see if the thread-status
326   // reflects this update
327   compaction_read_task.FinishAllTasks();
328   compaction_read_task.WaitUntilDone();
329   UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION,
330               ThreadStatus::OP_UNKNOWN, kCompactionReadTasks);
331 
332   env->GetThreadList(&thread_list);
333   UpdateStatusCounts(thread_list, operation_counts, state_counts);
334   VerifyAndResetCounts(correct_operation_counts, operation_counts,
335                        ThreadStatus::NUM_OP_TYPES);
336 }
337 
338 }  // namespace ROCKSDB_NAMESPACE
339 
main(int argc,char ** argv)340 int main(int argc, char** argv) {
341   ::testing::InitGoogleTest(&argc, argv);
342   return RUN_ALL_TESTS();
343 }
344 
345 #else
346 
main(int argc,char ** argv)347 int main(int argc, char** argv) {
348   ::testing::InitGoogleTest(&argc, argv);
349   return 0;
350 }
351 
352 #endif  // ROCKSDB_USING_THREAD_STATUS
353