1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/db/commands.h"
36 #include "mongo/db/concurrency/d_concurrency.h"
37 #include "mongo/db/concurrency/write_conflict_exception.h"
38 #include "mongo/db/curop.h"
39 #include "mongo/db/namespace_string.h"
40 #include "mongo/db/op_observer.h"
41 #include "mongo/db/repl/noop_writer.h"
42 #include "mongo/db/repl/oplog.h"
43 #include "mongo/db/server_parameters.h"
44 #include "mongo/stdx/functional.h"
45 #include "mongo/util/concurrency/idle_thread_block.h"
46 #include "mongo/util/log.h"
47 
48 namespace mongo {
49 namespace repl {
50 
51 namespace {
52 
53 MONGO_EXPORT_SERVER_PARAMETER(writePeriodicNoops, bool, true);
54 
55 const auto kMsgObj = BSON("msg"
56                           << "periodic noop");
57 
58 }  // namespace
59 
60 
61 /**
62  *  Runs the noopWrite argument with waitTime period until its destroyed.
63  */
64 class NoopWriter::PeriodicNoopRunner {
65     MONGO_DISALLOW_COPYING(PeriodicNoopRunner);
66 
67     using NoopWriteFn = stdx::function<void(OperationContext*)>;
68 
69 public:
PeriodicNoopRunner(Seconds waitTime,NoopWriteFn noopWrite)70     PeriodicNoopRunner(Seconds waitTime, NoopWriteFn noopWrite)
71         : _thread([this, noopWrite, waitTime] { run(waitTime, std::move(noopWrite)); }) {}
72 
~PeriodicNoopRunner()73     ~PeriodicNoopRunner() {
74         stdx::unique_lock<stdx::mutex> lk(_mutex);
75         _inShutdown = true;
76         _cv.notify_all();
77         lk.unlock();
78         _thread.join();
79     }
80 
81 private:
run(Seconds waitTime,NoopWriteFn noopWrite)82     void run(Seconds waitTime, NoopWriteFn noopWrite) {
83         Client::initThread("NoopWriter");
84         while (true) {
85             const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
86             OperationContext& opCtx = *opCtxPtr;
87             {
88                 stdx::unique_lock<stdx::mutex> lk(_mutex);
89                 MONGO_IDLE_THREAD_BLOCK;
90                 _cv.wait_for(lk, waitTime.toSystemDuration(), [&] { return _inShutdown; });
91 
92                 if (_inShutdown)
93                     return;
94             }
95             noopWrite(&opCtx);
96         }
97     }
98 
99     /**
100      *  Indicator that thread is shutting down.
101      */
102     bool _inShutdown{false};
103 
104     /**
105      *  Mutex for the CV
106      */
107     stdx::mutex _mutex;
108 
109     /**
110      * CV to wait for.
111      */
112     stdx::condition_variable _cv;
113 
114     /**
115      * Thread that runs the tasks. Must be last so all other members are initialized before
116      * starting.
117      */
118     stdx::thread _thread;
119 };
120 
NoopWriter(Seconds writeInterval)121 NoopWriter::NoopWriter(Seconds writeInterval) : _writeInterval(writeInterval) {
122     uassert(ErrorCodes::BadValue, "write interval must be positive", writeInterval > Seconds(0));
123 }
124 
~NoopWriter()125 NoopWriter::~NoopWriter() {
126     stopWritingPeriodicNoops();
127 }
128 
startWritingPeriodicNoops(OpTime lastKnownOpTime)129 Status NoopWriter::startWritingPeriodicNoops(OpTime lastKnownOpTime) {
130     stdx::lock_guard<stdx::mutex> lk(_mutex);
131     _lastKnownOpTime = lastKnownOpTime;
132 
133     invariant(!_noopRunner);
134     _noopRunner = stdx::make_unique<PeriodicNoopRunner>(
135         _writeInterval, [this](OperationContext* opCtx) { _writeNoop(opCtx); });
136     return Status::OK();
137 }
138 
stopWritingPeriodicNoops()139 void NoopWriter::stopWritingPeriodicNoops() {
140     stdx::lock_guard<stdx::mutex> lk(_mutex);
141     _noopRunner.reset();
142 }
143 
_writeNoop(OperationContext * opCtx)144 void NoopWriter::_writeNoop(OperationContext* opCtx) {
145     // Use GlobalLock + lockMMAPV1Flush instead of DBLock to allow return when the lock is not
146     // available. It may happen when the primary steps down and a shared global lock is acquired.
147     Lock::GlobalLock lock(opCtx, MODE_IX, 1);
148     if (!lock.isLocked()) {
149         LOG(1) << "Global lock is not available skipping noopWrite";
150         return;
151     }
152     opCtx->lockState()->lockMMAPV1Flush();
153 
154     auto replCoord = ReplicationCoordinator::get(opCtx);
155     // Its a proxy for being a primary
156     if (!replCoord->canAcceptWritesForDatabase(opCtx, "admin")) {
157         LOG(1) << "Not a primary, skipping the noop write";
158         return;
159     }
160 
161     auto lastAppliedOpTime = replCoord->getMyLastAppliedOpTime();
162 
163     // _lastKnownOpTime is not protected by lock as its used only by one thread.
164     if (lastAppliedOpTime != _lastKnownOpTime) {
165         LOG(1) << "Not scheduling a noop write. Last known OpTime: " << _lastKnownOpTime
166                << " != last primary OpTime: " << lastAppliedOpTime;
167     } else {
168         if (writePeriodicNoops.load()) {
169             const auto logLevel = Command::testCommandsEnabled ? 0 : 1;
170             LOG(logLevel)
171                 << "Writing noop to oplog as there has been no writes to this replica set in over "
172                 << _writeInterval;
173             writeConflictRetry(
174                 opCtx, "writeNoop", NamespaceString::kRsOplogNamespace.ns(), [&opCtx] {
175                     WriteUnitOfWork uow(opCtx);
176                     opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage(opCtx,
177                                                                                           kMsgObj);
178                     uow.commit();
179                 });
180         }
181     }
182 
183     _lastKnownOpTime = replCoord->getMyLastAppliedOpTime();
184     LOG(1) << "Set last known op time to " << _lastKnownOpTime;
185 }
186 
187 }  // namespace repl
188 }  // namespace mongo
189