1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #pragma once
7 
8 #include <functional>
9 #include <string>
10 
11 #include "port/port.h"
12 #include "rocksdb/env.h"
13 #include "test_util/mock_time_env.h"
14 #include "util/mutexlock.h"
15 
16 namespace ROCKSDB_NAMESPACE {
17 
18 // Simple wrapper around port::Thread that supports calling a callback every
19 // X seconds. If you pass in 0, then it will call your callback repeatedly
20 // without delay.
21 class RepeatableThread {
22  public:
23   RepeatableThread(std::function<void()> function,
24                    const std::string& thread_name, Env* env, uint64_t delay_us,
25                    uint64_t initial_delay_us = 0)
function_(function)26       : function_(function),
27         thread_name_("rocksdb:" + thread_name),
28         env_(env),
29         delay_us_(delay_us),
30         initial_delay_us_(initial_delay_us),
31         mutex_(env),
32         cond_var_(&mutex_),
33         running_(true),
34 #ifndef NDEBUG
35         waiting_(false),
36         run_count_(0),
37 #endif
38         thread_([this] { thread(); }) {
39   }
40 
cancel()41   void cancel() {
42     {
43       InstrumentedMutexLock l(&mutex_);
44       if (!running_) {
45         return;
46       }
47       running_ = false;
48       cond_var_.SignalAll();
49     }
50     thread_.join();
51   }
52 
IsRunning()53   bool IsRunning() { return running_; }
54 
~RepeatableThread()55   ~RepeatableThread() { cancel(); }
56 
57 #ifndef NDEBUG
58   // Wait until RepeatableThread starting waiting, call the optional callback,
59   // then wait for one run of RepeatableThread. Tests can use provide a
60   // custom env object to mock time, and use the callback here to bump current
61   // time and trigger RepeatableThread. See repeatable_thread_test for example.
62   //
63   // Note: only support one caller of this method.
64   void TEST_WaitForRun(std::function<void()> callback = nullptr) {
65     InstrumentedMutexLock l(&mutex_);
66     while (!waiting_) {
67       cond_var_.Wait();
68     }
69     uint64_t prev_count = run_count_;
70     if (callback != nullptr) {
71       callback();
72     }
73     cond_var_.SignalAll();
74     while (!(run_count_ > prev_count)) {
75       cond_var_.Wait();
76     }
77   }
78 #endif
79 
80  private:
wait(uint64_t delay)81   bool wait(uint64_t delay) {
82     InstrumentedMutexLock l(&mutex_);
83     if (running_ && delay > 0) {
84       uint64_t wait_until = env_->NowMicros() + delay;
85 #ifndef NDEBUG
86       waiting_ = true;
87       cond_var_.SignalAll();
88 #endif
89       while (running_) {
90         cond_var_.TimedWait(wait_until);
91         if (env_->NowMicros() >= wait_until) {
92           break;
93         }
94       }
95 #ifndef NDEBUG
96       waiting_ = false;
97 #endif
98     }
99     return running_;
100   }
101 
thread()102   void thread() {
103 #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
104 #if __GLIBC_PREREQ(2, 12)
105     // Set thread name.
106     auto thread_handle = thread_.native_handle();
107     int ret __attribute__((__unused__)) =
108         pthread_setname_np(thread_handle, thread_name_.c_str());
109     assert(ret == 0);
110 #endif
111 #endif
112 
113     assert(delay_us_ > 0);
114     if (!wait(initial_delay_us_)) {
115       return;
116     }
117     do {
118       function_();
119 #ifndef NDEBUG
120       {
121         InstrumentedMutexLock l(&mutex_);
122         run_count_++;
123         cond_var_.SignalAll();
124       }
125 #endif
126     } while (wait(delay_us_));
127   }
128 
129   const std::function<void()> function_;
130   const std::string thread_name_;
131   Env* const env_;
132   const uint64_t delay_us_;
133   const uint64_t initial_delay_us_;
134 
135   // Mutex lock should be held when accessing running_, waiting_
136   // and run_count_.
137   InstrumentedMutex mutex_;
138   InstrumentedCondVar cond_var_;
139   bool running_;
140 #ifndef NDEBUG
141   // RepeatableThread waiting for timeout.
142   bool waiting_;
143   // Times function_ had run.
144   uint64_t run_count_;
145 #endif
146   port::Thread thread_;
147 };
148 
149 }  // namespace ROCKSDB_NAMESPACE
150