1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/db/commands.h"
36 #include "mongo/db/concurrency/d_concurrency.h"
37 #include "mongo/db/concurrency/write_conflict_exception.h"
38 #include "mongo/db/curop.h"
39 #include "mongo/db/namespace_string.h"
40 #include "mongo/db/op_observer.h"
41 #include "mongo/db/repl/noop_writer.h"
42 #include "mongo/db/repl/oplog.h"
43 #include "mongo/db/server_parameters.h"
44 #include "mongo/stdx/functional.h"
45 #include "mongo/util/concurrency/idle_thread_block.h"
46 #include "mongo/util/log.h"
47
48 namespace mongo {
49 namespace repl {
50
51 namespace {
52
53 MONGO_EXPORT_SERVER_PARAMETER(writePeriodicNoops, bool, true);
54
55 const auto kMsgObj = BSON("msg"
56 << "periodic noop");
57
58 } // namespace
59
60
61 /**
62 * Runs the noopWrite argument with waitTime period until its destroyed.
63 */
64 class NoopWriter::PeriodicNoopRunner {
65 MONGO_DISALLOW_COPYING(PeriodicNoopRunner);
66
67 using NoopWriteFn = stdx::function<void(OperationContext*)>;
68
69 public:
PeriodicNoopRunner(Seconds waitTime,NoopWriteFn noopWrite)70 PeriodicNoopRunner(Seconds waitTime, NoopWriteFn noopWrite)
71 : _thread([this, noopWrite, waitTime] { run(waitTime, std::move(noopWrite)); }) {}
72
~PeriodicNoopRunner()73 ~PeriodicNoopRunner() {
74 stdx::unique_lock<stdx::mutex> lk(_mutex);
75 _inShutdown = true;
76 _cv.notify_all();
77 lk.unlock();
78 _thread.join();
79 }
80
81 private:
run(Seconds waitTime,NoopWriteFn noopWrite)82 void run(Seconds waitTime, NoopWriteFn noopWrite) {
83 Client::initThread("NoopWriter");
84 while (true) {
85 const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
86 OperationContext& opCtx = *opCtxPtr;
87 {
88 stdx::unique_lock<stdx::mutex> lk(_mutex);
89 MONGO_IDLE_THREAD_BLOCK;
90 _cv.wait_for(lk, waitTime.toSystemDuration(), [&] { return _inShutdown; });
91
92 if (_inShutdown)
93 return;
94 }
95 noopWrite(&opCtx);
96 }
97 }
98
99 /**
100 * Indicator that thread is shutting down.
101 */
102 bool _inShutdown{false};
103
104 /**
105 * Mutex for the CV
106 */
107 stdx::mutex _mutex;
108
109 /**
110 * CV to wait for.
111 */
112 stdx::condition_variable _cv;
113
114 /**
115 * Thread that runs the tasks. Must be last so all other members are initialized before
116 * starting.
117 */
118 stdx::thread _thread;
119 };
120
NoopWriter(Seconds writeInterval)121 NoopWriter::NoopWriter(Seconds writeInterval) : _writeInterval(writeInterval) {
122 uassert(ErrorCodes::BadValue, "write interval must be positive", writeInterval > Seconds(0));
123 }
124
~NoopWriter()125 NoopWriter::~NoopWriter() {
126 stopWritingPeriodicNoops();
127 }
128
startWritingPeriodicNoops(OpTime lastKnownOpTime)129 Status NoopWriter::startWritingPeriodicNoops(OpTime lastKnownOpTime) {
130 stdx::lock_guard<stdx::mutex> lk(_mutex);
131 _lastKnownOpTime = lastKnownOpTime;
132
133 invariant(!_noopRunner);
134 _noopRunner = stdx::make_unique<PeriodicNoopRunner>(
135 _writeInterval, [this](OperationContext* opCtx) { _writeNoop(opCtx); });
136 return Status::OK();
137 }
138
stopWritingPeriodicNoops()139 void NoopWriter::stopWritingPeriodicNoops() {
140 stdx::lock_guard<stdx::mutex> lk(_mutex);
141 _noopRunner.reset();
142 }
143
_writeNoop(OperationContext * opCtx)144 void NoopWriter::_writeNoop(OperationContext* opCtx) {
145 // Use GlobalLock + lockMMAPV1Flush instead of DBLock to allow return when the lock is not
146 // available. It may happen when the primary steps down and a shared global lock is acquired.
147 Lock::GlobalLock lock(opCtx, MODE_IX, 1);
148 if (!lock.isLocked()) {
149 LOG(1) << "Global lock is not available skipping noopWrite";
150 return;
151 }
152 opCtx->lockState()->lockMMAPV1Flush();
153
154 auto replCoord = ReplicationCoordinator::get(opCtx);
155 // Its a proxy for being a primary
156 if (!replCoord->canAcceptWritesForDatabase(opCtx, "admin")) {
157 LOG(1) << "Not a primary, skipping the noop write";
158 return;
159 }
160
161 auto lastAppliedOpTime = replCoord->getMyLastAppliedOpTime();
162
163 // _lastKnownOpTime is not protected by lock as its used only by one thread.
164 if (lastAppliedOpTime != _lastKnownOpTime) {
165 LOG(1) << "Not scheduling a noop write. Last known OpTime: " << _lastKnownOpTime
166 << " != last primary OpTime: " << lastAppliedOpTime;
167 } else {
168 if (writePeriodicNoops.load()) {
169 const auto logLevel = Command::testCommandsEnabled ? 0 : 1;
170 LOG(logLevel)
171 << "Writing noop to oplog as there has been no writes to this replica set in over "
172 << _writeInterval;
173 writeConflictRetry(
174 opCtx, "writeNoop", NamespaceString::kRsOplogNamespace.ns(), [&opCtx] {
175 WriteUnitOfWork uow(opCtx);
176 opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage(opCtx,
177 kMsgObj);
178 uow.commit();
179 });
180 }
181 }
182
183 _lastKnownOpTime = replCoord->getMyLastAppliedOpTime();
184 LOG(1) << "Set last known op time to " << _lastKnownOpTime;
185 }
186
187 } // namespace repl
188 } // namespace mongo
189