1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 // 6 7 #pragma once 8 9 #include <functional> 10 #include <memory> 11 #include <queue> 12 #include <unordered_map> 13 #include <utility> 14 #include <vector> 15 16 #include "monitoring/instrumented_mutex.h" 17 #include "rocksdb/system_clock.h" 18 #include "test_util/sync_point.h" 19 #include "util/mutexlock.h" 20 21 namespace ROCKSDB_NAMESPACE { 22 23 // A Timer class to handle repeated work. 24 // 25 // `Start()` and `Shutdown()` are currently not thread-safe. The client must 26 // serialize calls to these two member functions. 27 // 28 // A single timer instance can handle multiple functions via a single thread. 29 // It is better to leave long running work to a dedicated thread pool. 30 // 31 // Timer can be started by calling `Start()`, and ended by calling `Shutdown()`. 32 // Work (in terms of a `void function`) can be scheduled by calling `Add` with 33 // a unique function name and de-scheduled by calling `Cancel`. 34 // Many functions can be added. 35 // 36 // Impl Details: 37 // A heap is used to keep track of when the next timer goes off. 38 // A map from a function name to the function keeps track of all the functions. 39 class Timer { 40 public: Timer(SystemClock * clock)41 explicit Timer(SystemClock* clock) 42 : clock_(clock), 43 mutex_(clock), 44 cond_var_(&mutex_), 45 running_(false), 46 executing_task_(false) {} 47 ~Timer()48 ~Timer() { Shutdown(); } 49 50 // Add a new function to run. 51 // fn_name has to be identical, otherwise, the new one overrides the existing 52 // one, regardless if the function is pending removed (invalid) or not. 53 // start_after_us is the initial delay. 54 // repeat_every_us is the interval between ending time of the last call and 55 // starting time of the next call. For example, repeat_every_us = 2000 and 56 // the function takes 1000us to run. If it starts at time [now]us, then it 57 // finishes at [now]+1000us, 2nd run starting time will be at [now]+3000us. 58 // repeat_every_us == 0 means do not repeat. Add(std::function<void ()> fn,const std::string & fn_name,uint64_t start_after_us,uint64_t repeat_every_us)59 void Add(std::function<void()> fn, 60 const std::string& fn_name, 61 uint64_t start_after_us, 62 uint64_t repeat_every_us) { 63 std::unique_ptr<FunctionInfo> fn_info(new FunctionInfo( 64 std::move(fn), fn_name, clock_->NowMicros() + start_after_us, 65 repeat_every_us)); 66 { 67 InstrumentedMutexLock l(&mutex_); 68 auto it = map_.find(fn_name); 69 if (it == map_.end()) { 70 heap_.push(fn_info.get()); 71 map_.emplace(std::make_pair(fn_name, std::move(fn_info))); 72 } else { 73 // If it already exists, overriding it. 74 it->second->fn = std::move(fn_info->fn); 75 it->second->valid = true; 76 it->second->next_run_time_us = clock_->NowMicros() + start_after_us; 77 it->second->repeat_every_us = repeat_every_us; 78 } 79 } 80 cond_var_.SignalAll(); 81 } 82 Cancel(const std::string & fn_name)83 void Cancel(const std::string& fn_name) { 84 InstrumentedMutexLock l(&mutex_); 85 86 // Mark the function with fn_name as invalid so that it will not be 87 // requeued. 88 auto it = map_.find(fn_name); 89 if (it != map_.end() && it->second) { 90 it->second->Cancel(); 91 } 92 93 // If the currently running function is fn_name, then we need to wait 94 // until it finishes before returning to caller. 95 while (!heap_.empty() && executing_task_) { 96 FunctionInfo* func_info = heap_.top(); 97 assert(func_info); 98 if (func_info->name == fn_name) { 99 WaitForTaskCompleteIfNecessary(); 100 } else { 101 break; 102 } 103 } 104 } 105 CancelAll()106 void CancelAll() { 107 InstrumentedMutexLock l(&mutex_); 108 CancelAllWithLock(); 109 } 110 111 // Start the Timer Start()112 bool Start() { 113 InstrumentedMutexLock l(&mutex_); 114 if (running_) { 115 return false; 116 } 117 118 running_ = true; 119 thread_.reset(new port::Thread(&Timer::Run, this)); 120 return true; 121 } 122 123 // Shutdown the Timer Shutdown()124 bool Shutdown() { 125 { 126 InstrumentedMutexLock l(&mutex_); 127 if (!running_) { 128 return false; 129 } 130 running_ = false; 131 CancelAllWithLock(); 132 cond_var_.SignalAll(); 133 } 134 135 if (thread_) { 136 thread_->join(); 137 } 138 return true; 139 } 140 HasPendingTask()141 bool HasPendingTask() const { 142 InstrumentedMutexLock l(&mutex_); 143 for (auto it = map_.begin(); it != map_.end(); it++) { 144 if (it->second->IsValid()) { 145 return true; 146 } 147 } 148 return false; 149 } 150 151 #ifndef NDEBUG 152 // Wait until Timer starting waiting, call the optional callback, then wait 153 // for Timer waiting again. 154 // Tests can provide a custom Clock object to mock time, and use the callback 155 // here to bump current time and trigger Timer. See timer_test for example. 156 // 157 // Note: only support one caller of this method. 158 void TEST_WaitForRun(std::function<void()> callback = nullptr) { 159 InstrumentedMutexLock l(&mutex_); 160 // It act as a spin lock 161 while (executing_task_ || 162 (!heap_.empty() && 163 heap_.top()->next_run_time_us <= clock_->NowMicros())) { 164 cond_var_.TimedWait(clock_->NowMicros() + 1000); 165 } 166 if (callback != nullptr) { 167 callback(); 168 } 169 cond_var_.SignalAll(); 170 do { 171 cond_var_.TimedWait(clock_->NowMicros() + 1000); 172 } while (executing_task_ || 173 (!heap_.empty() && 174 heap_.top()->next_run_time_us <= clock_->NowMicros())); 175 } 176 TEST_GetPendingTaskNum()177 size_t TEST_GetPendingTaskNum() const { 178 InstrumentedMutexLock l(&mutex_); 179 size_t ret = 0; 180 for (auto it = map_.begin(); it != map_.end(); it++) { 181 if (it->second->IsValid()) { 182 ret++; 183 } 184 } 185 return ret; 186 } 187 #endif // NDEBUG 188 189 private: 190 Run()191 void Run() { 192 InstrumentedMutexLock l(&mutex_); 193 194 while (running_) { 195 if (heap_.empty()) { 196 // wait 197 TEST_SYNC_POINT("Timer::Run::Waiting"); 198 cond_var_.Wait(); 199 continue; 200 } 201 202 FunctionInfo* current_fn = heap_.top(); 203 assert(current_fn); 204 205 if (!current_fn->IsValid()) { 206 heap_.pop(); 207 map_.erase(current_fn->name); 208 continue; 209 } 210 211 if (current_fn->next_run_time_us <= clock_->NowMicros()) { 212 // make a copy of the function so it won't be changed after 213 // mutex_.unlock. 214 std::function<void()> fn = current_fn->fn; 215 executing_task_ = true; 216 mutex_.Unlock(); 217 // Execute the work 218 fn(); 219 mutex_.Lock(); 220 executing_task_ = false; 221 cond_var_.SignalAll(); 222 223 // Remove the work from the heap once it is done executing. 224 // Note that we are just removing the pointer from the heap. Its 225 // memory is still managed in the map (as it holds a unique ptr). 226 // So current_fn is still a valid ptr. 227 heap_.pop(); 228 229 // current_fn may be cancelled already. 230 if (current_fn->IsValid() && current_fn->repeat_every_us > 0) { 231 assert(running_); 232 current_fn->next_run_time_us = 233 clock_->NowMicros() + current_fn->repeat_every_us; 234 235 // Schedule new work into the heap with new time. 236 heap_.push(current_fn); 237 } 238 } else { 239 cond_var_.TimedWait(current_fn->next_run_time_us); 240 } 241 } 242 } 243 CancelAllWithLock()244 void CancelAllWithLock() { 245 mutex_.AssertHeld(); 246 if (map_.empty() && heap_.empty()) { 247 return; 248 } 249 250 // With mutex_ held, set all tasks to invalid so that they will not be 251 // re-queued. 252 for (auto& elem : map_) { 253 auto& func_info = elem.second; 254 assert(func_info); 255 func_info->Cancel(); 256 } 257 258 // WaitForTaskCompleteIfNecessary() may release mutex_ 259 WaitForTaskCompleteIfNecessary(); 260 261 while (!heap_.empty()) { 262 heap_.pop(); 263 } 264 map_.clear(); 265 } 266 267 // A wrapper around std::function to keep track when it should run next 268 // and at what frequency. 269 struct FunctionInfo { 270 // the actual work 271 std::function<void()> fn; 272 // name of the function 273 std::string name; 274 // when the function should run next 275 uint64_t next_run_time_us; 276 // repeat interval 277 uint64_t repeat_every_us; 278 // controls whether this function is valid. 279 // A function is valid upon construction and until someone explicitly 280 // calls `Cancel()`. 281 bool valid; 282 FunctionInfoFunctionInfo283 FunctionInfo(std::function<void()>&& _fn, const std::string& _name, 284 const uint64_t _next_run_time_us, uint64_t _repeat_every_us) 285 : fn(std::move(_fn)), 286 name(_name), 287 next_run_time_us(_next_run_time_us), 288 repeat_every_us(_repeat_every_us), 289 valid(true) {} 290 CancelFunctionInfo291 void Cancel() { 292 valid = false; 293 } 294 IsValidFunctionInfo295 bool IsValid() const { return valid; } 296 }; 297 WaitForTaskCompleteIfNecessary()298 void WaitForTaskCompleteIfNecessary() { 299 mutex_.AssertHeld(); 300 while (executing_task_) { 301 TEST_SYNC_POINT("Timer::WaitForTaskCompleteIfNecessary:TaskExecuting"); 302 cond_var_.Wait(); 303 } 304 } 305 306 struct RunTimeOrder { operatorRunTimeOrder307 bool operator()(const FunctionInfo* f1, 308 const FunctionInfo* f2) { 309 return f1->next_run_time_us > f2->next_run_time_us; 310 } 311 }; 312 313 SystemClock* clock_; 314 // This mutex controls both the heap_ and the map_. It needs to be held for 315 // making any changes in them. 316 mutable InstrumentedMutex mutex_; 317 InstrumentedCondVar cond_var_; 318 std::unique_ptr<port::Thread> thread_; 319 bool running_; 320 bool executing_task_; 321 322 std::priority_queue<FunctionInfo*, 323 std::vector<FunctionInfo*>, 324 RunTimeOrder> heap_; 325 326 // In addition to providing a mapping from a function name to a function, 327 // it is also responsible for memory management. 328 std::unordered_map<std::string, std::unique_ptr<FunctionInfo>> map_; 329 }; 330 331 } // namespace ROCKSDB_NAMESPACE 332