1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 6 #pragma once 7 8 #include <functional> 9 #include <string> 10 11 #include "port/port.h" 12 #include "rocksdb/env.h" 13 #include "test_util/mock_time_env.h" 14 #include "util/mutexlock.h" 15 16 namespace ROCKSDB_NAMESPACE { 17 18 // Simple wrapper around port::Thread that supports calling a callback every 19 // X seconds. If you pass in 0, then it will call your callback repeatedly 20 // without delay. 21 class RepeatableThread { 22 public: 23 RepeatableThread(std::function<void()> function, 24 const std::string& thread_name, Env* env, uint64_t delay_us, 25 uint64_t initial_delay_us = 0) function_(function)26 : function_(function), 27 thread_name_("rocksdb:" + thread_name), 28 env_(env), 29 delay_us_(delay_us), 30 initial_delay_us_(initial_delay_us), 31 mutex_(env), 32 cond_var_(&mutex_), 33 running_(true), 34 #ifndef NDEBUG 35 waiting_(false), 36 run_count_(0), 37 #endif 38 thread_([this] { thread(); }) { 39 } 40 cancel()41 void cancel() { 42 { 43 InstrumentedMutexLock l(&mutex_); 44 if (!running_) { 45 return; 46 } 47 running_ = false; 48 cond_var_.SignalAll(); 49 } 50 thread_.join(); 51 } 52 IsRunning()53 bool IsRunning() { return running_; } 54 ~RepeatableThread()55 ~RepeatableThread() { cancel(); } 56 57 #ifndef NDEBUG 58 // Wait until RepeatableThread starting waiting, call the optional callback, 59 // then wait for one run of RepeatableThread. Tests can use provide a 60 // custom env object to mock time, and use the callback here to bump current 61 // time and trigger RepeatableThread. See repeatable_thread_test for example. 62 // 63 // Note: only support one caller of this method. 64 void TEST_WaitForRun(std::function<void()> callback = nullptr) { 65 InstrumentedMutexLock l(&mutex_); 66 while (!waiting_) { 67 cond_var_.Wait(); 68 } 69 uint64_t prev_count = run_count_; 70 if (callback != nullptr) { 71 callback(); 72 } 73 cond_var_.SignalAll(); 74 while (!(run_count_ > prev_count)) { 75 cond_var_.Wait(); 76 } 77 } 78 #endif 79 80 private: wait(uint64_t delay)81 bool wait(uint64_t delay) { 82 InstrumentedMutexLock l(&mutex_); 83 if (running_ && delay > 0) { 84 uint64_t wait_until = env_->NowMicros() + delay; 85 #ifndef NDEBUG 86 waiting_ = true; 87 cond_var_.SignalAll(); 88 #endif 89 while (running_) { 90 cond_var_.TimedWait(wait_until); 91 if (env_->NowMicros() >= wait_until) { 92 break; 93 } 94 } 95 #ifndef NDEBUG 96 waiting_ = false; 97 #endif 98 } 99 return running_; 100 } 101 thread()102 void thread() { 103 #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) 104 #if __GLIBC_PREREQ(2, 12) 105 // Set thread name. 106 auto thread_handle = thread_.native_handle(); 107 int ret __attribute__((__unused__)) = 108 pthread_setname_np(thread_handle, thread_name_.c_str()); 109 assert(ret == 0); 110 #endif 111 #endif 112 113 assert(delay_us_ > 0); 114 if (!wait(initial_delay_us_)) { 115 return; 116 } 117 do { 118 function_(); 119 #ifndef NDEBUG 120 { 121 InstrumentedMutexLock l(&mutex_); 122 run_count_++; 123 cond_var_.SignalAll(); 124 } 125 #endif 126 } while (wait(delay_us_)); 127 } 128 129 const std::function<void()> function_; 130 const std::string thread_name_; 131 Env* const env_; 132 const uint64_t delay_us_; 133 const uint64_t initial_delay_us_; 134 135 // Mutex lock should be held when accessing running_, waiting_ 136 // and run_count_. 137 InstrumentedMutex mutex_; 138 InstrumentedCondVar cond_var_; 139 bool running_; 140 #ifndef NDEBUG 141 // RepeatableThread waiting for timeout. 142 bool waiting_; 143 // Times function_ had run. 144 uint64_t run_count_; 145 #endif 146 port::Thread thread_; 147 }; 148 149 } // namespace ROCKSDB_NAMESPACE 150