1 // @file background.cpp
2 
3 
4 /**
5  *    Copyright (C) 2018-present MongoDB, Inc.
6  *
7  *    This program is free software: you can redistribute it and/or modify
8  *    it under the terms of the Server Side Public License, version 1,
9  *    as published by MongoDB, Inc.
10  *
11  *    This program is distributed in the hope that it will be useful,
12  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *    Server Side Public License for more details.
15  *
16  *    You should have received a copy of the Server Side Public License
17  *    along with this program. If not, see
18  *    <http://www.mongodb.com/licensing/server-side-public-license>.
19  *
20  *    As a special exception, the copyright holders give permission to link the
21  *    code of portions of this program with the OpenSSL library under certain
22  *    conditions as described in each individual source file and distribute
23  *    linked combinations including the program with the OpenSSL library. You
24  *    must comply with the Server Side Public License in all respects for
25  *    all of the code used other than as permitted herein. If you modify file(s)
26  *    with this exception, you may extend this exception to your version of the
27  *    file(s), but you are not obligated to do so. If you do not wish to do so,
28  *    delete this exception statement from your version. If you delete this
29  *    exception statement from all source files in the program, then also delete
30  *    it in the license file.
31  */
32 
33 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
34 
35 #include "mongo/platform/basic.h"
36 
37 #include "mongo/util/background.h"
38 
39 #include "mongo/config.h"
40 #include "mongo/stdx/condition_variable.h"
41 #include "mongo/stdx/functional.h"
42 #include "mongo/stdx/mutex.h"
43 #include "mongo/stdx/thread.h"
44 #include "mongo/util/concurrency/idle_thread_block.h"
45 #include "mongo/util/concurrency/mutex.h"
46 #include "mongo/util/concurrency/spin_lock.h"
47 #include "mongo/util/concurrency/thread_name.h"
48 #include "mongo/util/debug_util.h"
49 #include "mongo/util/log.h"
50 #include "mongo/util/mongoutils/str.h"
51 #include "mongo/util/timer.h"
52 
53 using namespace std;
54 namespace mongo {
55 
56 namespace {
57 
58 class PeriodicTaskRunner : public BackgroundJob {
59 public:
PeriodicTaskRunner()60     PeriodicTaskRunner() : _shutdownRequested(false) {}
61 
62     void add(PeriodicTask* task);
63     void remove(PeriodicTask* task);
64 
65     Status stop(int gracePeriodMillis);
66 
67 private:
name() const68     virtual std::string name() const {
69         return "PeriodicTaskRunner";
70     }
71 
72     virtual void run();
73 
74     // Returns true if shutdown has been requested.  You must hold _mutex to call this
75     // function.
76     bool _isShutdownRequested() const;
77 
78     // Runs all registered tasks. You must hold _mutex to call this function.
79     void _runTasks();
80 
81     // Runs one task to completion, and optionally reports timing. You must hold _mutex
82     // to call this function.
83     void _runTask(PeriodicTask* task);
84 
85     // _mutex protects the _shutdownRequested flag and the _tasks vector.
86     stdx::mutex _mutex;
87 
88     // The condition variable is used to sleep for the interval between task
89     // executions, and is notified when the _shutdownRequested flag is toggled.
90     stdx::condition_variable _cond;
91 
92     // Used to break the loop. You should notify _cond after changing this to true
93     // so that shutdown proceeds promptly.
94     bool _shutdownRequested;
95 
96     // The PeriodicTasks contained in this vector are NOT owned by the
97     // PeriodicTaskRunner, and are not deleted. The vector never shrinks, removed Tasks
98     // have their entry overwritten with NULL.
99     std::vector<PeriodicTask*> _tasks;
100 };
101 
runnerMutex()102 SimpleMutex* runnerMutex() {
103     static SimpleMutex mutex;
104     return &mutex;
105 }
106 
107 // A scoped lock like object that only locks/unlocks the mutex if it exists.
108 class ConditionalScopedLock {
109 public:
ConditionalScopedLock(SimpleMutex * mutex)110     ConditionalScopedLock(SimpleMutex* mutex) : _mutex(mutex) {
111         if (_mutex)
112             _mutex->lock();
113     }
~ConditionalScopedLock()114     ~ConditionalScopedLock() {
115         if (_mutex)
116             _mutex->unlock();
117     }
118 
119 private:
120     SimpleMutex* const _mutex;
121 };
122 
123 // The unique PeriodicTaskRunner, also zero-initialized.
124 PeriodicTaskRunner* runner = nullptr;
125 
126 // The runner is never re-created once it has been destroyed.
127 bool runnerDestroyed = false;
128 
129 }  // namespace
130 
131 // both the BackgroundJob and the internal thread point to JobStatus
132 struct BackgroundJob::JobStatus {
JobStatusmongo::BackgroundJob::JobStatus133     JobStatus() : state(NotStarted) {}
134 
135     stdx::mutex mutex;
136     stdx::condition_variable done;
137     State state;
138 };
139 
BackgroundJob(bool selfDelete)140 BackgroundJob::BackgroundJob(bool selfDelete) : _selfDelete(selfDelete), _status(new JobStatus) {}
141 
~BackgroundJob()142 BackgroundJob::~BackgroundJob() {}
143 
jobBody()144 void BackgroundJob::jobBody() {
145     const string threadName = name();
146     if (!threadName.empty()) {
147         setThreadName(threadName);
148     }
149 
150     LOG(1) << "BackgroundJob starting: " << threadName;
151 
152     try {
153         run();
154     } catch (const std::exception& e) {
155         error() << "backgroundjob " << threadName << " exception: " << redact(e.what());
156         throw;
157     }
158 
159     // We must cache this value so that we can use it after we leave the following scope.
160     const bool selfDelete = _selfDelete;
161 
162     {
163         // It is illegal to access any state owned by this BackgroundJob after leaving this
164         // scope, with the exception of the call to 'delete this' below.
165         stdx::unique_lock<stdx::mutex> l(_status->mutex);
166         _status->state = Done;
167         _status->done.notify_all();
168     }
169 
170     if (selfDelete)
171         delete this;
172 }
173 
go()174 void BackgroundJob::go() {
175     stdx::unique_lock<stdx::mutex> l(_status->mutex);
176     massert(17234,
177             mongoutils::str::stream() << "backgroundJob already running: " << name(),
178             _status->state != Running);
179 
180     // If the job is already 'done', for instance because it was cancelled or already
181     // finished, ignore additional requests to run the job.
182     if (_status->state == NotStarted) {
183         stdx::thread t(stdx::bind(&BackgroundJob::jobBody, this));
184         t.detach();
185         _status->state = Running;
186     }
187 }
188 
cancel()189 Status BackgroundJob::cancel() {
190     stdx::unique_lock<stdx::mutex> l(_status->mutex);
191 
192     if (_status->state == Running)
193         return Status(ErrorCodes::IllegalOperation, "Cannot cancel a running BackgroundJob");
194 
195     if (_status->state == NotStarted) {
196         _status->state = Done;
197         _status->done.notify_all();
198     }
199 
200     return Status::OK();
201 }
202 
wait(unsigned msTimeOut)203 bool BackgroundJob::wait(unsigned msTimeOut) {
204     verify(!_selfDelete);  // you cannot call wait on a self-deleting job
205     const auto deadline = Date_t::now() + Milliseconds(msTimeOut);
206     stdx::unique_lock<stdx::mutex> l(_status->mutex);
207     while (_status->state != Done) {
208         if (msTimeOut) {
209             if (stdx::cv_status::timeout ==
210                 _status->done.wait_until(l, deadline.toSystemTimePoint()))
211                 return false;
212         } else {
213             _status->done.wait(l);
214         }
215     }
216     return true;
217 }
218 
getState() const219 BackgroundJob::State BackgroundJob::getState() const {
220     stdx::unique_lock<stdx::mutex> l(_status->mutex);
221     return _status->state;
222 }
223 
running() const224 bool BackgroundJob::running() const {
225     stdx::unique_lock<stdx::mutex> l(_status->mutex);
226     return _status->state == Running;
227 }
228 
229 // -------------------------
230 
PeriodicTask()231 PeriodicTask::PeriodicTask() {
232     ConditionalScopedLock lock(runnerMutex());
233     if (runnerDestroyed)
234         return;
235 
236     if (!runner)
237         runner = new PeriodicTaskRunner;
238 
239     runner->add(this);
240 }
241 
~PeriodicTask()242 PeriodicTask::~PeriodicTask() {
243     ConditionalScopedLock lock(runnerMutex());
244     if (runnerDestroyed || !runner)
245         return;
246 
247     runner->remove(this);
248 }
249 
startRunningPeriodicTasks()250 void PeriodicTask::startRunningPeriodicTasks() {
251     ConditionalScopedLock lock(runnerMutex());
252     if (runnerDestroyed)
253         return;
254 
255     if (!runner)
256         runner = new PeriodicTaskRunner;
257 
258     runner->go();
259 }
260 
stopRunningPeriodicTasks(int gracePeriodMillis)261 Status PeriodicTask::stopRunningPeriodicTasks(int gracePeriodMillis) {
262     ConditionalScopedLock lock(runnerMutex());
263 
264     Status status = Status::OK();
265     if (runnerDestroyed || !runner)
266         return status;
267 
268     runner->cancel().transitional_ignore();
269     status = runner->stop(gracePeriodMillis);
270 
271     if (status.isOK()) {
272         delete runner;
273         runnerDestroyed = true;
274     }
275 
276     return status;
277 }
278 
add(PeriodicTask * task)279 void PeriodicTaskRunner::add(PeriodicTask* task) {
280     stdx::lock_guard<stdx::mutex> lock(_mutex);
281     _tasks.push_back(task);
282 }
283 
remove(PeriodicTask * task)284 void PeriodicTaskRunner::remove(PeriodicTask* task) {
285     stdx::lock_guard<stdx::mutex> lock(_mutex);
286     for (size_t i = 0; i != _tasks.size(); i++) {
287         if (_tasks[i] == task) {
288             _tasks[i] = NULL;
289             break;
290         }
291     }
292 }
293 
stop(int gracePeriodMillis)294 Status PeriodicTaskRunner::stop(int gracePeriodMillis) {
295     {
296         stdx::lock_guard<stdx::mutex> lock(_mutex);
297         _shutdownRequested = true;
298         _cond.notify_one();
299     }
300 
301     if (!wait(gracePeriodMillis)) {
302         return Status(ErrorCodes::ExceededTimeLimit,
303                       "Grace period expired while waiting for PeriodicTasks to terminate");
304     }
305     return Status::OK();
306 }
307 
run()308 void PeriodicTaskRunner::run() {
309     // Use a shorter cycle time in debug mode to help catch race conditions.
310     const Seconds waitTime(kDebugBuild ? 5 : 60);
311 
312     stdx::unique_lock<stdx::mutex> lock(_mutex);
313     while (!_shutdownRequested) {
314         {
315             MONGO_IDLE_THREAD_BLOCK;
316             if (stdx::cv_status::timeout != _cond.wait_for(lock, waitTime.toSystemDuration()))
317                 continue;
318         }
319         _runTasks();
320     }
321 }
322 
_isShutdownRequested() const323 bool PeriodicTaskRunner::_isShutdownRequested() const {
324     return _shutdownRequested;
325 }
326 
_runTasks()327 void PeriodicTaskRunner::_runTasks() {
328     const size_t size = _tasks.size();
329     for (size_t i = 0; i != size; ++i)
330         if (PeriodicTask* const task = _tasks[i])
331             _runTask(task);
332 }
333 
_runTask(PeriodicTask * const task)334 void PeriodicTaskRunner::_runTask(PeriodicTask* const task) {
335     Timer timer;
336 
337     const std::string taskName = task->taskName();
338 
339     try {
340         task->taskDoWork();
341     } catch (const std::exception& e) {
342         error() << "task: " << taskName << " failed: " << redact(e.what());
343     } catch (...) {
344         error() << "task: " << taskName << " failed with unknown error";
345     }
346 
347     const int ms = timer.millis();
348     const int kMinLogMs = 100;
349     LOG(ms <= kMinLogMs ? 3 : 0) << "task: " << taskName << " took: " << ms << "ms";
350 }
351 
352 }  // namespace mongo
353