1 // Copyright (c) 2015-2019 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <scheduler.h>
6 
7 #include <random.h>
8 
9 #include <assert.h>
10 #include <utility>
11 
CScheduler()12 CScheduler::CScheduler()
13 {
14 }
15 
~CScheduler()16 CScheduler::~CScheduler()
17 {
18     assert(nThreadsServicingQueue == 0);
19     if (stopWhenEmpty) assert(taskQueue.empty());
20 }
21 
22 
serviceQueue()23 void CScheduler::serviceQueue()
24 {
25     WAIT_LOCK(newTaskMutex, lock);
26     ++nThreadsServicingQueue;
27 
28     // newTaskMutex is locked throughout this loop EXCEPT
29     // when the thread is waiting or when the user's function
30     // is called.
31     while (!shouldStop()) {
32         try {
33             if (!shouldStop() && taskQueue.empty()) {
34                 REVERSE_LOCK(lock);
35             }
36             while (!shouldStop() && taskQueue.empty()) {
37                 // Wait until there is something to do.
38                 newTaskScheduled.wait(lock);
39             }
40 
41             // Wait until either there is a new task, or until
42             // the time of the first item on the queue:
43 
44             while (!shouldStop() && !taskQueue.empty()) {
45                 std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
46                 if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
47                     break; // Exit loop after timeout, it means we reached the time of the event
48                 }
49             }
50 
51             // If there are multiple threads, the queue can empty while we're waiting (another
52             // thread may service the task we were waiting on).
53             if (shouldStop() || taskQueue.empty())
54                 continue;
55 
56             Function f = taskQueue.begin()->second;
57             taskQueue.erase(taskQueue.begin());
58 
59             {
60                 // Unlock before calling f, so it can reschedule itself or another task
61                 // without deadlocking:
62                 REVERSE_LOCK(lock);
63                 f();
64             }
65         } catch (...) {
66             --nThreadsServicingQueue;
67             throw;
68         }
69     }
70     --nThreadsServicingQueue;
71     newTaskScheduled.notify_one();
72 }
73 
stop(bool drain)74 void CScheduler::stop(bool drain)
75 {
76     {
77         LOCK(newTaskMutex);
78         if (drain)
79             stopWhenEmpty = true;
80         else
81             stopRequested = true;
82     }
83     newTaskScheduled.notify_all();
84 }
85 
schedule(CScheduler::Function f,std::chrono::system_clock::time_point t)86 void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
87 {
88     {
89         LOCK(newTaskMutex);
90         taskQueue.insert(std::make_pair(t, f));
91     }
92     newTaskScheduled.notify_one();
93 }
94 
MockForward(std::chrono::seconds delta_seconds)95 void CScheduler::MockForward(std::chrono::seconds delta_seconds)
96 {
97     assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1});
98 
99     {
100         LOCK(newTaskMutex);
101 
102         // use temp_queue to maintain updated schedule
103         std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
104 
105         for (const auto& element : taskQueue) {
106             temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
107         }
108 
109         // point taskQueue to temp_queue
110         taskQueue = std::move(temp_queue);
111     }
112 
113     // notify that the taskQueue needs to be processed
114     newTaskScheduled.notify_one();
115 }
116 
Repeat(CScheduler & s,CScheduler::Function f,std::chrono::milliseconds delta)117 static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta)
118 {
119     f();
120     s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta);
121 }
122 
scheduleEvery(CScheduler::Function f,std::chrono::milliseconds delta)123 void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta)
124 {
125     scheduleFromNow([=] { Repeat(*this, f, delta); }, delta);
126 }
127 
getQueueInfo(std::chrono::system_clock::time_point & first,std::chrono::system_clock::time_point & last) const128 size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first,
129                              std::chrono::system_clock::time_point &last) const
130 {
131     LOCK(newTaskMutex);
132     size_t result = taskQueue.size();
133     if (!taskQueue.empty()) {
134         first = taskQueue.begin()->first;
135         last = taskQueue.rbegin()->first;
136     }
137     return result;
138 }
139 
AreThreadsServicingQueue() const140 bool CScheduler::AreThreadsServicingQueue() const {
141     LOCK(newTaskMutex);
142     return nThreadsServicingQueue;
143 }
144 
145 
MaybeScheduleProcessQueue()146 void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
147     {
148         LOCK(m_cs_callbacks_pending);
149         // Try to avoid scheduling too many copies here, but if we
150         // accidentally have two ProcessQueue's scheduled at once its
151         // not a big deal.
152         if (m_are_callbacks_running) return;
153         if (m_callbacks_pending.empty()) return;
154     }
155     m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
156 }
157 
ProcessQueue()158 void SingleThreadedSchedulerClient::ProcessQueue() {
159     std::function<void ()> callback;
160     {
161         LOCK(m_cs_callbacks_pending);
162         if (m_are_callbacks_running) return;
163         if (m_callbacks_pending.empty()) return;
164         m_are_callbacks_running = true;
165 
166         callback = std::move(m_callbacks_pending.front());
167         m_callbacks_pending.pop_front();
168     }
169 
170     // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
171     // to ensure both happen safely even if callback() throws.
172     struct RAIICallbacksRunning {
173         SingleThreadedSchedulerClient* instance;
174         explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
175         ~RAIICallbacksRunning() {
176             {
177                 LOCK(instance->m_cs_callbacks_pending);
178                 instance->m_are_callbacks_running = false;
179             }
180             instance->MaybeScheduleProcessQueue();
181         }
182     } raiicallbacksrunning(this);
183 
184     callback();
185 }
186 
AddToProcessQueue(std::function<void ()> func)187 void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void ()> func) {
188     assert(m_pscheduler);
189 
190     {
191         LOCK(m_cs_callbacks_pending);
192         m_callbacks_pending.emplace_back(std::move(func));
193     }
194     MaybeScheduleProcessQueue();
195 }
196 
EmptyQueue()197 void SingleThreadedSchedulerClient::EmptyQueue() {
198     assert(!m_pscheduler->AreThreadsServicingQueue());
199     bool should_continue = true;
200     while (should_continue) {
201         ProcessQueue();
202         LOCK(m_cs_callbacks_pending);
203         should_continue = !m_callbacks_pending.empty();
204     }
205 }
206 
CallbacksPending()207 size_t SingleThreadedSchedulerClient::CallbacksPending() {
208     LOCK(m_cs_callbacks_pending);
209     return m_callbacks_pending.size();
210 }
211