1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 
7 #pragma once
8 
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "monitoring/instrumented_mutex.h"
#include "rocksdb/system_clock.h"
#include "test_util/sync_point.h"
#include "util/mutexlock.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 
// A Timer class to handle repeated work.
//
// `Start()` and `Shutdown()` are currently not thread-safe. The client must
// serialize calls to these two member functions.
//
// A single timer instance can handle multiple functions via a single thread.
// It is better to leave long-running work to a dedicated thread pool.
//
// Timer can be started by calling `Start()`, and ended by calling `Shutdown()`.
// Work (in terms of a `void function`) can be scheduled by calling `Add` with
// a unique function name and de-scheduled by calling `Cancel`.
// Many functions can be added.
//
// Implementation details:
// A heap is used to keep track of when the next timer goes off.
// A map from a function name to the function keeps track of all the functions.
39 class Timer {
40  public:
Timer(SystemClock * clock)41   explicit Timer(SystemClock* clock)
42       : clock_(clock),
43         mutex_(clock),
44         cond_var_(&mutex_),
45         running_(false),
46         executing_task_(false) {}
47 
~Timer()48   ~Timer() { Shutdown(); }
49 
50   // Add a new function to run.
51   // fn_name has to be identical, otherwise, the new one overrides the existing
52   // one, regardless if the function is pending removed (invalid) or not.
53   // start_after_us is the initial delay.
54   // repeat_every_us is the interval between ending time of the last call and
55   // starting time of the next call. For example, repeat_every_us = 2000 and
56   // the function takes 1000us to run. If it starts at time [now]us, then it
57   // finishes at [now]+1000us, 2nd run starting time will be at [now]+3000us.
58   // repeat_every_us == 0 means do not repeat.
Add(std::function<void ()> fn,const std::string & fn_name,uint64_t start_after_us,uint64_t repeat_every_us)59   void Add(std::function<void()> fn,
60            const std::string& fn_name,
61            uint64_t start_after_us,
62            uint64_t repeat_every_us) {
63     std::unique_ptr<FunctionInfo> fn_info(new FunctionInfo(
64         std::move(fn), fn_name, clock_->NowMicros() + start_after_us,
65         repeat_every_us));
66     {
67       InstrumentedMutexLock l(&mutex_);
68       auto it = map_.find(fn_name);
69       if (it == map_.end()) {
70         heap_.push(fn_info.get());
71         map_.emplace(std::make_pair(fn_name, std::move(fn_info)));
72       } else {
73         // If it already exists, overriding it.
74         it->second->fn = std::move(fn_info->fn);
75         it->second->valid = true;
76         it->second->next_run_time_us = clock_->NowMicros() + start_after_us;
77         it->second->repeat_every_us = repeat_every_us;
78       }
79     }
80     cond_var_.SignalAll();
81   }
82 
Cancel(const std::string & fn_name)83   void Cancel(const std::string& fn_name) {
84     InstrumentedMutexLock l(&mutex_);
85 
86     // Mark the function with fn_name as invalid so that it will not be
87     // requeued.
88     auto it = map_.find(fn_name);
89     if (it != map_.end() && it->second) {
90       it->second->Cancel();
91     }
92 
93     // If the currently running function is fn_name, then we need to wait
94     // until it finishes before returning to caller.
95     while (!heap_.empty() && executing_task_) {
96       FunctionInfo* func_info = heap_.top();
97       assert(func_info);
98       if (func_info->name == fn_name) {
99         WaitForTaskCompleteIfNecessary();
100       } else {
101         break;
102       }
103     }
104   }
105 
CancelAll()106   void CancelAll() {
107     InstrumentedMutexLock l(&mutex_);
108     CancelAllWithLock();
109   }
110 
111   // Start the Timer
Start()112   bool Start() {
113     InstrumentedMutexLock l(&mutex_);
114     if (running_) {
115       return false;
116     }
117 
118     running_ = true;
119     thread_.reset(new port::Thread(&Timer::Run, this));
120     return true;
121   }
122 
123   // Shutdown the Timer
Shutdown()124   bool Shutdown() {
125     {
126       InstrumentedMutexLock l(&mutex_);
127       if (!running_) {
128         return false;
129       }
130       running_ = false;
131       CancelAllWithLock();
132       cond_var_.SignalAll();
133     }
134 
135     if (thread_) {
136       thread_->join();
137     }
138     return true;
139   }
140 
HasPendingTask()141   bool HasPendingTask() const {
142     InstrumentedMutexLock l(&mutex_);
143     for (auto it = map_.begin(); it != map_.end(); it++) {
144       if (it->second->IsValid()) {
145         return true;
146       }
147     }
148     return false;
149   }
150 
151 #ifndef NDEBUG
152   // Wait until Timer starting waiting, call the optional callback, then wait
153   // for Timer waiting again.
154   // Tests can provide a custom Clock object to mock time, and use the callback
155   // here to bump current time and trigger Timer. See timer_test for example.
156   //
157   // Note: only support one caller of this method.
158   void TEST_WaitForRun(std::function<void()> callback = nullptr) {
159     InstrumentedMutexLock l(&mutex_);
160     // It act as a spin lock
161     while (executing_task_ ||
162            (!heap_.empty() &&
163             heap_.top()->next_run_time_us <= clock_->NowMicros())) {
164       cond_var_.TimedWait(clock_->NowMicros() + 1000);
165     }
166     if (callback != nullptr) {
167       callback();
168     }
169     cond_var_.SignalAll();
170     do {
171       cond_var_.TimedWait(clock_->NowMicros() + 1000);
172     } while (executing_task_ ||
173              (!heap_.empty() &&
174               heap_.top()->next_run_time_us <= clock_->NowMicros()));
175   }
176 
TEST_GetPendingTaskNum()177   size_t TEST_GetPendingTaskNum() const {
178     InstrumentedMutexLock l(&mutex_);
179     size_t ret = 0;
180     for (auto it = map_.begin(); it != map_.end(); it++) {
181       if (it->second->IsValid()) {
182         ret++;
183       }
184     }
185     return ret;
186   }
187 #endif  // NDEBUG
188 
189  private:
190 
Run()191   void Run() {
192     InstrumentedMutexLock l(&mutex_);
193 
194     while (running_) {
195       if (heap_.empty()) {
196         // wait
197         TEST_SYNC_POINT("Timer::Run::Waiting");
198         cond_var_.Wait();
199         continue;
200       }
201 
202       FunctionInfo* current_fn = heap_.top();
203       assert(current_fn);
204 
205       if (!current_fn->IsValid()) {
206         heap_.pop();
207         map_.erase(current_fn->name);
208         continue;
209       }
210 
211       if (current_fn->next_run_time_us <= clock_->NowMicros()) {
212         // make a copy of the function so it won't be changed after
213         // mutex_.unlock.
214         std::function<void()> fn = current_fn->fn;
215         executing_task_ = true;
216         mutex_.Unlock();
217         // Execute the work
218         fn();
219         mutex_.Lock();
220         executing_task_ = false;
221         cond_var_.SignalAll();
222 
223         // Remove the work from the heap once it is done executing.
224         // Note that we are just removing the pointer from the heap. Its
225         // memory is still managed in the map (as it holds a unique ptr).
226         // So current_fn is still a valid ptr.
227         heap_.pop();
228 
229         // current_fn may be cancelled already.
230         if (current_fn->IsValid() && current_fn->repeat_every_us > 0) {
231           assert(running_);
232           current_fn->next_run_time_us =
233               clock_->NowMicros() + current_fn->repeat_every_us;
234 
235           // Schedule new work into the heap with new time.
236           heap_.push(current_fn);
237         }
238       } else {
239         cond_var_.TimedWait(current_fn->next_run_time_us);
240       }
241     }
242   }
243 
CancelAllWithLock()244   void CancelAllWithLock() {
245     mutex_.AssertHeld();
246     if (map_.empty() && heap_.empty()) {
247       return;
248     }
249 
250     // With mutex_ held, set all tasks to invalid so that they will not be
251     // re-queued.
252     for (auto& elem : map_) {
253       auto& func_info = elem.second;
254       assert(func_info);
255       func_info->Cancel();
256     }
257 
258     // WaitForTaskCompleteIfNecessary() may release mutex_
259     WaitForTaskCompleteIfNecessary();
260 
261     while (!heap_.empty()) {
262       heap_.pop();
263     }
264     map_.clear();
265   }
266 
267   // A wrapper around std::function to keep track when it should run next
268   // and at what frequency.
269   struct FunctionInfo {
270     // the actual work
271     std::function<void()> fn;
272     // name of the function
273     std::string name;
274     // when the function should run next
275     uint64_t next_run_time_us;
276     // repeat interval
277     uint64_t repeat_every_us;
278     // controls whether this function is valid.
279     // A function is valid upon construction and until someone explicitly
280     // calls `Cancel()`.
281     bool valid;
282 
FunctionInfoFunctionInfo283     FunctionInfo(std::function<void()>&& _fn, const std::string& _name,
284                  const uint64_t _next_run_time_us, uint64_t _repeat_every_us)
285         : fn(std::move(_fn)),
286           name(_name),
287           next_run_time_us(_next_run_time_us),
288           repeat_every_us(_repeat_every_us),
289           valid(true) {}
290 
CancelFunctionInfo291     void Cancel() {
292       valid = false;
293     }
294 
IsValidFunctionInfo295     bool IsValid() const { return valid; }
296   };
297 
WaitForTaskCompleteIfNecessary()298   void WaitForTaskCompleteIfNecessary() {
299     mutex_.AssertHeld();
300     while (executing_task_) {
301       TEST_SYNC_POINT("Timer::WaitForTaskCompleteIfNecessary:TaskExecuting");
302       cond_var_.Wait();
303     }
304   }
305 
306   struct RunTimeOrder {
operatorRunTimeOrder307     bool operator()(const FunctionInfo* f1,
308                     const FunctionInfo* f2) {
309       return f1->next_run_time_us > f2->next_run_time_us;
310     }
311   };
312 
313   SystemClock* clock_;
314   // This mutex controls both the heap_ and the map_. It needs to be held for
315   // making any changes in them.
316   mutable InstrumentedMutex mutex_;
317   InstrumentedCondVar cond_var_;
318   std::unique_ptr<port::Thread> thread_;
319   bool running_;
320   bool executing_task_;
321 
322   std::priority_queue<FunctionInfo*,
323                       std::vector<FunctionInfo*>,
324                       RunTimeOrder> heap_;
325 
326   // In addition to providing a mapping from a function name to a function,
327   // it is also responsible for memory management.
328   std::unordered_map<std::string, std::unique_ptr<FunctionInfo>> map_;
329 };
330 
331 }  // namespace ROCKSDB_NAMESPACE
332