1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/db/repl/replication_coordinator_impl.h"
36 
37 #include <algorithm>
38 #include <limits>
39 
40 #include "mongo/base/status.h"
41 #include "mongo/client/fetcher.h"
42 #include "mongo/db/audit.h"
43 #include "mongo/db/client.h"
44 #include "mongo/db/commands.h"
45 #include "mongo/db/concurrency/d_concurrency.h"
46 #include "mongo/db/index/index_descriptor.h"
47 #include "mongo/db/logical_clock.h"
48 #include "mongo/db/logical_time.h"
49 #include "mongo/db/logical_time_validator.h"
50 #include "mongo/db/operation_context_noop.h"
51 #include "mongo/db/repl/always_allow_non_local_writes.h"
52 #include "mongo/db/repl/check_quorum_for_config_change.h"
53 #include "mongo/db/repl/data_replicator_external_state_initial_sync.h"
54 #include "mongo/db/repl/elect_cmd_runner.h"
55 #include "mongo/db/repl/freshness_checker.h"
56 #include "mongo/db/repl/handshake_args.h"
57 #include "mongo/db/repl/is_master_response.h"
58 #include "mongo/db/repl/last_vote.h"
59 #include "mongo/db/repl/member_data.h"
60 #include "mongo/db/repl/read_concern_args.h"
61 #include "mongo/db/repl/repl_client_info.h"
62 #include "mongo/db/repl/repl_set_config_checks.h"
63 #include "mongo/db/repl/repl_set_heartbeat_args.h"
64 #include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
65 #include "mongo/db/repl/repl_set_heartbeat_response.h"
66 #include "mongo/db/repl/repl_set_html_summary.h"
67 #include "mongo/db/repl/repl_set_request_votes_args.h"
68 #include "mongo/db/repl/repl_settings.h"
69 #include "mongo/db/repl/replication_process.h"
70 #include "mongo/db/repl/rslog.h"
71 #include "mongo/db/repl/storage_interface.h"
72 #include "mongo/db/repl/topology_coordinator.h"
73 #include "mongo/db/repl/update_position_args.h"
74 #include "mongo/db/repl/vote_requester.h"
75 #include "mongo/db/server_options.h"
76 #include "mongo/db/server_parameters.h"
77 #include "mongo/db/write_concern.h"
78 #include "mongo/db/write_concern_options.h"
79 #include "mongo/executor/connection_pool_stats.h"
80 #include "mongo/executor/network_interface.h"
81 #include "mongo/rpc/metadata/oplog_query_metadata.h"
82 #include "mongo/rpc/metadata/repl_set_metadata.h"
83 #include "mongo/stdx/functional.h"
84 #include "mongo/stdx/mutex.h"
85 #include "mongo/util/assert_util.h"
86 #include "mongo/util/fail_point_service.h"
87 #include "mongo/util/log.h"
88 #include "mongo/util/scopeguard.h"
89 #include "mongo/util/stacktrace.h"
90 #include "mongo/util/time_support.h"
91 #include "mongo/util/timer.h"
92 
93 namespace mongo {
94 namespace repl {
95 
96 MONGO_FP_DECLARE(stepdownHangBeforePerformingPostMemberStateUpdateActions);
97 MONGO_FP_DECLARE(transitionToPrimaryHangBeforeTakingGlobalExclusiveLock);
98 
99 using CallbackArgs = executor::TaskExecutor::CallbackArgs;
100 using CallbackFn = executor::TaskExecutor::CallbackFn;
101 using CallbackHandle = executor::TaskExecutor::CallbackHandle;
102 using EventHandle = executor::TaskExecutor::EventHandle;
103 using executor::NetworkInterface;
104 using NextAction = Fetcher::NextAction;
105 
106 namespace {
107 
108 const char kLocalDB[] = "local";
109 
110 MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncAttempts, int, 10);
111 
112 MONGO_EXPORT_SERVER_PARAMETER(enableElectionHandoff, bool, true);
113 
114 // Number of seconds between noop writer writes.
115 MONGO_EXPORT_STARTUP_SERVER_PARAMETER(periodicNoopIntervalSecs, int, 10);
116 
MONGO_INITIALIZER(periodicNoopIntervalSecs)117 MONGO_INITIALIZER(periodicNoopIntervalSecs)(InitializerContext*) {
118     if (periodicNoopIntervalSecs <= 0) {
119         return Status(ErrorCodes::BadValue,
120                       str::stream() << "Periodic noop interval must be greater than 0 seconds: "
121                                     << periodicNoopIntervalSecs);
122     } else if (periodicNoopIntervalSecs > 10) {
123         return Status(ErrorCodes::BadValue,
124                       str::stream()
125                           << "Periodic noop interval must be less than or equal to 10 seconds: "
126                           << periodicNoopIntervalSecs);
127     }
128     return Status::OK();
129 }
130 
lockAndCall(stdx::unique_lock<stdx::mutex> * lk,const stdx::function<void ()> & fn)131 void lockAndCall(stdx::unique_lock<stdx::mutex>* lk, const stdx::function<void()>& fn) {
132     if (!lk->owns_lock()) {
133         lk->lock();
134     }
135     fn();
136 }
137 
138 /**
139  * Implements the force-reconfig behavior of incrementing config version by a large random
140  * number.
141  */
incrementConfigVersionByRandom(BSONObj config)142 BSONObj incrementConfigVersionByRandom(BSONObj config) {
143     BSONObjBuilder builder;
144     for (BSONObjIterator iter(config); iter.more(); iter.next()) {
145         BSONElement elem = *iter;
146         if (elem.fieldNameStringData() == ReplSetConfig::kVersionFieldName && elem.isNumber()) {
147             std::unique_ptr<SecureRandom> generator(SecureRandom::create());
148             const int random = std::abs(static_cast<int>(generator->nextInt64()) % 100000);
149             builder.appendIntOrLL(ReplSetConfig::kVersionFieldName,
150                                   elem.numberLong() + 10000 + random);
151         } else {
152             builder.append(elem);
153         }
154     }
155     return builder.obj();
156 }
157 
158 // This is a special flag that allows for testing of snapshot behavior by skipping the replication
159 // related checks and isolating the storage/query side of snapshotting.
160 // SERVER-31304 rename this parameter to something more appropriate.
161 bool testingSnapshotBehaviorInIsolation = false;
162 ExportedServerParameter<bool, ServerParameterType::kStartupOnly> TestingSnapshotBehaviorInIsolation(
163     ServerParameterSet::getGlobal(),
164     "testingSnapshotBehaviorInIsolation",
165     &testingSnapshotBehaviorInIsolation);
166 
167 }  // namespace
168 
Waiter(OpTime _opTime,const WriteConcernOptions * _writeConcern)169 ReplicationCoordinatorImpl::Waiter::Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern)
170     : opTime(std::move(_opTime)), writeConcern(_writeConcern) {}
171 
toBSON() const172 BSONObj ReplicationCoordinatorImpl::Waiter::toBSON() const {
173     BSONObjBuilder bob;
174     bob.append("opTime", opTime.toBSON());
175     if (writeConcern) {
176         bob.append("writeConcern", writeConcern->toBSON());
177     }
178     return bob.obj();
179 };
180 
toString() const181 std::string ReplicationCoordinatorImpl::Waiter::toString() const {
182     return toBSON().toString();
183 };
184 
185 
ThreadWaiter(OpTime _opTime,const WriteConcernOptions * _writeConcern,stdx::condition_variable * _condVar)186 ReplicationCoordinatorImpl::ThreadWaiter::ThreadWaiter(OpTime _opTime,
187                                                        const WriteConcernOptions* _writeConcern,
188                                                        stdx::condition_variable* _condVar)
189     : Waiter(_opTime, _writeConcern), condVar(_condVar) {}
190 
notify_inlock()191 void ReplicationCoordinatorImpl::ThreadWaiter::notify_inlock() {
192     invariant(condVar);
193     condVar->notify_all();
194 }
195 
CallbackWaiter(OpTime _opTime,FinishFunc _finishCallback)196 ReplicationCoordinatorImpl::CallbackWaiter::CallbackWaiter(OpTime _opTime,
197                                                            FinishFunc _finishCallback)
198     : Waiter(_opTime, nullptr), finishCallback(std::move(_finishCallback)) {}
199 
notify_inlock()200 void ReplicationCoordinatorImpl::CallbackWaiter::notify_inlock() {
201     invariant(finishCallback);
202     finishCallback();
203 }
204 
205 
206 class ReplicationCoordinatorImpl::WaiterGuard {
207 public:
208     /**
209      * Constructor takes the list of waiters and enqueues itself on the list, removing itself
210      * in the destructor.
211      *
212      * Usually waiters will be signaled and removed when their criteria are satisfied, but
213      * wait_until() with timeout may signal waiters earlier and this guard will remove the waiter
214      * properly.
215      *
216      * _list is guarded by ReplicationCoordinatorImpl::_mutex, thus it is illegal to construct one
217      * of these without holding _mutex
218      */
WaiterGuard(WaiterList * list,Waiter * waiter)219     WaiterGuard(WaiterList* list, Waiter* waiter) : _list(list), _waiter(waiter) {
220         list->add_inlock(_waiter);
221     }
222 
~WaiterGuard()223     ~WaiterGuard() {
224         _list->remove_inlock(_waiter);
225     }
226 
227 private:
228     WaiterList* _list;
229     Waiter* _waiter;
230 };
231 
add_inlock(WaiterType waiter)232 void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) {
233     _list.push_back(waiter);
234 }
235 
signalAndRemoveIf_inlock(stdx::function<bool (WaiterType)> func)236 void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveIf_inlock(
237     stdx::function<bool(WaiterType)> func) {
238     // Only advance iterator when the element doesn't match.
239     for (auto it = _list.begin(); it != _list.end();) {
240         if (!func(*it)) {
241             ++it;
242             continue;
243         }
244 
245         WaiterType waiter = std::move(*it);
246         if (it == std::prev(_list.end())) {
247             // Iterator will be invalid after erasing the last element, so set it to the
248             // next one (i.e. end()).
249             it = _list.erase(it);
250         } else {
251             // Iterator is still valid after pop_back().
252             std::swap(*it, _list.back());
253             _list.pop_back();
254         }
255 
256         // It's important to call notify() after the waiter has been removed from the list
257         // since notify() might remove the waiter itself.
258         waiter->notify_inlock();
259     }
260 }
261 
signalAndRemoveAll_inlock()262 void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveAll_inlock() {
263     std::vector<WaiterType> list = std::move(_list);
264     // Call notify() after removing the waiters from the list.
265     for (auto& waiter : list) {
266         waiter->notify_inlock();
267     }
268 }
269 
remove_inlock(WaiterType waiter)270 bool ReplicationCoordinatorImpl::WaiterList::remove_inlock(WaiterType waiter) {
271     auto it = std::find(_list.begin(), _list.end(), waiter);
272     if (it == _list.end()) {
273         return false;
274     }
275     std::swap(*it, _list.back());
276     _list.pop_back();
277     return true;
278 }
279 
280 namespace {
getReplicationModeFromSettings(const ReplSettings & settings)281 ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings& settings) {
282     if (settings.usingReplSets()) {
283         return ReplicationCoordinator::modeReplSet;
284     }
285     if (settings.isMaster() || settings.isSlave()) {
286         return ReplicationCoordinator::modeMasterSlave;
287     }
288     return ReplicationCoordinator::modeNone;
289 }
290 
createInitialSyncerOptions(ReplicationCoordinator * replCoord,ReplicationCoordinatorExternalState * externalState)291 InitialSyncerOptions createInitialSyncerOptions(
292     ReplicationCoordinator* replCoord, ReplicationCoordinatorExternalState* externalState) {
293     InitialSyncerOptions options;
294     options.getMyLastOptime = [replCoord]() { return replCoord->getMyLastAppliedOpTime(); };
295     options.setMyLastOptime = [replCoord, externalState](
296         const OpTime& opTime, ReplicationCoordinator::DataConsistency consistency) {
297         replCoord->setMyLastAppliedOpTimeForward(opTime, consistency);
298         externalState->setGlobalTimestamp(replCoord->getServiceContext(), opTime.getTimestamp());
299     };
300     options.resetOptimes = [replCoord]() { replCoord->resetMyLastOpTimes(); };
301     options.getSlaveDelay = [replCoord]() { return replCoord->getSlaveDelaySecs(); };
302     options.syncSourceSelector = replCoord;
303     options.batchLimits = externalState->getInitialSyncBatchLimits();
304     options.oplogFetcherMaxFetcherRestarts =
305         externalState->getOplogFetcherInitialSyncMaxFetcherRestarts();
306     return options;
307 }
308 }  // namespace
309 
ReplicationCoordinatorImpl(ServiceContext * service,const ReplSettings & settings,std::unique_ptr<ReplicationCoordinatorExternalState> externalState,std::unique_ptr<executor::TaskExecutor> executor,std::unique_ptr<TopologyCoordinator> topCoord,ReplicationProcess * replicationProcess,StorageInterface * storage,int64_t prngSeed)310 ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
311     ServiceContext* service,
312     const ReplSettings& settings,
313     std::unique_ptr<ReplicationCoordinatorExternalState> externalState,
314     std::unique_ptr<executor::TaskExecutor> executor,
315     std::unique_ptr<TopologyCoordinator> topCoord,
316     ReplicationProcess* replicationProcess,
317     StorageInterface* storage,
318     int64_t prngSeed)
319     : _service(service),
320       _settings(settings),
321       _replMode(getReplicationModeFromSettings(settings)),
322       _topCoord(std::move(topCoord)),
323       _replExecutor(std::move(executor)),
324       _externalState(std::move(externalState)),
325       _inShutdown(false),
326       _memberState(MemberState::RS_STARTUP),
327       _rsConfigState(kConfigPreStart),
328       _selfIndex(-1),
329       _sleptLastElection(false),
330       _canAcceptNonLocalWrites(!(settings.usingReplSets() || settings.isSlave())),
331       _canServeNonLocalReads(0U),
332       _replicationProcess(replicationProcess),
333       _storage(storage),
334       _random(prngSeed) {
335 
336     _termShadow.store(OpTime::kUninitializedTerm);
337 
338     invariant(_service);
339 
340     if (!isReplEnabled()) {
341         return;
342     }
343 
344     _externalState->setupNoopWriter(Seconds(periodicNoopIntervalSecs));
345 }
346 
347 ReplicationCoordinatorImpl::~ReplicationCoordinatorImpl() = default;
348 
waitForStartUpComplete_forTest()349 void ReplicationCoordinatorImpl::waitForStartUpComplete_forTest() {
350     _waitForStartUpComplete();
351 }
352 
_waitForStartUpComplete()353 void ReplicationCoordinatorImpl::_waitForStartUpComplete() {
354     CallbackHandle handle;
355     {
356         stdx::unique_lock<stdx::mutex> lk(_mutex);
357         while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
358             _rsConfigStateChange.wait(lk);
359         }
360         handle = _finishLoadLocalConfigCbh;
361     }
362     if (handle.isValid()) {
363         _replExecutor->wait(handle);
364     }
365 }
366 
getReplicaSetConfig_forTest()367 ReplSetConfig ReplicationCoordinatorImpl::getReplicaSetConfig_forTest() {
368     stdx::lock_guard<stdx::mutex> lk(_mutex);
369     return _rsConfig;
370 }
371 
getElectionTimeout_forTest() const372 Date_t ReplicationCoordinatorImpl::getElectionTimeout_forTest() const {
373     stdx::lock_guard<stdx::mutex> lk(_mutex);
374     if (!_handleElectionTimeoutCbh.isValid()) {
375         return Date_t();
376     }
377     return _handleElectionTimeoutWhen;
378 }
379 
getRandomizedElectionOffset_forTest()380 Milliseconds ReplicationCoordinatorImpl::getRandomizedElectionOffset_forTest() {
381     stdx::lock_guard<stdx::mutex> lk(_mutex);
382     return _getRandomizedElectionOffset_inlock();
383 }
384 
getPriorityTakeover_forTest() const385 boost::optional<Date_t> ReplicationCoordinatorImpl::getPriorityTakeover_forTest() const {
386     stdx::lock_guard<stdx::mutex> lk(_mutex);
387     if (!_priorityTakeoverCbh.isValid()) {
388         return boost::none;
389     }
390     return _priorityTakeoverWhen;
391 }
392 
getCatchupTakeover_forTest() const393 boost::optional<Date_t> ReplicationCoordinatorImpl::getCatchupTakeover_forTest() const {
394     stdx::lock_guard<stdx::mutex> lk(_mutex);
395     if (!_catchupTakeoverCbh.isValid()) {
396         return boost::none;
397     }
398     return _catchupTakeoverWhen;
399 }
400 
getCatchupTakeoverCbh_forTest() const401 executor::TaskExecutor::CallbackHandle ReplicationCoordinatorImpl::getCatchupTakeoverCbh_forTest()
402     const {
403     return _catchupTakeoverCbh;
404 }
405 
getCurrentCommittedSnapshotOpTime() const406 OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() const {
407     stdx::lock_guard<stdx::mutex> lk(_mutex);
408     return _getCurrentCommittedSnapshotOpTime_inlock();
409 }
410 
_getCurrentCommittedSnapshotOpTime_inlock() const411 OpTime ReplicationCoordinatorImpl::_getCurrentCommittedSnapshotOpTime_inlock() const {
412     if (_currentCommittedSnapshot) {
413         return _currentCommittedSnapshot.get();
414     }
415     return OpTime();
416 }
417 
_getCurrentCommittedLogicalTime_inlock() const418 LogicalTime ReplicationCoordinatorImpl::_getCurrentCommittedLogicalTime_inlock() const {
419     return LogicalTime(_getCurrentCommittedSnapshotOpTime_inlock().getTimestamp());
420 }
421 
appendDiagnosticBSON(mongo::BSONObjBuilder * bob)422 void ReplicationCoordinatorImpl::appendDiagnosticBSON(mongo::BSONObjBuilder* bob) {
423     BSONObjBuilder eBuilder(bob->subobjStart("executor"));
424     _replExecutor->appendDiagnosticBSON(&eBuilder);
425 }
426 
appendConnectionStats(executor::ConnectionPoolStats * stats) const427 void ReplicationCoordinatorImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) const {
428     _replExecutor->appendConnectionStats(stats);
429 }
430 
_startLoadLocalConfig(OperationContext * opCtx)431 bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx) {
432     _replicationProcess->getConsistencyMarkers()->initializeMinValidDocument(opCtx);
433 
434     fassert(51240, _externalState->createLocalLastVoteCollection(opCtx));
435 
436     StatusWith<LastVote> lastVote = _externalState->loadLocalLastVoteDocument(opCtx);
437     if (!lastVote.isOK()) {
438         severe() << "Error loading local voted for document at startup; " << lastVote.getStatus();
439         fassertFailedNoTrace(40367);
440     }
441     if (lastVote.getValue().getTerm() == OpTime::kInitialTerm) {
442         // This log line is checked in unit tests.
443         log() << "Did not find local initialized voted for document at startup.";
444     }
445     {
446         stdx::lock_guard<stdx::mutex> lk(_mutex);
447         _topCoord->loadLastVote(lastVote.getValue());
448     }
449 
450     // Check that we have a local Rollback ID. If we do not have one, create one.
451     auto status = _replicationProcess->refreshRollbackID(opCtx);
452     if (!status.isOK()) {
453         if (status == ErrorCodes::NamespaceNotFound) {
454             log() << "Did not find local Rollback ID document at startup. Creating one.";
455             auto initializingStatus = _replicationProcess->initializeRollbackID(opCtx);
456             fassertStatusOK(40424, initializingStatus);
457         } else {
458             severe() << "Error loading local Rollback ID document at startup; " << status;
459             fassertFailedNoTrace(40428);
460         }
461     }
462 
463     StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(opCtx);
464     if (!cfg.isOK()) {
465         log() << "Did not find local replica set configuration document at startup;  "
466               << cfg.getStatus();
467         return true;
468     }
469     ReplSetConfig localConfig;
470     status = localConfig.initialize(cfg.getValue());
471     if (!status.isOK()) {
472         error() << "Locally stored replica set configuration does not parse; See "
473                    "http://www.mongodb.org/dochub/core/recover-replica-set-from-invalid-config "
474                    "for information on how to recover from this. Got \""
475                 << status << "\" while parsing " << cfg.getValue();
476         fassertFailedNoTrace(28545);
477     }
478 
479     // Read the last op from the oplog after cleaning up any partially applied batches.
480     _replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx);
481     auto lastOpTimeStatus = _externalState->loadLastOpTime(opCtx);
482 
483     // Use a callback here, because _finishLoadLocalConfig calls isself() which requires
484     // that the server's networking layer be up and running and accepting connections, which
485     // doesn't happen until startReplication finishes.
486     auto handle =
487         _replExecutor->scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_finishLoadLocalConfig,
488                                                this,
489                                                stdx::placeholders::_1,
490                                                localConfig,
491                                                lastOpTimeStatus,
492                                                lastVote));
493     if (handle == ErrorCodes::ShutdownInProgress) {
494         handle = CallbackHandle{};
495     }
496     fassertStatusOK(40446, handle);
497     stdx::lock_guard<stdx::mutex> lk(_mutex);
498     _finishLoadLocalConfigCbh = std::move(handle.getValue());
499 
500     return false;
501 }
502 
_finishLoadLocalConfig(const executor::TaskExecutor::CallbackArgs & cbData,const ReplSetConfig & localConfig,const StatusWith<OpTime> & lastOpTimeStatus,const StatusWith<LastVote> & lastVoteStatus)503 void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
504     const executor::TaskExecutor::CallbackArgs& cbData,
505     const ReplSetConfig& localConfig,
506     const StatusWith<OpTime>& lastOpTimeStatus,
507     const StatusWith<LastVote>& lastVoteStatus) {
508     if (!cbData.status.isOK()) {
509         LOG(1) << "Loading local replica set configuration failed due to " << cbData.status;
510         return;
511     }
512 
513     StatusWith<int> myIndex =
514         validateConfigForStartUp(_externalState.get(), localConfig, getServiceContext());
515     if (!myIndex.isOK()) {
516         if (myIndex.getStatus() == ErrorCodes::NodeNotFound ||
517             myIndex.getStatus() == ErrorCodes::DuplicateKey) {
518             warning() << "Locally stored replica set configuration does not have a valid entry "
519                          "for the current node; waiting for reconfig or remote heartbeat; Got \""
520                       << myIndex.getStatus() << "\" while validating " << localConfig.toBSON();
521             myIndex = StatusWith<int>(-1);
522         } else {
523             error() << "Locally stored replica set configuration is invalid; See "
524                        "http://www.mongodb.org/dochub/core/recover-replica-set-from-invalid-config"
525                        " for information on how to recover from this. Got \""
526                     << myIndex.getStatus() << "\" while validating " << localConfig.toBSON();
527             fassertFailedNoTrace(28544);
528         }
529     }
530 
531     if (localConfig.getReplSetName() != _settings.ourSetName()) {
532         warning() << "Local replica set configuration document reports set name of "
533                   << localConfig.getReplSetName() << ", but command line reports "
534                   << _settings.ourSetName() << "; waiting for reconfig or remote heartbeat";
535         myIndex = StatusWith<int>(-1);
536     }
537 
538     if (serverGlobalParams.enableMajorityReadConcern && localConfig.getNumMembers() == 3 &&
539         localConfig.getNumDataBearingMembers() == 2) {
540         log() << startupWarningsLog;
541         log() << "** WARNING: This replica set has a Primary-Secondary-Arbiter architecture, but "
542                  "readConcern:majority is enabled "
543               << startupWarningsLog;
544         log() << "**          for this node. This is not a recommended configuration. Please see "
545               << startupWarningsLog;
546         log() << "**          https://dochub.mongodb.org/core/psa-disable-rc-majority-3.6"
547               << startupWarningsLog;
548         log() << startupWarningsLog;
549     }
550 
551     // Do not check optime, if this node is an arbiter.
552     bool isArbiter =
553         myIndex.getValue() != -1 && localConfig.getMemberAt(myIndex.getValue()).isArbiter();
554     OpTime lastOpTime;
555     if (!isArbiter) {
556         if (!lastOpTimeStatus.isOK()) {
557             warning() << "Failed to load timestamp of most recently applied operation: "
558                       << lastOpTimeStatus.getStatus();
559         } else {
560             lastOpTime = lastOpTimeStatus.getValue();
561         }
562     } else {
563         // The node is an arbiter hence will not need logical clock for external operations.
564         LogicalClock::get(getServiceContext())->setEnabled(false);
565         auto validator = LogicalTimeValidator::get(getServiceContext());
566         if (validator) {
567             validator->stopKeyManager();
568         }
569     }
570 
571     long long term = OpTime::kUninitializedTerm;
572     if (localConfig.getProtocolVersion() == 1) {
573         // Restore the current term according to the terms of last oplog entry and last vote.
574         // The initial term of OpTime() is 0.
575         term = lastOpTime.getTerm();
576         if (lastVoteStatus.isOK()) {
577             long long lastVoteTerm = lastVoteStatus.getValue().getTerm();
578             if (term < lastVoteTerm) {
579                 term = lastVoteTerm;
580             }
581         }
582     }
583 
584     auto opCtx = cc().makeOperationContext();
585     auto consistency = DataConsistency::Inconsistent;
586     if (!lastOpTime.isNull()) {
587 
588         // If we have an oplog, it is still possible that our data is not in a consistent state. For
589         // example, if we are starting up after a crash following a post-rollback RECOVERING state.
590         // To detect this, we see if our last optime is >= the 'minValid' optime, which
591         // should be persistent across node crashes.
592         OpTime minValid = _replicationProcess->getConsistencyMarkers()->getMinValid(opCtx.get());
593         consistency =
594             (lastOpTime >= minValid) ? DataConsistency::Consistent : DataConsistency::Inconsistent;
595     }
596 
597     stdx::unique_lock<stdx::mutex> lock(_mutex);
598     invariant(_rsConfigState == kConfigStartingUp);
599     const PostMemberStateUpdateAction action =
600         _setCurrentRSConfig_inlock(opCtx.get(), localConfig, myIndex.getValue());
601 
602     // Set our last applied and durable optimes to the top of the oplog, if we have one.
603     if (!lastOpTime.isNull()) {
604         bool isRollbackAllowed = false;
605         _setMyLastAppliedOpTime_inlock(lastOpTime, isRollbackAllowed, consistency);
606         _setMyLastDurableOpTime_inlock(lastOpTime, isRollbackAllowed);
607         _reportUpstream_inlock(std::move(lock));  // unlocks _mutex.
608     } else {
609         lock.unlock();
610     }
611 
612     _externalState->setGlobalTimestamp(getServiceContext(), lastOpTime.getTimestamp());
613     {
614         stdx::lock_guard<stdx::mutex> lk(_mutex);
615         // Step down is impossible, so we don't need to wait for the returned event.
616         _updateTerm_inlock(term);
617     }
618     LOG(1) << "Current term is now " << term;
619     if (action == kActionWinElection) {
620         stdx::lock_guard<stdx::mutex> lk(_mutex);
621         _postWonElectionUpdateMemberState_inlock();
622     } else {
623         _performPostMemberStateUpdateAction(action);
624     }
625 
626     if (!isArbiter && myIndex.getValue() != -1) {
627         _externalState->startThreads(_settings);
628         _startDataReplication(opCtx.get());
629     }
630 }
631 
_stopDataReplication(OperationContext * opCtx)632 void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* opCtx) {
633     std::shared_ptr<InitialSyncer> initialSyncerCopy;
634     {
635         stdx::lock_guard<stdx::mutex> lk(_mutex);
636         if (!_startedSteadyStateReplication) {
637             return;
638         }
639 
640         _startedSteadyStateReplication = false;
641         _initialSyncer.swap(initialSyncerCopy);
642     }
643     if (initialSyncerCopy) {
644         LOG(1)
645             << "ReplicationCoordinatorImpl::_stopDataReplication calling InitialSyncer::shutdown.";
646         const auto status = initialSyncerCopy->shutdown();
647         if (!status.isOK()) {
648             warning() << "InitialSyncer shutdown failed: " << status;
649         }
650         initialSyncerCopy.reset();
651         // Do not return here, fall through.
652     }
653     LOG(1) << "ReplicationCoordinatorImpl::_stopDataReplication calling "
654               "ReplCoordExtState::stopDataReplication.";
655     _externalState->stopDataReplication(opCtx);
656 }
657 
_startDataReplication(OperationContext * opCtx,stdx::function<void ()> startCompleted)658 void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx,
659                                                        stdx::function<void()> startCompleted) {
660     stdx::unique_lock<stdx::mutex> lk(_mutex);
661     if (_startedSteadyStateReplication) {
662         return;
663     }
664 
665     _startedSteadyStateReplication = true;
666 
667     // Check to see if we need to do an initial sync.
668     const auto lastOpTime = _getMyLastAppliedOpTime_inlock();
669     const auto needsInitialSync =
670         lastOpTime.isNull() || _externalState->isInitialSyncFlagSet(opCtx);
671     if (!needsInitialSync) {
672         // Start steady replication, since we already have data.
673         // ReplSetConfig has been installed, so it's either in STARTUP2 or REMOVED.
674         auto memberState = _getMemberState_inlock();
675         invariant(memberState.startup2() || memberState.removed());
676 
677         lk.unlock();
678         invariantOK(setFollowerMode(MemberState::RS_RECOVERING));
679         _externalState->startSteadyStateReplication(opCtx, this);
680         return;
681     }
682 
683     // Do initial sync.
684     if (!_externalState->getTaskExecutor()) {
685         log() << "not running initial sync during test.";
686         return;
687     }
688 
689     auto onCompletion = [this, startCompleted](const StatusWith<OpTimeWithHash>& status) {
690         {
691             stdx::lock_guard<stdx::mutex> lock(_mutex);
692             if (status == ErrorCodes::CallbackCanceled) {
693                 log() << "Initial Sync has been cancelled: " << status.getStatus();
694                 return;
695             } else if (!status.isOK()) {
696                 if (_inShutdown) {
697                     log() << "Initial Sync failed during shutdown due to " << status.getStatus();
698                     return;
699                 } else {
700                     error() << "Initial sync failed, shutting down now. Restart the server "
701                                "to attempt a new initial sync.";
702                     fassertFailedWithStatusNoTrace(40088, status.getStatus());
703                 }
704             }
705 
706             const auto lastApplied = status.getValue();
707             _setMyLastAppliedOpTime_inlock(lastApplied.opTime, false, DataConsistency::Consistent);
708         }
709 
710         // Clear maint. mode.
711         while (getMaintenanceMode()) {
712             setMaintenanceMode(false).transitional_ignore();
713         }
714 
715         if (startCompleted) {
716             startCompleted();
717         }
718         // Repair local db (to compact it).
719         auto opCtxHolder = cc().makeOperationContext();
720         uassertStatusOK(_externalState->runRepairOnLocalDB(opCtxHolder.get()));
721         // Because initial sync completed, we can only be in STARTUP2, not REMOVED.
722         // Transition from STARTUP2 to RECOVERING and start the producer and the applier.
723         invariant(getMemberState().startup2());
724         invariantOK(setFollowerMode(MemberState::RS_RECOVERING));
725         _externalState->startSteadyStateReplication(opCtxHolder.get(), this);
726     };
727 
728     std::shared_ptr<InitialSyncer> initialSyncerCopy;
729     try {
730         {
731             initialSyncerCopy = std::make_shared<InitialSyncer>(
732                 createInitialSyncerOptions(this, _externalState.get()),
733                 stdx::make_unique<DataReplicatorExternalStateInitialSync>(this,
734                                                                           _externalState.get()),
735                 _storage,
736                 _replicationProcess,
737                 onCompletion);
738             _initialSyncer = initialSyncerCopy;
739         }
740         // InitialSyncer::startup() must be called outside lock because it uses features (eg.
741         // setting the initial sync flag) which depend on the ReplicationCoordinatorImpl.
742         lk.unlock();
743         uassertStatusOK(initialSyncerCopy->startup(opCtx, numInitialSyncAttempts.load()));
744     } catch (...) {
745         auto status = exceptionToStatus();
746         log() << "Initial Sync failed to start: " << status;
747         if (ErrorCodes::CallbackCanceled == status || ErrorCodes::isShutdownError(status.code())) {
748             return;
749         }
750         fassertFailedWithStatusNoTrace(40354, status);
751     }
752 }
753 
startup(OperationContext * opCtx)754 void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) {
755     if (!isReplEnabled()) {
756         stdx::lock_guard<stdx::mutex> lk(_mutex);
757         _setConfigState_inlock(kConfigReplicationDisabled);
758         return;
759     }
760 
761     {
762         OID rid = _externalState->ensureMe(opCtx);
763 
764         stdx::lock_guard<stdx::mutex> lk(_mutex);
765         fassert(18822, !_inShutdown);
766         _setConfigState_inlock(kConfigStartingUp);
767         _myRID = rid;
768         _topCoord->getMyMemberData()->setRid(rid);
769     }
770 
771     if (!_settings.usingReplSets()) {
772         // Must be Master/Slave
773         invariant(_settings.isMaster() || _settings.isSlave());
774         _externalState->startMasterSlave(opCtx);
775         return;
776     }
777 
778     _replExecutor->startup();
779 
780     {
781         stdx::lock_guard<stdx::mutex> lk(_mutex);
782         _topCoord->setStorageEngineSupportsReadCommitted(
783             _externalState->isReadCommittedSupportedByStorageEngine(opCtx));
784     }
785 
786     bool doneLoadingConfig = _startLoadLocalConfig(opCtx);
787     if (doneLoadingConfig) {
788         // If we're not done loading the config, then the config state will be set by
789         // _finishLoadLocalConfig.
790         stdx::lock_guard<stdx::mutex> lk(_mutex);
791         invariant(!_rsConfig.isInitialized());
792         _setConfigState_inlock(kConfigUninitialized);
793     }
794 }
795 
enterTerminalShutdown()796 void ReplicationCoordinatorImpl::enterTerminalShutdown() {
797     stdx::lock_guard<stdx::mutex> lk(_mutex);
798     _inTerminalShutdown = true;
799 }
800 
shutdown(OperationContext * opCtx)801 void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) {
802     // Shutdown must:
803     // * prevent new threads from blocking in awaitReplication
804     // * wake up all existing threads blocking in awaitReplication
805     // * Shut down and join the execution resources it owns.
806 
807     if (!_settings.usingReplSets()) {
808         return;
809     }
810 
811     log() << "shutting down replication subsystems";
812 
813     // Used to shut down outside of the lock.
814     std::shared_ptr<InitialSyncer> initialSyncerCopy;
815     {
816         stdx::unique_lock<stdx::mutex> lk(_mutex);
817         fassert(28533, !_inShutdown);
818         _inShutdown = true;
819         if (_rsConfigState == kConfigPreStart) {
820             warning() << "ReplicationCoordinatorImpl::shutdown() called before "
821                          "startup() finished.  Shutting down without cleaning up the "
822                          "replication system";
823             return;
824         }
825         if (_rsConfigState == kConfigStartingUp) {
826             // Wait until we are finished starting up, so that we can cleanly shut everything down.
827             lk.unlock();
828             _waitForStartUpComplete();
829             lk.lock();
830             fassert(18823, _rsConfigState != kConfigStartingUp);
831         }
832         _replicationWaiterList.signalAndRemoveAll_inlock();
833         _opTimeWaiterList.signalAndRemoveAll_inlock();
834         _currentCommittedSnapshotCond.notify_all();
835         _initialSyncer.swap(initialSyncerCopy);
836         _stepDownWaiters.notify_all();
837     }
838 
839 
840     // joining the replication executor is blocking so it must be run outside of the mutex
841     if (initialSyncerCopy) {
842         LOG(1) << "ReplicationCoordinatorImpl::shutdown calling InitialSyncer::shutdown.";
843         const auto status = initialSyncerCopy->shutdown();
844         if (!status.isOK()) {
845             warning() << "InitialSyncer shutdown failed: " << status;
846         }
847         initialSyncerCopy->join();
848         initialSyncerCopy.reset();
849     }
850     _externalState->shutdown(opCtx);
851     _replExecutor->shutdown();
852     _replExecutor->join();
853 }
854 
getSettings() const855 const ReplSettings& ReplicationCoordinatorImpl::getSettings() const {
856     return _settings;
857 }
858 
getReplicationMode() const859 ReplicationCoordinator::Mode ReplicationCoordinatorImpl::getReplicationMode() const {
860     return _replMode;
861 }
862 
getMemberState() const863 MemberState ReplicationCoordinatorImpl::getMemberState() const {
864     stdx::lock_guard<stdx::mutex> lk(_mutex);
865     return _getMemberState_inlock();
866 }
867 
_getMemberState_inlock() const868 MemberState ReplicationCoordinatorImpl::_getMemberState_inlock() const {
869     return _memberState;
870 }
871 
waitForMemberState(MemberState expectedState,Milliseconds timeout)872 Status ReplicationCoordinatorImpl::waitForMemberState(MemberState expectedState,
873                                                       Milliseconds timeout) {
874     if (timeout < Milliseconds(0)) {
875         return Status(ErrorCodes::BadValue, "Timeout duration cannot be negative");
876     }
877 
878     stdx::unique_lock<stdx::mutex> lk(_mutex);
879     auto pred = [this, expectedState]() { return _memberState == expectedState; };
880     if (!_memberStateChange.wait_for(lk, timeout.toSystemDuration(), pred)) {
881         return Status(ErrorCodes::ExceededTimeLimit,
882                       str::stream() << "Timed out waiting for state to become "
883                                     << expectedState.toString()
884                                     << ". Current state is "
885                                     << _memberState.toString());
886     }
887     return Status::OK();
888 }
889 
getSlaveDelaySecs() const890 Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const {
891     stdx::lock_guard<stdx::mutex> lk(_mutex);
892     invariant(_rsConfig.isInitialized());
893     if (_selfIndex == -1) {
894         // We aren't currently in the set. Return 0 seconds so we can clear out the applier's
895         // queue of work.
896         return Seconds(0);
897     }
898     return _rsConfig.getMemberAt(_selfIndex).getSlaveDelay();
899 }
900 
clearSyncSourceBlacklist()901 void ReplicationCoordinatorImpl::clearSyncSourceBlacklist() {
902     stdx::lock_guard<stdx::mutex> lk(_mutex);
903     _topCoord->clearSyncSourceBlacklist();
904 }
905 
setFollowerMode(const MemberState & newState)906 Status ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState) {
907     stdx::unique_lock<stdx::mutex> lk(_mutex);
908     if (newState == _topCoord->getMemberState()) {
909         return Status::OK();
910     }
911     if (_topCoord->getRole() == TopologyCoordinator::Role::kLeader) {
912         return Status(ErrorCodes::NotSecondary,
913                       "Cannot set follower mode when node is currently the leader");
914     }
915 
916     if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) {
917         // We were a candidate, which means _topCoord believed us to be in state RS_SECONDARY, and
918         // we know that newState != RS_SECONDARY because we would have returned early, above if
919         // the old and new state were equal. So, try again after the election is over to
920         // finish setting the follower mode.  We cannot wait for the election to finish here as we
921         // may be holding a global X lock, so we return a bad status and rely on the caller to
922         // retry.
923         return Status(ErrorCodes::ElectionInProgress,
924                       str::stream() << "Cannot set follower mode to " << newState.toString()
925                                     << " because we are in the middle of running an election");
926     }
927 
928     _topCoord->setFollowerMode(newState.s);
929 
930     const PostMemberStateUpdateAction action =
931         _updateMemberStateFromTopologyCoordinator_inlock(nullptr);
932 
933     if (action == kActionWinElection) {
934         _postWonElectionUpdateMemberState_inlock();
935     } else {
936         lk.unlock();
937         _performPostMemberStateUpdateAction(action);
938     }
939 
940     return Status::OK();
941 }
942 
getApplierState()943 ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState() {
944     stdx::lock_guard<stdx::mutex> lk(_mutex);
945     return _applierState;
946 }
947 
signalDrainComplete(OperationContext * opCtx,long long termWhenBufferIsEmpty)948 void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx,
949                                                      long long termWhenBufferIsEmpty) {
950     // This logic is a little complicated in order to avoid acquiring the global exclusive lock
951     // unnecessarily.  This is important because the applier may call signalDrainComplete()
952     // whenever it wants, not only when the ReplicationCoordinator is expecting it.
953     //
954     // The steps are:
955     // 1.) Check to see if we're waiting for this signal.  If not, return early.
956     // 2.) Otherwise, release the mutex while acquiring the global exclusive lock,
957     //     since that might take a while (NB there's a deadlock cycle otherwise, too).
958     // 3.) Re-check to see if we've somehow left drain mode.  If we have not, clear
959     //     producer and applier's states, set the flag allowing non-local database writes and
960     //     drop the mutex.  At this point, no writes can occur from other threads, due to the
961     //     global exclusive lock.
962     // 4.) Drop all temp collections, and log the drops to the oplog.
963     // 5.) Log transition to primary in the oplog and set that OpTime as the floor for what we will
964     //     consider to be committed.
965     // 6.) Drop the global exclusive lock.
966     //
967     // Because replicatable writes are forbidden while in drain mode, and we don't exit drain
968     // mode until we have the global exclusive lock, which forbids all other threads from making
969     // writes, we know that from the time that _canAcceptNonLocalWrites is set until
970     // this method returns, no external writes will be processed.  This is important so that a new
971     // temp collection isn't introduced on the new primary before we drop all the temp collections.
972 
973     // When we go to drop all temp collections, we must replicate the drops.
974     invariant(opCtx->writesAreReplicated());
975 
976     stdx::unique_lock<stdx::mutex> lk(_mutex);
977     if (_applierState != ApplierState::Draining) {
978         return;
979     }
980     lk.unlock();
981 
982     _externalState->onDrainComplete(opCtx);
983 
984     if (MONGO_FAIL_POINT(transitionToPrimaryHangBeforeTakingGlobalExclusiveLock)) {
985         log() << "transition to primary - "
986                  "transitionToPrimaryHangBeforeTakingGlobalExclusiveLock fail point enabled. "
987                  "Blocking until fail point is disabled.";
988         while (MONGO_FAIL_POINT(transitionToPrimaryHangBeforeTakingGlobalExclusiveLock)) {
989             mongo::sleepsecs(1);
990             {
991                 stdx::lock_guard<stdx::mutex> lock(_mutex);
992                 if (_inShutdown) {
993                     break;
994                 }
995             }
996         }
997     }
998 
999     Lock::GlobalWrite globalWriteLock(opCtx);
1000     lk.lock();
1001 
1002     // Exit drain mode only if we're actually in draining mode, the apply buffer is empty in the
1003     // current term, and we're allowed to become the write master.
1004     if (_applierState != ApplierState::Draining ||
1005         !_topCoord->canCompleteTransitionToPrimary(termWhenBufferIsEmpty)) {
1006         return;
1007     }
1008     _applierState = ApplierState::Stopped;
1009 
1010     invariant(_getMemberState_inlock().primary());
1011     invariant(!_canAcceptNonLocalWrites.loadRelaxed());
1012 
1013     {
1014         lk.unlock();
1015         AllowNonLocalWritesBlock writesAllowed(opCtx);
1016         OpTime firstOpTime = _externalState->onTransitionToPrimary(opCtx, isV1ElectionProtocol());
1017         lk.lock();
1018 
1019         auto status = _topCoord->completeTransitionToPrimary(firstOpTime);
1020         if (status.code() == ErrorCodes::PrimarySteppedDown) {
1021             log() << "Transition to primary failed" << causedBy(status);
1022             return;
1023         }
1024         invariantOK(status);
1025     }
1026 
1027     // Must calculate the commit level again because firstOpTimeOfMyTerm wasn't set when we logged
1028     // our election in onTransitionToPrimary(), above.
1029     _updateLastCommittedOpTime_inlock();
1030 
1031     // Update _canAcceptNonLocalWrites
1032     _updateMemberStateFromTopologyCoordinator_inlock(opCtx);
1033 
1034     log() << "transition to primary complete; database writes are now permitted" << rsLog;
1035     _drainFinishedCond.notify_all();
1036     _externalState->startNoopWriter(_getMyLastAppliedOpTime_inlock());
1037 }
1038 
waitForDrainFinish(Milliseconds timeout)1039 Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) {
1040     if (timeout < Milliseconds(0)) {
1041         return Status(ErrorCodes::BadValue, "Timeout duration cannot be negative");
1042     }
1043 
1044     stdx::unique_lock<stdx::mutex> lk(_mutex);
1045     auto pred = [this]() { return _applierState != ApplierState::Draining; };
1046     if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) {
1047         return Status(ErrorCodes::ExceededTimeLimit,
1048                       "Timed out waiting to finish draining applier buffer");
1049     }
1050 
1051     return Status::OK();
1052 }
1053 
signalUpstreamUpdater()1054 void ReplicationCoordinatorImpl::signalUpstreamUpdater() {
1055     _externalState->forwardSlaveProgress();
1056 }
1057 
setLastOptimeForSlave(const OID & rid,const Timestamp & ts)1058 Status ReplicationCoordinatorImpl::setLastOptimeForSlave(const OID& rid, const Timestamp& ts) {
1059     stdx::unique_lock<stdx::mutex> lock(_mutex);
1060     massert(28576,
1061             "Received an old style replication progress update, which is only used for Master/"
1062             "Slave replication now, but this node is not using Master/Slave replication. "
1063             "This is likely caused by an old (pre-2.6) member syncing from this node.",
1064             getReplicationMode() == modeMasterSlave);
1065 
1066     // term == -1 for master-slave
1067     OpTime opTime(ts, OpTime::kUninitializedTerm);
1068     MemberData* memberData = _topCoord->findMemberDataByRid(rid);
1069     if (memberData) {
1070         memberData->advanceLastAppliedOpTime(opTime, _replExecutor->now());
1071     } else {
1072         auto* memberData = _topCoord->addSlaveMemberData(rid);
1073         memberData->setLastAppliedOpTime(opTime, _replExecutor->now());
1074     }
1075     _updateLastCommittedOpTime_inlock();
1076     return Status::OK();
1077 }
1078 
setMyHeartbeatMessage(const std::string & msg)1079 void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) {
1080     stdx::unique_lock<stdx::mutex> lock(_mutex);
1081     _topCoord->setMyHeartbeatMessage(_replExecutor->now(), msg);
1082 }
1083 
setMyLastAppliedOpTimeForward(const OpTime & opTime,DataConsistency consistency)1084 void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opTime,
1085                                                                DataConsistency consistency) {
1086     stdx::unique_lock<stdx::mutex> lock(_mutex);
1087     auto myLastAppliedOpTime = _getMyLastAppliedOpTime_inlock();
1088     if (opTime > myLastAppliedOpTime) {
1089         _setMyLastAppliedOpTime_inlock(opTime, false, consistency);
1090         _reportUpstream_inlock(std::move(lock));
1091     } else {
1092         if (opTime != myLastAppliedOpTime) {
1093             // In pv1, oplog entries are ordered by non-decreasing term and strictly increasing
1094             // timestamp. So, in pv1, its not possible for us to get opTime with lower term and
1095             // timestamp higher than or equal to our current lastAppliedOptime.
1096             invariant(opTime.getTerm() == OpTime::kUninitializedTerm ||
1097                       myLastAppliedOpTime.getTerm() == OpTime::kUninitializedTerm ||
1098                       opTime.getTimestamp() < myLastAppliedOpTime.getTimestamp());
1099         }
1100 
1101         if (consistency == DataConsistency::Consistent && _canAcceptNonLocalWrites.loadRelaxed() &&
1102             _rsConfig.getWriteMajority() == 1) {
1103             // Single vote primaries may have a lagged stable timestamp due to paring back the
1104             // stable timestamp to the all committed timestamp.
1105             _setStableTimestampForStorage_inlock();
1106         }
1107     }
1108 }
1109 
setMyLastDurableOpTimeForward(const OpTime & opTime)1110 void ReplicationCoordinatorImpl::setMyLastDurableOpTimeForward(const OpTime& opTime) {
1111     stdx::unique_lock<stdx::mutex> lock(_mutex);
1112     if (opTime > _getMyLastDurableOpTime_inlock()) {
1113         _setMyLastDurableOpTime_inlock(opTime, false);
1114         _reportUpstream_inlock(std::move(lock));
1115     }
1116 }
1117 
setMyLastAppliedOpTime(const OpTime & opTime)1118 void ReplicationCoordinatorImpl::setMyLastAppliedOpTime(const OpTime& opTime) {
1119     stdx::unique_lock<stdx::mutex> lock(_mutex);
1120     // The optime passed to this function is required to represent a consistent database state.
1121     _setMyLastAppliedOpTime_inlock(opTime, false, DataConsistency::Consistent);
1122     _reportUpstream_inlock(std::move(lock));
1123 }
1124 
setMyLastDurableOpTime(const OpTime & opTime)1125 void ReplicationCoordinatorImpl::setMyLastDurableOpTime(const OpTime& opTime) {
1126     stdx::unique_lock<stdx::mutex> lock(_mutex);
1127     _setMyLastDurableOpTime_inlock(opTime, false);
1128     _reportUpstream_inlock(std::move(lock));
1129 }
1130 
resetMyLastOpTimes()1131 void ReplicationCoordinatorImpl::resetMyLastOpTimes() {
1132     stdx::unique_lock<stdx::mutex> lock(_mutex);
1133     _resetMyLastOpTimes_inlock();
1134     _reportUpstream_inlock(std::move(lock));
1135 }
1136 
_resetMyLastOpTimes_inlock()1137 void ReplicationCoordinatorImpl::_resetMyLastOpTimes_inlock() {
1138     LOG(1) << "resetting durable/applied optimes.";
1139     // Reset to uninitialized OpTime
1140     bool isRollbackAllowed = true;
1141     _setMyLastAppliedOpTime_inlock(OpTime(), isRollbackAllowed, DataConsistency::Inconsistent);
1142     _setMyLastDurableOpTime_inlock(OpTime(), isRollbackAllowed);
1143     _stableOpTimeCandidates.clear();
1144 }
1145 
_reportUpstream_inlock(stdx::unique_lock<stdx::mutex> lock)1146 void ReplicationCoordinatorImpl::_reportUpstream_inlock(stdx::unique_lock<stdx::mutex> lock) {
1147     invariant(lock.owns_lock());
1148 
1149     if (getReplicationMode() != modeReplSet) {
1150         return;
1151     }
1152 
1153     if (_getMemberState_inlock().primary()) {
1154         return;
1155     }
1156 
1157     lock.unlock();
1158 
1159     _externalState->forwardSlaveProgress();  // Must do this outside _mutex
1160 }
1161 
_setMyLastAppliedOpTime_inlock(const OpTime & opTime,bool isRollbackAllowed,DataConsistency consistency)1162 void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& opTime,
1163                                                                 bool isRollbackAllowed,
1164                                                                 DataConsistency consistency) {
1165     auto* myMemberData = _topCoord->getMyMemberData();
1166     auto myLastAppliedOpTime = myMemberData->getLastAppliedOpTime();
1167 
1168     if (!(isRollbackAllowed || opTime == myLastAppliedOpTime)) {
1169         invariant(opTime > myLastAppliedOpTime);
1170         // In pv1, oplog entries are ordered by non-decreasing term and strictly increasing
1171         // timestamp. So, in pv1, its not possible for us to get opTime with higher term and
1172         // timestamp lesser than or equal to our current lastAppliedOptime.
1173         invariant(opTime.getTerm() == OpTime::kUninitializedTerm ||
1174                   myLastAppliedOpTime.getTerm() == OpTime::kUninitializedTerm ||
1175                   opTime.getTimestamp() > myLastAppliedOpTime.getTimestamp());
1176     }
1177     myMemberData->setLastAppliedOpTime(opTime, _replExecutor->now());
1178     // If we are using applied times to calculate the commit level, update it now.
1179     if (!_rsConfig.getWriteConcernMajorityShouldJournal()) {
1180         _updateLastCommittedOpTime_inlock();
1181     }
1182 
1183     // Signal anyone waiting on optime changes.
1184     _opTimeWaiterList.signalAndRemoveIf_inlock(
1185         [opTime](Waiter* waiter) { return waiter->opTime <= opTime; });
1186 
1187 
1188     // Note that master-slave mode has no automatic fail over, and so rollbacks never occur.
1189     // Additionally, the commit point for a master-slave set will never advance, since it doesn't
1190     // use any consensus protocol. Since the set of stable optime candidates can only get cleaned up
1191     // when the commit point advances, we should refrain from updating stable optime candidates in
1192     // master-slave mode, to avoid the candidates list from growing unbounded.
1193     if (opTime.isNull() || getReplicationMode() != Mode::modeReplSet) {
1194         return;
1195     }
1196 
1197     // On PV0 secondaries, we do not track the commit point, so we do not use the commit point to
1198     // set the stable timestamp. Instead, we always set the stable timestamp to the last applied
1199     // write, to ensure that timestamp history can be purged. This means we do not need to track
1200     // stable optime candidates (and we should not track them, since we only purge the candidate
1201     // list when setting the stable timestamp due to the commit point advancing).
1202     const bool opTimeIsStableCandidate = isV1ElectionProtocol() || _memberState.primary();
1203 
1204     // Add the new applied optime to the list of stable optime candidates and then set the last
1205     // stable optime. Stable optimes are used to determine the last optime that it is safe to revert
1206     // the database to, in the event of a rollback via the 'recover to timestamp' method. If we are
1207     // setting our applied optime to a value that doesn't represent a consistent database state, we
1208     // should not add it to the set of stable optime candidates. For example, if we are in
1209     // RECOVERING after a rollback using the 'rollbackViaRefetch' algorithm, we will be inconsistent
1210     // until we reach the 'minValid' optime.
1211     if (consistency == DataConsistency::Consistent) {
1212         invariant(opTime.getTimestamp().getInc() > 0,
1213                   str::stream() << "Impossible optime received: " << opTime.toString());
1214         if (opTimeIsStableCandidate) {
1215             _stableOpTimeCandidates.insert(opTime);
1216         }
1217         // If we are lagged behind the commit optime, set a new stable timestamp here.
1218         if (opTime <= _topCoord->getLastCommittedOpTime()) {
1219             _setStableTimestampForStorage_inlock();
1220         }
1221     } else if (_getMemberState_inlock().startup2()) {
1222         // The oplog application phase of initial sync starts timestamping writes, causing
1223         // WiredTiger to pin this data in memory. Advancing the oldest timestamp in step with the
1224         // last applied optime here will permit WiredTiger to evict this data as it sees fit.
1225         _service->getGlobalStorageEngine()->setOldestTimestamp(opTime.getTimestamp());
1226     }
1227     if (!opTimeIsStableCandidate) {
1228         auto storageEngine = _service->getGlobalStorageEngine();
1229         if (storageEngine) {
1230             auto newOldestTimestamp = opTime.getTimestamp();
1231             if (!newOldestTimestamp.isNull()) {
1232                 _storage->setStableTimestamp(getServiceContext(), newOldestTimestamp);
1233             }
1234         }
1235     }
1236 }
1237 
_setMyLastDurableOpTime_inlock(const OpTime & opTime,bool isRollbackAllowed)1238 void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime,
1239                                                                 bool isRollbackAllowed) {
1240     auto* myMemberData = _topCoord->getMyMemberData();
1241     invariant(isRollbackAllowed || myMemberData->getLastDurableOpTime() <= opTime);
1242     myMemberData->setLastDurableOpTime(opTime, _replExecutor->now());
1243     // If we are using durable times to calculate the commit level, update it now.
1244     if (_rsConfig.getWriteConcernMajorityShouldJournal()) {
1245         _updateLastCommittedOpTime_inlock();
1246     }
1247 }
1248 
getMyLastAppliedOpTime() const1249 OpTime ReplicationCoordinatorImpl::getMyLastAppliedOpTime() const {
1250     stdx::lock_guard<stdx::mutex> lock(_mutex);
1251     return _getMyLastAppliedOpTime_inlock();
1252 }
1253 
getMyLastDurableOpTime() const1254 OpTime ReplicationCoordinatorImpl::getMyLastDurableOpTime() const {
1255     stdx::lock_guard<stdx::mutex> lock(_mutex);
1256     return _getMyLastDurableOpTime_inlock();
1257 }
1258 
_validateReadConcern(OperationContext * opCtx,const ReadConcernArgs & readConcern)1259 Status ReplicationCoordinatorImpl::_validateReadConcern(OperationContext* opCtx,
1260                                                         const ReadConcernArgs& readConcern) {
1261     // We should never wait for replication if we are holding any locks, because this can
1262     // potentially block for long time while doing network activity.
1263     if (opCtx->lockState()->isLocked()) {
1264         return {ErrorCodes::IllegalOperation,
1265                 "Waiting for replication not allowed while holding a lock"};
1266     }
1267 
1268     if (readConcern.getArgsClusterTime() &&
1269         readConcern.getLevel() != ReadConcernLevel::kMajorityReadConcern &&
1270         readConcern.getLevel() != ReadConcernLevel::kLocalReadConcern) {
1271         return {ErrorCodes::BadValue,
1272                 "Only readConcern level 'majority' or 'local' is allowed when specifying "
1273                 "afterClusterTime"};
1274     }
1275 
1276     if (readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern &&
1277         !_externalState->isReadCommittedSupportedByStorageEngine(opCtx)) {
1278         return {ErrorCodes::ReadConcernMajorityNotEnabled,
1279                 "Majority read concern requested, but it is not supported by the storage engine."};
1280     }
1281 
1282     return Status::OK();
1283 }
1284 
waitUntilOpTimeForRead(OperationContext * opCtx,const ReadConcernArgs & readConcern)1285 Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCtx,
1286                                                           const ReadConcernArgs& readConcern) {
1287     auto verifyStatus = _validateReadConcern(opCtx, readConcern);
1288     if (!verifyStatus.isOK()) {
1289         return verifyStatus;
1290     }
1291 
1292     // nothing to wait for
1293     if (!readConcern.getArgsClusterTime() && !readConcern.getArgsOpTime()) {
1294         return Status::OK();
1295     }
1296 
1297     return waitUntilOpTimeForReadUntil(opCtx, readConcern, boost::none);
1298 }
1299 
waitUntilOpTimeForReadUntil(OperationContext * opCtx,const ReadConcernArgs & readConcern,boost::optional<Date_t> deadline)1300 Status ReplicationCoordinatorImpl::waitUntilOpTimeForReadUntil(OperationContext* opCtx,
1301                                                                const ReadConcernArgs& readConcern,
1302                                                                boost::optional<Date_t> deadline) {
1303     if (getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) {
1304         // For master/slave and standalone nodes, readAfterOpTime is not supported, so we return an
1305         // error. However, we consider all writes "committed" and can treat MajorityReadConcern as
1306         // LocalReadConcern, which is immediately satisfied since there is no OpTime to wait for.
1307         return {ErrorCodes::NotAReplicaSet,
1308                 "node needs to be a replica set member to use read concern"};
1309     }
1310 
1311     if (_rsConfigState == kConfigUninitialized || _rsConfigState == kConfigInitiating) {
1312         return {ErrorCodes::NotYetInitialized,
1313                 "Cannot use non-local read concern until replica set is finished initializing."};
1314     }
1315 
1316     if (readConcern.getArgsClusterTime()) {
1317         return _waitUntilClusterTimeForRead(opCtx, readConcern, deadline);
1318     } else {
1319         return _waitUntilOpTimeForReadDeprecated(opCtx, readConcern);
1320     }
1321 }
1322 
_waitUntilOpTime(OperationContext * opCtx,bool isMajorityReadConcern,OpTime targetOpTime,boost::optional<Date_t> deadline)1323 Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx,
1324                                                     bool isMajorityReadConcern,
1325                                                     OpTime targetOpTime,
1326                                                     boost::optional<Date_t> deadline) {
1327     if (!isMajorityReadConcern) {
1328         // This assumes the read concern is "local" level.
1329         // We need to wait for all committed writes to be visible, even in the oplog (which uses
1330         // special visibility rules).
1331         _externalState->waitForAllEarlierOplogWritesToBeVisible(opCtx);
1332     }
1333 
1334     stdx::unique_lock<stdx::mutex> lock(_mutex);
1335 
1336     if (isMajorityReadConcern && !_externalState->snapshotsEnabled()) {
1337         return {ErrorCodes::CommandNotSupported,
1338                 "Current storage engine does not support majority readConcerns"};
1339     }
1340 
1341     auto getCurrentOpTime = [this, isMajorityReadConcern]() {
1342         return isMajorityReadConcern ? _getCurrentCommittedSnapshotOpTime_inlock()
1343                                      : _getMyLastAppliedOpTime_inlock();
1344     };
1345 
1346     if (isMajorityReadConcern && targetOpTime > getCurrentOpTime()) {
1347         LOG(1) << "waitUntilOpTime: waiting for optime:" << targetOpTime
1348                << " to be in a snapshot -- current snapshot: " << getCurrentOpTime();
1349     }
1350 
1351     while (targetOpTime > getCurrentOpTime()) {
1352         if (_inShutdown) {
1353             return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
1354         }
1355 
1356         // If we are doing a majority read concern we only need to wait for a new snapshot.
1357         if (isMajorityReadConcern) {
1358             // Wait for a snapshot that meets our needs (< targetOpTime).
1359             LOG(3) << "waitUntilOpTime: waiting for a new snapshot until " << opCtx->getDeadline();
1360 
1361             auto waitStatus =
1362                 opCtx->waitForConditionOrInterruptNoAssert(_currentCommittedSnapshotCond, lock);
1363             if (!waitStatus.isOK()) {
1364                 return waitStatus.withContext(str::stream()
1365                                               << "Error waiting for snapshot not less than "
1366                                               << targetOpTime.toString()
1367                                               << ", current relevant optime is "
1368                                               << getCurrentOpTime().toString()
1369                                               << ".");
1370             }
1371             LOG(3) << "Got notified of new snapshot: " << _currentCommittedSnapshot->toString();
1372             continue;
1373         }
1374 
1375         // We just need to wait for the opTime to catch up to what we need (not majority RC).
1376         stdx::condition_variable condVar;
1377         ThreadWaiter waiter(targetOpTime, nullptr, &condVar);
1378         WaiterGuard guard(&_opTimeWaiterList, &waiter);
1379 
1380         LOG(3) << "waitUntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime "
1381                << waiter << " until " << opCtx->getDeadline();
1382 
1383         auto waitStatus = Status::OK();
1384         if (deadline) {
1385             auto waitUntilStatus =
1386                 opCtx->waitForConditionOrInterruptNoAssertUntil(condVar, lock, *deadline);
1387             waitStatus = waitUntilStatus.getStatus();
1388         } else {
1389             waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock);
1390         }
1391 
1392         if (!waitStatus.isOK()) {
1393             return waitStatus.withContext(str::stream() << "Error waiting for optime "
1394                                                         << targetOpTime.toString()
1395                                                         << ", current relevant optime is "
1396                                                         << getCurrentOpTime().toString()
1397                                                         << ".");
1398         }
1399 
1400         // If deadline is set no need to wait until the targetTime time is reached.
1401         if (deadline) {
1402             return Status::OK();
1403         }
1404     }
1405 
1406     return Status::OK();
1407 }
1408 
_waitUntilClusterTimeForRead(OperationContext * opCtx,const ReadConcernArgs & readConcern,boost::optional<Date_t> deadline)1409 Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead(OperationContext* opCtx,
1410                                                                 const ReadConcernArgs& readConcern,
1411                                                                 boost::optional<Date_t> deadline) {
1412     auto clusterTime = *readConcern.getArgsClusterTime();
1413     invariant(clusterTime != LogicalTime::kUninitialized);
1414 
1415     // convert clusterTime to opTime so it can be used by the _opTimeWaiterList for wait on
1416     // readConcern level local.
1417     auto targetOpTime = OpTime(clusterTime.asTimestamp(), OpTime::kUninitializedTerm);
1418     invariant(!readConcern.getArgsOpTime());
1419 
1420     const bool isMajorityReadConcern =
1421         readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern;
1422 
1423     return _waitUntilOpTime(opCtx, isMajorityReadConcern, targetOpTime, deadline);
1424 }
1425 
1426 // TODO: remove when SERVER-29729 is done
_waitUntilOpTimeForReadDeprecated(OperationContext * opCtx,const ReadConcernArgs & readConcern)1427 Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
1428     OperationContext* opCtx, const ReadConcernArgs& readConcern) {
1429     const bool isMajorityReadConcern =
1430         readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern;
1431 
1432     const auto targetOpTime = readConcern.getArgsOpTime().value_or(OpTime());
1433     return _waitUntilOpTime(opCtx, isMajorityReadConcern, targetOpTime);
1434 }
1435 
_getMyLastAppliedOpTime_inlock() const1436 OpTime ReplicationCoordinatorImpl::_getMyLastAppliedOpTime_inlock() const {
1437     return _topCoord->getMyLastAppliedOpTime();
1438 }
1439 
_getMyLastDurableOpTime_inlock() const1440 OpTime ReplicationCoordinatorImpl::_getMyLastDurableOpTime_inlock() const {
1441     return _topCoord->getMyLastDurableOpTime();
1442 }
1443 
setLastDurableOptime_forTest(long long cfgVer,long long memberId,const OpTime & opTime)1444 Status ReplicationCoordinatorImpl::setLastDurableOptime_forTest(long long cfgVer,
1445                                                                 long long memberId,
1446                                                                 const OpTime& opTime) {
1447     stdx::lock_guard<stdx::mutex> lock(_mutex);
1448     invariant(getReplicationMode() == modeReplSet);
1449 
1450     const UpdatePositionArgs::UpdateInfo update(OpTime(), opTime, cfgVer, memberId);
1451     long long configVersion;
1452     const auto status = _setLastOptime_inlock(update, &configVersion);
1453     _updateLastCommittedOpTime_inlock();
1454     return status;
1455 }
1456 
setLastAppliedOptime_forTest(long long cfgVer,long long memberId,const OpTime & opTime)1457 Status ReplicationCoordinatorImpl::setLastAppliedOptime_forTest(long long cfgVer,
1458                                                                 long long memberId,
1459                                                                 const OpTime& opTime) {
1460     stdx::lock_guard<stdx::mutex> lock(_mutex);
1461     invariant(getReplicationMode() == modeReplSet);
1462 
1463     const UpdatePositionArgs::UpdateInfo update(opTime, OpTime(), cfgVer, memberId);
1464     long long configVersion;
1465     const auto status = _setLastOptime_inlock(update, &configVersion);
1466     _updateLastCommittedOpTime_inlock();
1467     return status;
1468 }
1469 
_setLastOptime_inlock(const UpdatePositionArgs::UpdateInfo & args,long long * configVersion)1470 Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArgs::UpdateInfo& args,
1471                                                          long long* configVersion) {
1472     if (_selfIndex == -1) {
1473         // Ignore updates when we're in state REMOVED.
1474         return Status(ErrorCodes::NotMasterOrSecondary,
1475                       "Received replSetUpdatePosition command but we are in state REMOVED");
1476     }
1477     invariant(getReplicationMode() == modeReplSet);
1478 
1479     if (args.memberId < 0) {
1480         std::string errmsg = str::stream()
1481             << "Received replSetUpdatePosition for node with memberId " << args.memberId
1482             << " which is negative and therefore invalid";
1483         LOG(1) << errmsg;
1484         return Status(ErrorCodes::NodeNotFound, errmsg);
1485     }
1486 
1487     if (args.memberId == _rsConfig.getMemberAt(_selfIndex).getId()) {
1488         // Do not let remote nodes tell us what our optime is.
1489         return Status::OK();
1490     }
1491 
1492     LOG(2) << "received notification that node with memberID " << args.memberId
1493            << " in config with version " << args.cfgver
1494            << " has reached optime: " << args.appliedOpTime
1495            << " and is durable through: " << args.durableOpTime;
1496 
1497     if (args.cfgver != _rsConfig.getConfigVersion()) {
1498         std::string errmsg = str::stream()
1499             << "Received replSetUpdatePosition for node with memberId " << args.memberId
1500             << " whose config version of " << args.cfgver << " doesn't match our config version of "
1501             << _rsConfig.getConfigVersion();
1502         LOG(1) << errmsg;
1503         *configVersion = _rsConfig.getConfigVersion();
1504         return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg);
1505     }
1506 
1507     auto* memberData = _topCoord->findMemberDataByMemberId(args.memberId);
1508     if (!memberData) {
1509         invariant(!_rsConfig.findMemberByID(args.memberId));
1510 
1511         std::string errmsg = str::stream()
1512             << "Received replSetUpdatePosition for node with memberId " << args.memberId
1513             << " which doesn't exist in our config";
1514         LOG(1) << errmsg;
1515         return Status(ErrorCodes::NodeNotFound, errmsg);
1516     }
1517 
1518     invariant(args.memberId == memberData->getMemberId());
1519 
1520     LOG(3) << "Node with memberID " << args.memberId << " currently has optime "
1521            << memberData->getLastAppliedOpTime() << " durable through "
1522            << memberData->getLastDurableOpTime() << "; updating to optime " << args.appliedOpTime
1523            << " and durable through " << args.durableOpTime;
1524 
1525 
1526     auto now(_replExecutor->now());
1527     bool advancedOpTime = memberData->advanceLastAppliedOpTime(args.appliedOpTime, now);
1528     advancedOpTime =
1529         memberData->advanceLastDurableOpTime(args.durableOpTime, now) || advancedOpTime;
1530 
1531     // Only update committed optime if the remote optimes increased.
1532     if (advancedOpTime) {
1533         _updateLastCommittedOpTime_inlock();
1534     }
1535 
1536     _cancelAndRescheduleLivenessUpdate_inlock(args.memberId);
1537     return Status::OK();
1538 }
1539 
_doneWaitingForReplication_inlock(const OpTime & opTime,Timestamp minSnapshot,const WriteConcernOptions & writeConcern)1540 bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock(
1541     const OpTime& opTime, Timestamp minSnapshot, const WriteConcernOptions& writeConcern) {
1542     // The syncMode cannot be unset.
1543     invariant(writeConcern.syncMode != WriteConcernOptions::SyncMode::UNSET);
1544     Status status = _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
1545     if (!status.isOK()) {
1546         return true;
1547     }
1548 
1549     const bool useDurableOpTime = writeConcern.syncMode == WriteConcernOptions::SyncMode::JOURNAL;
1550 
1551     if (writeConcern.wMode.empty()) {
1552         return _topCoord->haveNumNodesReachedOpTime(
1553             opTime, writeConcern.wNumNodes, useDurableOpTime);
1554     }
1555 
1556     StringData patternName;
1557     if (writeConcern.wMode == WriteConcernOptions::kMajority) {
1558         if (_externalState->snapshotsEnabled() && !testingSnapshotBehaviorInIsolation) {
1559             // Make sure we have a valid "committed" snapshot up to the needed optime.
1560             if (!_currentCommittedSnapshot) {
1561                 return false;
1562             }
1563 
1564             // Wait for the "current" snapshot to advance to/past the opTime.
1565             const auto haveSnapshot = (_currentCommittedSnapshot >= opTime &&
1566                                        _currentCommittedSnapshot->getTimestamp() >= minSnapshot);
1567             if (!haveSnapshot) {
1568                 LOG(1) << "Required snapshot optime: " << opTime << " is not yet part of the "
1569                        << "current 'committed' snapshot: " << *_currentCommittedSnapshot;
1570                 return false;
1571             }
1572 
1573             // Fallthrough to wait for "majority" write concern.
1574         }
1575         // Continue and wait for replication to the majority (of voters).
1576         // *** Needed for J:True, writeConcernMajorityShouldJournal:False (appliedOpTime snapshot).
1577         patternName = ReplSetConfig::kMajorityWriteConcernModeName;
1578     } else {
1579         patternName = writeConcern.wMode;
1580     }
1581 
1582     StatusWith<ReplSetTagPattern> tagPattern = _rsConfig.findCustomWriteMode(patternName);
1583     if (!tagPattern.isOK()) {
1584         return true;
1585     }
1586     return _topCoord->haveTaggedNodesReachedOpTime(opTime, tagPattern.getValue(), useDurableOpTime);
1587 }
1588 
awaitReplication(OperationContext * opCtx,const OpTime & opTime,const WriteConcernOptions & writeConcern)1589 ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplication(
1590     OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern) {
1591     Timer timer;
1592     WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern);
1593     stdx::unique_lock<stdx::mutex> lock(_mutex);
1594     auto status = _awaitReplication_inlock(&lock, opCtx, opTime, Timestamp(), fixedWriteConcern);
1595     return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())};
1596 }
1597 
1598 ReplicationCoordinator::StatusAndDuration
awaitReplicationOfLastOpForClient(OperationContext * opCtx,const WriteConcernOptions & writeConcern)1599 ReplicationCoordinatorImpl::awaitReplicationOfLastOpForClient(
1600     OperationContext* opCtx, const WriteConcernOptions& writeConcern) {
1601     Timer timer;
1602     WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern);
1603     stdx::unique_lock<stdx::mutex> lock(_mutex);
1604     const auto& clientInfo = ReplClientInfo::forClient(opCtx->getClient());
1605     auto status = _awaitReplication_inlock(
1606         &lock, opCtx, clientInfo.getLastOp(), clientInfo.getLastSnapshot(), fixedWriteConcern);
1607     return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())};
1608 }
1609 
_awaitReplication_inlock(stdx::unique_lock<stdx::mutex> * lock,OperationContext * opCtx,const OpTime & opTime,Timestamp minSnapshot,const WriteConcernOptions & writeConcern)1610 Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
1611     stdx::unique_lock<stdx::mutex>* lock,
1612     OperationContext* opCtx,
1613     const OpTime& opTime,
1614     Timestamp minSnapshot,
1615     const WriteConcernOptions& writeConcern) {
1616 
1617     // We should never wait for replication if we are holding any locks, because this can
1618     // potentially block for long time while doing network activity.
1619     if (opCtx->lockState()->isLocked()) {
1620         return {ErrorCodes::IllegalOperation,
1621                 "Waiting for replication not allowed while holding a lock"};
1622     }
1623 
1624     const Mode replMode = getReplicationMode();
1625     if (replMode == modeNone) {
1626         // no replication check needed (validated above)
1627         return Status::OK();
1628     }
1629 
1630     if (replMode == modeMasterSlave && writeConcern.wMode == WriteConcernOptions::kMajority) {
1631         // with master/slave, majority is equivalent to w=1
1632         return Status::OK();
1633     }
1634 
1635     if (opTime.isNull() && minSnapshot == Timestamp()) {
1636         // If waiting for the empty optime, always say it's been replicated.
1637         return Status::OK();
1638     }
1639 
1640     auto checkForStepDown = [&]() -> Status {
1641         if (replMode == modeReplSet && !_memberState.primary()) {
1642             return {ErrorCodes::PrimarySteppedDown,
1643                     "Primary stepped down while waiting for replication"};
1644         }
1645 
1646         if (opTime.getTerm() != _topCoord->getTerm()) {
1647             return {
1648                 ErrorCodes::PrimarySteppedDown,
1649                 str::stream() << "Term changed from " << opTime.getTerm() << " to "
1650                               << _topCoord->getTerm()
1651                               << " while waiting for replication, indicating that this node must "
1652                                  "have stepped down."};
1653         }
1654 
1655         if (_topCoord->isSteppingDown()) {
1656             return {ErrorCodes::PrimarySteppedDown,
1657                     "Received stepdown request while waiting for replication"};
1658         }
1659         return Status::OK();
1660     };
1661 
1662     Status stepdownStatus = checkForStepDown();
1663     if (!stepdownStatus.isOK()) {
1664         return stepdownStatus;
1665     }
1666 
1667     auto interruptStatus = opCtx->checkForInterruptNoAssert();
1668     if (!interruptStatus.isOK()) {
1669         return interruptStatus;
1670     }
1671 
1672     if (writeConcern.wMode.empty()) {
1673         if (writeConcern.wNumNodes < 1) {
1674             return Status::OK();
1675         } else if (writeConcern.wNumNodes == 1 && _getMyLastAppliedOpTime_inlock() >= opTime) {
1676             return Status::OK();
1677         }
1678     }
1679 
1680     auto clockSource = opCtx->getServiceContext()->getPreciseClockSource();
1681     const auto wTimeoutDate = [&]() -> const Date_t {
1682         if (writeConcern.wDeadline != Date_t::max()) {
1683             return writeConcern.wDeadline;
1684         }
1685         if (writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) {
1686             return Date_t::max();
1687         }
1688         return clockSource->now() + clockSource->getPrecision() +
1689             Milliseconds{writeConcern.wTimeout};
1690     }();
1691 
1692     // Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList
1693     stdx::condition_variable condVar;
1694     ThreadWaiter waiter(opTime, &writeConcern, &condVar);
1695     WaiterGuard guard(&_replicationWaiterList, &waiter);
1696     while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) {
1697 
1698         if (_inShutdown) {
1699             return {ErrorCodes::ShutdownInProgress, "Replication is being shut down"};
1700         }
1701 
1702         auto status = opCtx->waitForConditionOrInterruptNoAssertUntil(condVar, *lock, wTimeoutDate);
1703         if (!status.isOK()) {
1704             return status.getStatus();
1705         }
1706 
1707         if (status.getValue() == stdx::cv_status::timeout) {
1708             if (Command::testCommandsEnabled) {
1709                 // log state of replica set on timeout to help with diagnosis.
1710                 BSONObjBuilder progress;
1711                 _topCoord->fillMemberData(&progress);
1712                 log() << "Replication for failed WC: " << writeConcern.toBSON()
1713                       << ", waitInfo: " << waiter << ", opID: " << opCtx->getOpID()
1714                       << ", progress: " << progress.done();
1715             }
1716             return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"};
1717         }
1718 
1719         stepdownStatus = checkForStepDown();
1720         if (!stepdownStatus.isOK()) {
1721             return stepdownStatus;
1722         }
1723     }
1724 
1725     return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
1726 }
1727 
waitForStepDownAttempt_forTest()1728 void ReplicationCoordinatorImpl::waitForStepDownAttempt_forTest() {
1729     stdx::unique_lock<stdx::mutex> lk(_mutex);
1730     while (!_topCoord->isSteppingDown()) {
1731         _stepDownWaiters.wait(lk);
1732     }
1733 }
1734 
stepDown(OperationContext * opCtx,const bool force,const Milliseconds & waitTime,const Milliseconds & stepdownTime)1735 Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
1736                                             const bool force,
1737                                             const Milliseconds& waitTime,
1738                                             const Milliseconds& stepdownTime) {
1739 
1740     const Date_t startTime = _replExecutor->now();
1741     const Date_t stepDownUntil = startTime + stepdownTime;
1742     const Date_t waitUntil = startTime + waitTime;
1743 
1744     if (!getMemberState().primary()) {
1745         // Note this check is inherently racy - it's always possible for the node to
1746         // stepdown from some other path before we acquire the global exclusive lock.  This check
1747         // is just to try to save us from acquiring the global X lock unnecessarily.
1748         return {ErrorCodes::NotMaster, "not primary so can't step down"};
1749     }
1750 
1751     auto globalLock = stdx::make_unique<Lock::GlobalLock>(
1752         opCtx, MODE_X, durationCount<Milliseconds>(stepdownTime), Lock::GlobalLock::EnqueueOnly());
1753 
1754     // We've requested the global exclusive lock which will stop new operations from coming in,
1755     // but existing operations could take a long time to finish, so kill all user operations
1756     // to help us get the global lock faster.
1757     _externalState->killAllUserOperations(opCtx);
1758 
1759     globalLock->waitForLock(durationCount<Milliseconds>(stepdownTime));
1760     if (!globalLock->isLocked()) {
1761         return {ErrorCodes::ExceededTimeLimit,
1762                 "Could not acquire the global shared lock within the amount of time "
1763                 "specified that we should step down for"};
1764     }
1765 
1766     stdx::unique_lock<stdx::mutex> lk(_mutex);
1767 
1768     auto status = opCtx->checkForInterruptNoAssert();
1769     if (!status.isOK()) {
1770         return status;
1771     }
1772 
1773     const long long termAtStart = _topCoord->getTerm();
1774 
1775     auto statusWithAbortFn = _topCoord->prepareForStepDownAttempt();
1776     if (!statusWithAbortFn.isOK()) {
1777         // This will cause us to fail if we're already in the process of stepping down.
1778         // It is also possible to get here even if we're done stepping down via another path,
1779         // and this will also elicit a failure from this call.
1780         return statusWithAbortFn.getStatus();
1781     }
1782     const auto& abortFn = statusWithAbortFn.getValue();
1783 
1784     // Wake up threads blocked in waitForStepDownAttempt_forTest.
1785     _stepDownWaiters.notify_all();
1786 
1787     // Update _canAcceptNonLocalWrites from the TopologyCoordinator now that we're in the middle
1788     // of a stepdown attempt.  This will prevent us from accepting writes so that if our stepdown
1789     // attempt fails later we can release the global lock and go to sleep to allow secondaries to
1790     // catch up without allowing new writes in.
1791     auto action = _updateMemberStateFromTopologyCoordinator_inlock(opCtx);
1792     invariant(action == PostMemberStateUpdateAction::kActionNone);
1793     invariant(!_canAcceptNonLocalWrites.loadRelaxed());
1794 
1795     // Make sure that we leave _canAcceptNonLocalWrites in the proper state.
1796     auto updateMemberState = [&] {
1797         invariant(lk.owns_lock());
1798         invariant(opCtx->lockState()->isW());
1799 
1800         auto action = _updateMemberStateFromTopologyCoordinator_inlock(opCtx);
1801         // Seems unlikely but handle kActionWinElection in case some surprising sequence leads here.
1802         if (action == kActionWinElection) {
1803             _postWonElectionUpdateMemberState_inlock();
1804             lk.unlock();
1805         } else {
1806             lk.unlock();
1807 
1808             if (MONGO_FAIL_POINT(stepdownHangBeforePerformingPostMemberStateUpdateActions)) {
1809                 log() << "stepping down from primary - "
1810                          "stepdownHangBeforePerformingPostMemberStateUpdateActions fail point "
1811                          "enabled. Blocking until fail point is disabled.";
1812                 while (MONGO_FAIL_POINT(stepdownHangBeforePerformingPostMemberStateUpdateActions)) {
1813                     mongo::sleepsecs(1);
1814                     {
1815                         stdx::lock_guard<stdx::mutex> lock(_mutex);
1816                         if (_inShutdown) {
1817                             break;
1818                         }
1819                     }
1820                 }
1821             }
1822 
1823             _performPostMemberStateUpdateAction(action);
1824         }
1825     };
1826     ScopeGuard onExitGuard = MakeGuard([&] {
1827         abortFn();
1828         updateMemberState();
1829     });
1830 
1831     try {
1832 
1833         while (!_topCoord->attemptStepDown(
1834             termAtStart, _replExecutor->now(), waitUntil, stepDownUntil, force)) {
1835 
1836             // The stepdown attempt failed. Now release the global lock to allow secondaries
1837             // to read the oplog, then wait until enough secondaries are caught up for us to
1838             // finish stepdown.
1839             globalLock.reset();
1840             invariant(!opCtx->lockState()->isLocked());
1841 
1842             // Make sure we re-acquire the global lock before returning so that we're always holding
1843             // the
1844             // global lock when the onExitGuard set up earlier runs.
1845             ON_BLOCK_EXIT([&] {
1846                 // Need to release _mutex before re-acquiring the global lock to preserve lock
1847                 // acquisition order rules.
1848                 lk.unlock();
1849 
1850                 // Need to re-acquire the global lock before re-attempting stepdown.
1851                 // We use no timeout here even though that means the lock acquisition could take
1852                 // longer
1853                 // than the stepdown window.  If that happens, the call to _tryToStepDown
1854                 // immediately
1855                 // after will error.  Since we'll need the global lock no matter what to clean up a
1856                 // failed stepdown attempt, we might as well spend whatever time we need to acquire
1857                 // it
1858                 // now.
1859                 globalLock.reset(new Lock::GlobalLock(opCtx, MODE_X, UINT_MAX));
1860                 invariant(globalLock->isLocked());
1861                 lk.lock();
1862             });
1863 
1864             // We ignore the case where waitForConditionOrInterruptUntil returns
1865             // stdx::cv_status::timeout because in that case coming back around the loop and calling
1866             // attemptStepDown again will cause attemptStepDown to return ExceededTimeLimit with
1867             // the proper error message.
1868             opCtx->waitForConditionOrInterruptUntil(
1869                 _stepDownWaiters, lk, std::min(stepDownUntil, waitUntil));
1870         }
1871     } catch (const DBException& e) {
1872         return e.toStatus();
1873     }
1874 
1875     // Stepdown success!
1876     onExitGuard.Dismiss();
1877     updateMemberState();
1878 
1879     // Schedule work to (potentially) step back up once the stepdown period has ended.
1880     _scheduleWorkAt(
1881         stepDownUntil,
1882         stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing, this, stdx::placeholders::_1));
1883 
1884     // If election handoff is enabled, schedule a step-up immediately instead of waiting for the
1885     // election timeout to expire. ReplSetStepUp is only supported in PV1.
1886     if (isV1ElectionProtocol() && !force && enableElectionHandoff.load()) {
1887         _performElectionHandoff();
1888     }
1889     return Status::OK();
1890 }
1891 
_signalStepDownWaiterIfReady_inlock()1892 void ReplicationCoordinatorImpl::_signalStepDownWaiterIfReady_inlock() {
1893     if (_topCoord->isSteppingDown() && _topCoord->isSafeToStepDown()) {
1894         _stepDownWaiters.notify_all();
1895     }
1896 }
1897 
_performElectionHandoff()1898 void ReplicationCoordinatorImpl::_performElectionHandoff() {
1899     stdx::lock_guard<stdx::mutex> lock(_mutex);
1900     auto candidateIndex = _topCoord->chooseElectionHandoffCandidate();
1901 
1902     if (candidateIndex < 0) {
1903         log() << "Could not find node to hand off election to.";
1904         return;
1905     }
1906 
1907     auto target = _rsConfig.getMemberAt(candidateIndex).getHostAndPort();
1908     executor::RemoteCommandRequest request(
1909         target, "admin", BSON("replSetStepUp" << 1 << "skipDryRun" << true), nullptr);
1910     log() << "Handing off election to " << target;
1911 
1912     auto callbackHandleSW = _replExecutor->scheduleRemoteCommand(
1913         request, [target](const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackData) {
1914             auto status = callbackData.response.status;
1915 
1916             if (status.isOK()) {
1917                 LOG(1) << "replSetStepUp request to " << target << " succeeded with response -- "
1918                        << callbackData.response.data;
1919             } else {
1920                 log() << "replSetStepUp request to " << target << " failed due to " << status;
1921             }
1922         });
1923 
1924     auto callbackHandleStatus = callbackHandleSW.getStatus();
1925     if (!callbackHandleStatus.isOK()) {
1926         error() << "Failed to schedule ReplSetStepUp request to " << target
1927                 << " for election handoff: " << callbackHandleStatus;
1928     }
1929 }
1930 
_handleTimePassing(const executor::TaskExecutor::CallbackArgs & cbData)1931 void ReplicationCoordinatorImpl::_handleTimePassing(
1932     const executor::TaskExecutor::CallbackArgs& cbData) {
1933     if (!cbData.status.isOK()) {
1934         return;
1935     }
1936 
1937     // For election protocol v1, call _startElectSelfIfEligibleV1 to avoid race
1938     // against other elections caused by events like election timeout, replSetStepUp etc.
1939     if (isV1ElectionProtocol()) {
1940         _startElectSelfIfEligibleV1(
1941             TopologyCoordinator::StartElectionReason::kSingleNodePromptElection);
1942         return;
1943     }
1944 
1945     bool wonSingleNodeElection = [this]() {
1946         stdx::lock_guard<stdx::mutex> lk(_mutex);
1947         return _topCoord->becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(_replExecutor->now());
1948     }();
1949 
1950     if (wonSingleNodeElection) {
1951         stdx::lock_guard<stdx::mutex> lk(_mutex);
1952         _postWonElectionUpdateMemberState_inlock();
1953     }
1954 }
1955 
isMasterForReportingPurposes()1956 bool ReplicationCoordinatorImpl::isMasterForReportingPurposes() {
1957     if (_settings.usingReplSets()) {
1958         stdx::lock_guard<stdx::mutex> lock(_mutex);
1959         if (getReplicationMode() == modeReplSet && _getMemberState_inlock().primary()) {
1960             return true;
1961         }
1962         return false;
1963     }
1964 
1965     if (!_settings.isSlave())
1966         return true;
1967 
1968 
1969     // TODO(dannenberg) replAllDead is bad and should be removed when master slave is removed
1970     if (replAllDead) {
1971         return false;
1972     }
1973 
1974     if (_settings.isMaster()) {
1975         // if running with --master --slave, allow.
1976         return true;
1977     }
1978 
1979     return false;
1980 }
1981 
canAcceptWritesForDatabase(OperationContext * opCtx,StringData dbName)1982 bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase(OperationContext* opCtx,
1983                                                             StringData dbName) {
1984     // The answer isn't meaningful unless we hold the global lock.
1985     invariant(opCtx->lockState()->isLocked());
1986     return canAcceptWritesForDatabase_UNSAFE(opCtx, dbName);
1987 }
1988 
canAcceptWritesForDatabase_UNSAFE(OperationContext * opCtx,StringData dbName)1989 bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase_UNSAFE(OperationContext* opCtx,
1990                                                                    StringData dbName) {
1991     // _canAcceptNonLocalWrites is always true for standalone nodes, always false for nodes
1992     // started with --slave, and adjusted based on primary+drain state in replica sets.
1993     //
1994     // That is, stand-alone nodes, non-slave nodes and drained replica set primaries can always
1995     // accept writes.  Similarly, writes are always permitted to the "local" database.  Finally,
1996     // in the event that a node is started with --slave and --master, we allow writes unless the
1997     // master/slave system has set the replAllDead flag.
1998     if (_canAcceptNonLocalWrites.loadRelaxed() || alwaysAllowNonLocalWrites(opCtx)) {
1999         return true;
2000     }
2001     if (dbName == kLocalDB) {
2002         return true;
2003     }
2004     return !replAllDead && _settings.isMaster();
2005 }
2006 
canAcceptWritesFor(OperationContext * opCtx,const NamespaceString & ns)2007 bool ReplicationCoordinatorImpl::canAcceptWritesFor(OperationContext* opCtx,
2008                                                     const NamespaceString& ns) {
2009     invariant(opCtx->lockState()->isLocked());
2010     return canAcceptWritesFor_UNSAFE(opCtx, ns);
2011 }
2012 
canAcceptWritesFor_UNSAFE(OperationContext * opCtx,const NamespaceString & ns)2013 bool ReplicationCoordinatorImpl::canAcceptWritesFor_UNSAFE(OperationContext* opCtx,
2014                                                            const NamespaceString& ns) {
2015     StringData dbName = ns.db();
2016     bool canWriteToDB = canAcceptWritesForDatabase_UNSAFE(opCtx, dbName);
2017 
2018     if (!canWriteToDB && !ns.isSystemDotProfile()) {
2019         return false;
2020     }
2021 
2022     // Even if we think we can write to the database we need to make sure we're not trying
2023     // to write to the oplog in ROLLBACK.
2024     // If we can accept non local writes (ie we're PRIMARY) then we must not be in ROLLBACK.
2025     // This check is redundant of the check of _memberState below, but since this can be checked
2026     // without locking, we do it as an optimization.
2027     if (_canAcceptNonLocalWrites.loadRelaxed() || alwaysAllowNonLocalWrites(opCtx)) {
2028         return true;
2029     }
2030 
2031     if (!ns.isOplog()) {
2032         return true;
2033     }
2034 
2035     stdx::lock_guard<stdx::mutex> lock(_mutex);
2036     if (_memberState.rollback()) {
2037         return false;
2038     }
2039     return true;
2040 }
2041 
checkCanServeReadsFor(OperationContext * opCtx,const NamespaceString & ns,bool slaveOk)2042 Status ReplicationCoordinatorImpl::checkCanServeReadsFor(OperationContext* opCtx,
2043                                                          const NamespaceString& ns,
2044                                                          bool slaveOk) {
2045     invariant(opCtx->lockState()->isLocked());
2046     return checkCanServeReadsFor_UNSAFE(opCtx, ns, slaveOk);
2047 }
2048 
checkCanServeReadsFor_UNSAFE(OperationContext * opCtx,const NamespaceString & ns,bool slaveOk)2049 Status ReplicationCoordinatorImpl::checkCanServeReadsFor_UNSAFE(OperationContext* opCtx,
2050                                                                 const NamespaceString& ns,
2051                                                                 bool slaveOk) {
2052     auto client = opCtx->getClient();
2053     bool isPrimaryOrSecondary = _canServeNonLocalReads.loadRelaxed();
2054 
2055     // Oplog reads are not allowed during STARTUP state, but we make an exception for internal
2056     // reads. Internal reads are required for cleaning up unfinished apply batches.
2057     if (!isPrimaryOrSecondary && getReplicationMode() == modeReplSet && ns.isOplog()) {
2058         stdx::lock_guard<stdx::mutex> lock(_mutex);
2059         if ((_memberState.startup() && client->isFromUserConnection()) || _memberState.startup2() ||
2060             _memberState.rollback()) {
2061             return Status{ErrorCodes::NotMasterOrSecondary,
2062                           "Oplog collection reads are not allowed while in the rollback or "
2063                           "startup state."};
2064         }
2065     }
2066 
2067     if (client->isInDirectClient()) {
2068         return Status::OK();
2069     }
2070     if (canAcceptWritesFor_UNSAFE(opCtx, ns)) {
2071         return Status::OK();
2072     }
2073     if (getReplicationMode() == modeMasterSlave) {
2074         return Status::OK();
2075     }
2076     if (slaveOk) {
2077         if (isPrimaryOrSecondary) {
2078             return Status::OK();
2079         }
2080         return Status(ErrorCodes::NotMasterOrSecondary,
2081                       "not master or secondary; cannot currently read from this replSet member");
2082     }
2083     return Status(ErrorCodes::NotMasterNoSlaveOk, "not master and slaveOk=false");
2084 }
2085 
isInPrimaryOrSecondaryState() const2086 bool ReplicationCoordinatorImpl::isInPrimaryOrSecondaryState() const {
2087     return _canServeNonLocalReads.loadRelaxed();
2088 }
2089 
shouldRelaxIndexConstraints(OperationContext * opCtx,const NamespaceString & ns)2090 bool ReplicationCoordinatorImpl::shouldRelaxIndexConstraints(OperationContext* opCtx,
2091                                                              const NamespaceString& ns) {
2092     return !canAcceptWritesFor(opCtx, ns);
2093 }
2094 
getElectionId()2095 OID ReplicationCoordinatorImpl::getElectionId() {
2096     stdx::lock_guard<stdx::mutex> lock(_mutex);
2097     return _electionId;
2098 }
2099 
getMyRID() const2100 OID ReplicationCoordinatorImpl::getMyRID() const {
2101     stdx::lock_guard<stdx::mutex> lock(_mutex);
2102     return _getMyRID_inlock();
2103 }
2104 
_getMyRID_inlock() const2105 OID ReplicationCoordinatorImpl::_getMyRID_inlock() const {
2106     return _myRID;
2107 }
2108 
getMyId() const2109 int ReplicationCoordinatorImpl::getMyId() const {
2110     stdx::lock_guard<stdx::mutex> lock(_mutex);
2111     return _getMyId_inlock();
2112 }
2113 
_getMyId_inlock() const2114 int ReplicationCoordinatorImpl::_getMyId_inlock() const {
2115     const MemberConfig& self = _rsConfig.getMemberAt(_selfIndex);
2116     return self.getId();
2117 }
2118 
resyncData(OperationContext * opCtx,bool waitUntilCompleted)2119 Status ReplicationCoordinatorImpl::resyncData(OperationContext* opCtx, bool waitUntilCompleted) {
2120     _stopDataReplication(opCtx);
2121     auto finishedEvent = uassertStatusOK(_replExecutor->makeEvent());
2122     stdx::function<void()> f;
2123     if (waitUntilCompleted)
2124         f = [&finishedEvent, this]() { _replExecutor->signalEvent(finishedEvent); };
2125 
2126     {
2127         stdx::lock_guard<stdx::mutex> lk(_mutex);
2128         _resetMyLastOpTimes_inlock();
2129     }
2130     // unlock before calling _startDataReplication().
2131     _startDataReplication(opCtx, f);
2132     if (waitUntilCompleted) {
2133         _replExecutor->waitForEvent(finishedEvent);
2134     }
2135     return Status::OK();
2136 }
2137 
prepareReplSetUpdatePositionCommand() const2138 StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand() const {
2139     stdx::lock_guard<stdx::mutex> lock(_mutex);
2140     return _topCoord->prepareReplSetUpdatePositionCommand(
2141         _getCurrentCommittedSnapshotOpTime_inlock());
2142 }
2143 
processReplSetGetStatus(BSONObjBuilder * response,ReplSetGetStatusResponseStyle responseStyle)2144 Status ReplicationCoordinatorImpl::processReplSetGetStatus(
2145     BSONObjBuilder* response, ReplSetGetStatusResponseStyle responseStyle) {
2146 
2147     BSONObj initialSyncProgress;
2148     if (responseStyle == ReplSetGetStatusResponseStyle::kInitialSync) {
2149         std::shared_ptr<InitialSyncer> initialSyncerCopy;
2150         {
2151             stdx::lock_guard<stdx::mutex> lk(_mutex);
2152             initialSyncerCopy = _initialSyncer;
2153         }
2154 
2155         // getInitialSyncProgress must be called outside the ReplicationCoordinatorImpl::_mutex
2156         // lock. Else it might deadlock with InitialSyncer::_multiApplierCallback where it first
2157         // acquires InitialSyncer::_mutex and then ReplicationCoordinatorImpl::_mutex.
2158         if (initialSyncerCopy) {
2159             initialSyncProgress = initialSyncerCopy->getInitialSyncProgress();
2160         }
2161     }
2162 
2163     stdx::lock_guard<stdx::mutex> lk(_mutex);
2164     Status result(ErrorCodes::InternalError, "didn't set status in prepareStatusResponse");
2165     _topCoord->prepareStatusResponse(
2166         TopologyCoordinator::ReplSetStatusArgs{
2167             _replExecutor->now(),
2168             static_cast<unsigned>(time(0) - serverGlobalParams.started),
2169             _getCurrentCommittedSnapshotOpTime_inlock(),
2170             initialSyncProgress},
2171         response,
2172         &result);
2173     return result;
2174 }
2175 
fillIsMasterForReplSet(IsMasterResponse * response,const SplitHorizon::Parameters & horizonParams)2176 void ReplicationCoordinatorImpl::fillIsMasterForReplSet(
2177     IsMasterResponse* response, const SplitHorizon::Parameters& horizonParams) {
2178     invariant(getSettings().usingReplSets());
2179 
2180     stdx::lock_guard<stdx::mutex> lk(_mutex);
2181     _topCoord->fillIsMasterForReplSet(response, horizonParams);
2182 
2183     OpTime lastOpTime = _getMyLastAppliedOpTime_inlock();
2184     response->setLastWrite(lastOpTime, lastOpTime.getTimestamp().getSecs());
2185     if (_currentCommittedSnapshot) {
2186         response->setLastMajorityWrite(_currentCommittedSnapshot.get(),
2187                                        _currentCommittedSnapshot->getTimestamp().getSecs());
2188     }
2189 
2190     if (response->isMaster() && !_canAcceptNonLocalWrites.loadRelaxed()) {
2191         // Report that we are secondary to ismaster callers until drain completes.
2192         response->setIsMaster(false);
2193         response->setIsSecondary(true);
2194     }
2195 
2196     if (_inShutdown) {
2197         response->setIsMaster(false);
2198         response->setIsSecondary(false);
2199     }
2200 }
2201 
appendSlaveInfoData(BSONObjBuilder * result)2202 void ReplicationCoordinatorImpl::appendSlaveInfoData(BSONObjBuilder* result) {
2203     stdx::lock_guard<stdx::mutex> lock(_mutex);
2204     _topCoord->fillMemberData(result);
2205 }
2206 
getConfig() const2207 ReplSetConfig ReplicationCoordinatorImpl::getConfig() const {
2208     stdx::lock_guard<stdx::mutex> lock(_mutex);
2209     return _rsConfig;
2210 }
2211 
processReplSetGetConfig(BSONObjBuilder * result)2212 void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result) {
2213     stdx::lock_guard<stdx::mutex> lock(_mutex);
2214     result->append("config", _rsConfig.toBSON());
2215 }
2216 
processReplSetMetadata(const rpc::ReplSetMetadata & replMetadata)2217 void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {
2218     EventHandle evh;
2219 
2220     {
2221         stdx::lock_guard<stdx::mutex> lock(_mutex);
2222         evh = _processReplSetMetadata_inlock(replMetadata);
2223     }
2224 
2225     if (evh) {
2226         _replExecutor->waitForEvent(evh);
2227     }
2228 }
2229 
cancelAndRescheduleElectionTimeout()2230 void ReplicationCoordinatorImpl::cancelAndRescheduleElectionTimeout() {
2231     stdx::lock_guard<stdx::mutex> lock(_mutex);
2232     _cancelAndRescheduleElectionTimeout_inlock();
2233 }
2234 
_processReplSetMetadata_inlock(const rpc::ReplSetMetadata & replMetadata)2235 EventHandle ReplicationCoordinatorImpl::_processReplSetMetadata_inlock(
2236     const rpc::ReplSetMetadata& replMetadata) {
2237     if (replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) {
2238         return EventHandle();
2239     }
2240     return _updateTerm_inlock(replMetadata.getTerm());
2241 }
2242 
getMaintenanceMode()2243 bool ReplicationCoordinatorImpl::getMaintenanceMode() {
2244     stdx::lock_guard<stdx::mutex> lk(_mutex);
2245     return _topCoord->getMaintenanceCount() > 0;
2246 }
2247 
setMaintenanceMode(bool activate)2248 Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) {
2249     if (getReplicationMode() != modeReplSet) {
2250         return Status(ErrorCodes::NoReplicationEnabled,
2251                       "can only set maintenance mode on replica set members");
2252     }
2253 
2254     stdx::unique_lock<stdx::mutex> lk(_mutex);
2255     if (_topCoord->getRole() == TopologyCoordinator::Role::kCandidate) {
2256         return Status(ErrorCodes::NotSecondary, "currently running for election");
2257     }
2258 
2259     if (_getMemberState_inlock().primary()) {
2260         return Status(ErrorCodes::NotSecondary, "primaries can't modify maintenance mode");
2261     }
2262 
2263     int curMaintenanceCalls = _topCoord->getMaintenanceCount();
2264     if (activate) {
2265         log() << "going into maintenance mode with " << curMaintenanceCalls
2266               << " other maintenance mode tasks in progress" << rsLog;
2267         _topCoord->adjustMaintenanceCountBy(1);
2268     } else if (curMaintenanceCalls > 0) {
2269         invariant(_topCoord->getRole() == TopologyCoordinator::Role::kFollower);
2270 
2271         _topCoord->adjustMaintenanceCountBy(-1);
2272 
2273         log() << "leaving maintenance mode (" << curMaintenanceCalls - 1
2274               << " other maintenance mode tasks ongoing)" << rsLog;
2275     } else {
2276         warning() << "Attempted to leave maintenance mode but it is not currently active";
2277         return Status(ErrorCodes::OperationFailed, "already out of maintenance mode");
2278     }
2279 
2280     const PostMemberStateUpdateAction action =
2281         _updateMemberStateFromTopologyCoordinator_inlock(nullptr);
2282 
2283     if (action == kActionWinElection) {
2284         _postWonElectionUpdateMemberState_inlock();
2285     } else {
2286         lk.unlock();
2287         _performPostMemberStateUpdateAction(action);
2288     }
2289 
2290     return Status::OK();
2291 }
2292 
processReplSetSyncFrom(OperationContext * opCtx,const HostAndPort & target,BSONObjBuilder * resultObj)2293 Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* opCtx,
2294                                                           const HostAndPort& target,
2295                                                           BSONObjBuilder* resultObj) {
2296     Status result(ErrorCodes::InternalError, "didn't set status in prepareSyncFromResponse");
2297     auto doResync = false;
2298     {
2299         stdx::lock_guard<stdx::mutex> lk(_mutex);
2300         _topCoord->prepareSyncFromResponse(target, resultObj, &result);
2301         // If we are in the middle of an initial sync, do a resync.
2302         doResync = result.isOK() && _initialSyncer && _initialSyncer->isActive();
2303     }
2304 
2305     if (doResync) {
2306         return resyncData(opCtx, false);
2307     }
2308 
2309     return result;
2310 }
2311 
processReplSetFreeze(int secs,BSONObjBuilder * resultObj)2312 Status ReplicationCoordinatorImpl::processReplSetFreeze(int secs, BSONObjBuilder* resultObj) {
2313     auto result = [=]() {
2314         stdx::lock_guard<stdx::mutex> lock(_mutex);
2315         return _topCoord->prepareFreezeResponse(_replExecutor->now(), secs, resultObj);
2316     }();
2317     if (!result.isOK()) {
2318         return result.getStatus();
2319     }
2320 
2321     if (TopologyCoordinator::PrepareFreezeResponseResult::kSingleNodeSelfElect ==
2322         result.getValue()) {
2323         if (isV1ElectionProtocol()) {
2324             // For election protocol v1, call _startElectSelfIfEligibleV1 to avoid race
2325             // against other elections caused by events like election timeout, replSetStepUp etc.
2326             _startElectSelfIfEligibleV1(
2327                 TopologyCoordinator::StartElectionReason::kSingleNodePromptElection);
2328         } else {
2329             // If we just unfroze and ended our stepdown period and we are a one node replica set,
2330             // the topology coordinator will have gone into the candidate role to signal that we
2331             // need to elect ourself.
2332             _performPostMemberStateUpdateAction(kActionWinElection);
2333         }
2334     }
2335 
2336     return Status::OK();
2337 }
2338 
processHeartbeat(const ReplSetHeartbeatArgs & args,ReplSetHeartbeatResponse * response)2339 Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs& args,
2340                                                     ReplSetHeartbeatResponse* response) {
2341     {
2342         stdx::lock_guard<stdx::mutex> lock(_mutex);
2343         if (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
2344             return Status(ErrorCodes::NotYetInitialized,
2345                           "Received heartbeat while still initializing replication system");
2346         }
2347     }
2348 
2349     auto senderHost(args.getSenderHost());
2350 
2351     stdx::lock_guard<stdx::mutex> lk(_mutex);
2352 
2353     const Date_t now = _replExecutor->now();
2354     Status result =
2355         _topCoord->prepareHeartbeatResponse(now, args, _settings.ourSetName(), response);
2356     if ((result.isOK() || result == ErrorCodes::InvalidReplicaSetConfig) && _selfIndex < 0) {
2357         // If this node does not belong to the configuration it knows about, send heartbeats
2358         // back to any node that sends us a heartbeat, in case one of those remote nodes has
2359         // a configuration that contains us.  Chances are excellent that it will, since that
2360         // is the only reason for a remote node to send this node a heartbeat request.
2361         if (!senderHost.empty() && _seedList.insert(senderHost).second) {
2362             _scheduleHeartbeatToTarget_inlock(senderHost, -1, now);
2363         }
2364     } else if (result.isOK() && response->getConfigVersion() < args.getConfigVersion()) {
2365         // Schedule a heartbeat to the sender to fetch the new config.
2366         // We cannot cancel the enqueued heartbeat, but either this one or the enqueued heartbeat
2367         // will trigger reconfig, which cancels and reschedules all heartbeats.
2368 
2369         if (args.hasSenderHost()) {
2370             int senderIndex = _rsConfig.findMemberIndexByHostAndPort(senderHost);
2371             _scheduleHeartbeatToTarget_inlock(senderHost, senderIndex, now);
2372         }
2373     }
2374     return result;
2375 }
2376 
processReplSetReconfig(OperationContext * opCtx,const ReplSetReconfigArgs & args,BSONObjBuilder * resultObj)2377 Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCtx,
2378                                                           const ReplSetReconfigArgs& args,
2379                                                           BSONObjBuilder* resultObj) {
2380     log() << "replSetReconfig admin command received from client; new config: "
2381           << args.newConfigObj;
2382 
2383     stdx::unique_lock<stdx::mutex> lk(_mutex);
2384 
2385     while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
2386         _rsConfigStateChange.wait(lk);
2387     }
2388 
2389     switch (_rsConfigState) {
2390         case kConfigSteady:
2391             break;
2392         case kConfigUninitialized:
2393             return Status(ErrorCodes::NotYetInitialized,
2394                           "Node not yet initialized; use the replSetInitiate command");
2395         case kConfigReplicationDisabled:
2396             invariant(
2397                 false);  // should be unreachable due to !_settings.usingReplSets() check above
2398         case kConfigInitiating:
2399         case kConfigReconfiguring:
2400         case kConfigHBReconfiguring:
2401             return Status(ErrorCodes::ConfigurationInProgress,
2402                           "Cannot run replSetReconfig because the node is currently updating "
2403                           "its configuration");
2404         default:
2405             severe() << "Unexpected _rsConfigState " << int(_rsConfigState);
2406             fassertFailed(18914);
2407     }
2408 
2409     invariant(_rsConfig.isInitialized());
2410 
2411     if (!args.force && !_getMemberState_inlock().primary()) {
2412         return Status(ErrorCodes::NotMaster,
2413                       str::stream()
2414                           << "replSetReconfig should only be run on PRIMARY, but my state is "
2415                           << _getMemberState_inlock().toString()
2416                           << "; use the \"force\" argument to override");
2417     }
2418 
2419     _setConfigState_inlock(kConfigReconfiguring);
2420     ScopeGuard configStateGuard = MakeGuard(
2421         lockAndCall,
2422         &lk,
2423         stdx::bind(&ReplicationCoordinatorImpl::_setConfigState_inlock, this, kConfigSteady));
2424 
2425     ReplSetConfig oldConfig = _rsConfig;
2426     lk.unlock();
2427 
2428     ReplSetConfig newConfig;
2429     BSONObj newConfigObj = args.newConfigObj;
2430     if (args.force) {
2431         newConfigObj = incrementConfigVersionByRandom(newConfigObj);
2432     }
2433 
2434     BSONObj oldConfigObj = oldConfig.toBSON();
2435     audit::logReplSetReconfig(opCtx->getClient(), &oldConfigObj, &newConfigObj);
2436 
2437     Status status = newConfig.initialize(
2438         newConfigObj, oldConfig.getProtocolVersion() == 1, oldConfig.getReplicaSetId());
2439 
2440     if (!status.isOK()) {
2441         error() << "replSetReconfig got " << status << " while parsing " << newConfigObj;
2442         return Status(ErrorCodes::InvalidReplicaSetConfig, status.reason());
2443         ;
2444     }
2445     if (newConfig.getReplSetName() != _settings.ourSetName()) {
2446         str::stream errmsg;
2447         errmsg << "Attempting to reconfigure a replica set with name " << newConfig.getReplSetName()
2448                << ", but command line reports " << _settings.ourSetName() << "; rejecting";
2449         error() << std::string(errmsg);
2450         return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg);
2451     }
2452 
2453     StatusWith<int> myIndex = validateConfigForReconfig(
2454         _externalState.get(), oldConfig, newConfig, opCtx->getServiceContext(), args.force);
2455     if (!myIndex.isOK()) {
2456         error() << "replSetReconfig got " << myIndex.getStatus() << " while validating "
2457                 << newConfigObj;
2458         return Status(ErrorCodes::NewReplicaSetConfigurationIncompatible,
2459                       myIndex.getStatus().reason());
2460     }
2461 
2462     log() << "replSetReconfig config object with " << newConfig.getNumMembers()
2463           << " members parses ok";
2464 
2465     if (!args.force) {
2466         status = checkQuorumForReconfig(
2467             _replExecutor.get(), newConfig, myIndex.getValue(), _topCoord->getTerm());
2468         if (!status.isOK()) {
2469             error() << "replSetReconfig failed; " << status;
2470             return status;
2471         }
2472     }
2473 
2474     status = _externalState->storeLocalConfigDocument(opCtx, newConfig.toBSON());
2475     if (!status.isOK()) {
2476         error() << "replSetReconfig failed to store config document; " << status;
2477         return status;
2478     }
2479 
2480     auto reconfigFinished = uassertStatusOK(_replExecutor->makeEvent());
2481     uassertStatusOK(
2482         _replExecutor->scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig,
2483                                                this,
2484                                                stdx::placeholders::_1,
2485                                                newConfig,
2486                                                args.force,
2487                                                myIndex.getValue(),
2488                                                reconfigFinished)));
2489     configStateGuard.Dismiss();
2490     _replExecutor->waitForEvent(reconfigFinished);
2491     return Status::OK();
2492 }
2493 
_finishReplSetReconfig(const executor::TaskExecutor::CallbackArgs & cbData,const ReplSetConfig & newConfig,const bool isForceReconfig,int myIndex,const executor::TaskExecutor::EventHandle & finishedEvent)2494 void ReplicationCoordinatorImpl::_finishReplSetReconfig(
2495     const executor::TaskExecutor::CallbackArgs& cbData,
2496     const ReplSetConfig& newConfig,
2497     const bool isForceReconfig,
2498     int myIndex,
2499     const executor::TaskExecutor::EventHandle& finishedEvent) {
2500 
2501     if (cbData.status == ErrorCodes::CallbackCanceled) {
2502         return;
2503     }
2504     auto opCtx = cc().makeOperationContext();
2505     boost::optional<Lock::GlobalWrite> globalExclusiveLock;
2506     if (isForceReconfig) {
2507         // Since it's a force reconfig, the primary node may not be electable after the
2508         // configuration change.  In case we are that primary node, finish the reconfig under the
2509         // global lock, so that the step down occurs safely.
2510         globalExclusiveLock.emplace(opCtx.get());
2511     }
2512     stdx::unique_lock<stdx::mutex> lk(_mutex);
2513 
2514     invariant(_rsConfigState == kConfigReconfiguring);
2515     invariant(_rsConfig.isInitialized());
2516 
2517     // Do not conduct an election during a reconfig, as the node may not be electable post-reconfig.
2518     if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) {
2519         // Wait for the election to complete and the node's Role to be set to follower.
2520         _replExecutor
2521             ->onEvent(electionFinishedEvent,
2522                       stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig,
2523                                  this,
2524                                  stdx::placeholders::_1,
2525                                  newConfig,
2526                                  isForceReconfig,
2527                                  myIndex,
2528                                  finishedEvent))
2529             .status_with_transitional_ignore();
2530         return;
2531     }
2532 
2533     const ReplSetConfig oldConfig = _rsConfig;
2534     const PostMemberStateUpdateAction action =
2535         _setCurrentRSConfig_inlock(opCtx.get(), newConfig, myIndex);
2536 
2537     // On a reconfig we drop all snapshots so we don't mistakenly read from the wrong one.
2538     // For example, if we change the meaning of the "committed" snapshot from applied -> durable.
2539     _dropAllSnapshots_inlock();
2540 
2541     lk.unlock();
2542     _resetElectionInfoOnProtocolVersionUpgrade(opCtx.get(), oldConfig, newConfig);
2543     if (action == kActionWinElection) {
2544         lk.lock();
2545         _postWonElectionUpdateMemberState_inlock();
2546         lk.unlock();
2547     } else {
2548         _performPostMemberStateUpdateAction(action);
2549     }
2550 
2551     _replExecutor->signalEvent(finishedEvent);
2552 }
2553 
processReplSetInitiate(OperationContext * opCtx,const BSONObj & configObj,BSONObjBuilder * resultObj)2554 Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCtx,
2555                                                           const BSONObj& configObj,
2556                                                           BSONObjBuilder* resultObj) {
2557     log() << "replSetInitiate admin command received from client";
2558 
2559     const auto replEnabled = _settings.usingReplSets();
2560     stdx::unique_lock<stdx::mutex> lk(_mutex);
2561     if (!replEnabled) {
2562         return Status(ErrorCodes::NoReplicationEnabled, "server is not running with --replSet");
2563     }
2564     while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
2565         _rsConfigStateChange.wait(lk);
2566     }
2567 
2568     if (_rsConfigState != kConfigUninitialized) {
2569         resultObj->append("info", "try querying local.system.replset to see current configuration");
2570         return Status(ErrorCodes::AlreadyInitialized, "already initialized");
2571     }
2572     invariant(!_rsConfig.isInitialized());
2573     _setConfigState_inlock(kConfigInitiating);
2574 
2575     ScopeGuard configStateGuard = MakeGuard(
2576         lockAndCall,
2577         &lk,
2578         stdx::bind(
2579             &ReplicationCoordinatorImpl::_setConfigState_inlock, this, kConfigUninitialized));
2580     lk.unlock();
2581 
2582     ReplSetConfig newConfig;
2583     Status status = newConfig.initializeForInitiate(configObj, true);
2584     if (!status.isOK()) {
2585         error() << "replSet initiate got " << status << " while parsing " << configObj;
2586         return Status(ErrorCodes::InvalidReplicaSetConfig, status.reason());
2587     }
2588     if (newConfig.getReplSetName() != _settings.ourSetName()) {
2589         str::stream errmsg;
2590         errmsg << "Attempting to initiate a replica set with name " << newConfig.getReplSetName()
2591                << ", but command line reports " << _settings.ourSetName() << "; rejecting";
2592         error() << std::string(errmsg);
2593         return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg);
2594     }
2595 
2596     StatusWith<int> myIndex =
2597         validateConfigForInitiate(_externalState.get(), newConfig, opCtx->getServiceContext());
2598     if (!myIndex.isOK()) {
2599         error() << "replSet initiate got " << myIndex.getStatus() << " while validating "
2600                 << configObj;
2601         return Status(ErrorCodes::InvalidReplicaSetConfig, myIndex.getStatus().reason());
2602     }
2603 
2604     log() << "replSetInitiate config object with " << newConfig.getNumMembers()
2605           << " members parses ok";
2606 
2607     // In pv1, the TopologyCoordinator has not set the term yet. It will be set to kInitialTerm if
2608     // the initiate succeeds so we pass that here.
2609     status = checkQuorumForInitiate(
2610         _replExecutor.get(),
2611         newConfig,
2612         myIndex.getValue(),
2613         newConfig.getProtocolVersion() == 1 ? OpTime::kInitialTerm : OpTime::kUninitializedTerm);
2614 
2615     if (!status.isOK()) {
2616         error() << "replSetInitiate failed; " << status;
2617         return status;
2618     }
2619 
2620     status = _externalState->initializeReplSetStorage(opCtx, newConfig.toBSON());
2621     if (!status.isOK()) {
2622         error() << "replSetInitiate failed to store config document or create the oplog; "
2623                 << status;
2624         return status;
2625     }
2626 
2627     _replicationProcess->getConsistencyMarkers()->initializeMinValidDocument(opCtx);
2628 
2629     auto lastAppliedOpTime = getMyLastAppliedOpTime();
2630 
2631     // Since the JournalListener has not yet been set up, we must manually set our
2632     // durableOpTime.
2633     setMyLastDurableOpTime(lastAppliedOpTime);
2634 
2635     // Sets the initial data timestamp on the storage engine so it can assign a timestamp
2636     // to data on disk. We do this after writing the "initiating set" oplog entry.
2637     _storage->setInitialDataTimestamp(getServiceContext(), lastAppliedOpTime.getTimestamp());
2638 
2639     _finishReplSetInitiate(opCtx, newConfig, myIndex.getValue());
2640 
2641     // A configuration passed to replSetInitiate() with the current node as an arbiter
2642     // will fail validation with a "replSet initiate got ... while validating" reason.
2643     invariant(!newConfig.getMemberAt(myIndex.getValue()).isArbiter());
2644     _externalState->startThreads(_settings);
2645     _startDataReplication(opCtx);
2646 
2647     configStateGuard.Dismiss();
2648     return Status::OK();
2649 }
2650 
_finishReplSetInitiate(OperationContext * opCtx,const ReplSetConfig & newConfig,int myIndex)2651 void ReplicationCoordinatorImpl::_finishReplSetInitiate(OperationContext* opCtx,
2652                                                         const ReplSetConfig& newConfig,
2653                                                         int myIndex) {
2654     stdx::unique_lock<stdx::mutex> lk(_mutex);
2655     invariant(_rsConfigState == kConfigInitiating);
2656     invariant(!_rsConfig.isInitialized());
2657     auto action = _setCurrentRSConfig_inlock(opCtx, newConfig, myIndex);
2658     if (action == kActionWinElection) {
2659         _postWonElectionUpdateMemberState_inlock();
2660     } else {
2661         lk.unlock();
2662         _performPostMemberStateUpdateAction(action);
2663     }
2664 }
2665 
_setConfigState_inlock(ConfigState newState)2666 void ReplicationCoordinatorImpl::_setConfigState_inlock(ConfigState newState) {
2667     if (newState != _rsConfigState) {
2668         _rsConfigState = newState;
2669         _rsConfigStateChange.notify_all();
2670     }
2671 }
2672 
2673 ReplicationCoordinatorImpl::PostMemberStateUpdateAction
_updateMemberStateFromTopologyCoordinator_inlock(OperationContext * opCtx)2674 ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock(
2675     OperationContext* opCtx) {
2676     {
2677         // We have to do this check even if our current and target state are the same as we might
2678         // have just failed a stepdown attempt and thus are staying in PRIMARY state but restoring
2679         // our ability to accept writes.
2680         bool canAcceptWrites = _topCoord->canAcceptWrites();
2681         if (canAcceptWrites != _canAcceptNonLocalWrites.loadRelaxed()) {
2682             // We must be holding the global X lock to change _canAcceptNonLocalWrites.
2683             invariant(opCtx);
2684             invariant(opCtx->lockState()->isW());
2685         }
2686         _canAcceptNonLocalWrites.store(canAcceptWrites);
2687     }
2688 
2689 
2690     const MemberState newState = _topCoord->getMemberState();
2691     if (newState == _memberState) {
2692         if (_topCoord->getRole() == TopologyCoordinator::Role::kCandidate) {
2693             invariant(_rsConfig.getNumMembers() == 1 && _selfIndex == 0 &&
2694                       _rsConfig.getMemberAt(0).isElectable());
2695             if (isV1ElectionProtocol()) {
2696                 // Start election in protocol version 1
2697                 return kActionStartSingleNodeElection;
2698             }
2699             return kActionWinElection;
2700         }
2701         return kActionNone;
2702     }
2703 
2704     PostMemberStateUpdateAction result;
2705     if (_memberState.primary() || newState.removed() || newState.rollback()) {
2706         // Wake up any threads blocked in awaitReplication, close connections, etc.
2707         _replicationWaiterList.signalAndRemoveAll_inlock();
2708         // Wake up the optime waiter that is waiting for primary catch-up to finish.
2709         _opTimeWaiterList.signalAndRemoveAll_inlock();
2710         // If there are any pending stepdown command requests wake them up.
2711         _stepDownWaiters.notify_all();
2712 
2713         // _canAcceptNonLocalWrites should already be set above.
2714         invariant(!_canAcceptNonLocalWrites.loadRelaxed());
2715 
2716         serverGlobalParams.validateFeaturesAsMaster.store(false);
2717         result = kActionCloseAllConnections;
2718     } else {
2719         result = kActionFollowerModeStateChange;
2720     }
2721 
2722     // Exit catchup mode if we're in it and enable replication producer and applier on stepdown.
2723     if (_memberState.primary()) {
2724         if (_catchupState) {
2725             _catchupState->abort_inlock();
2726         }
2727         _applierState = ApplierState::Running;
2728         _externalState->startProducerIfStopped();
2729     }
2730 
2731     if (_memberState.secondary() && !newState.primary()) {
2732         // Switching out of SECONDARY, but not to PRIMARY.
2733         _canServeNonLocalReads.store(0U);
2734     } else if (!_memberState.primary() && newState.secondary()) {
2735         // Switching into SECONDARY, but not from PRIMARY.
2736         _canServeNonLocalReads.store(1U);
2737     }
2738 
2739     if (newState.secondary() && _topCoord->getRole() == TopologyCoordinator::Role::kCandidate) {
2740         // When transitioning to SECONDARY, the only way for _topCoord to report the candidate
2741         // role is if the configuration represents a single-node replica set.  In that case, the
2742         // overriding requirement is to elect this singleton node primary.
2743         invariant(_rsConfig.getNumMembers() == 1 && _selfIndex == 0 &&
2744                   _rsConfig.getMemberAt(0).isElectable());
2745         if (isV1ElectionProtocol()) {
2746             // Start election in protocol version 1
2747             result = kActionStartSingleNodeElection;
2748         } else {
2749             result = kActionWinElection;
2750         }
2751     }
2752 
2753     if (newState.rollback()) {
2754         // When we start rollback, we need to drop all snapshots since we may need to create
2755         // out-of-order snapshots. This would be necessary even if the SnapshotName was completely
2756         // monotonically increasing because we don't necessarily have a snapshot of every write.
2757         // If we didn't drop all snapshots on rollback it could lead to the following situation:
2758         //
2759         //  |--------|-------------|-------------|
2760         //  | OpTime | HasSnapshot | Committed   |
2761         //  |--------|-------------|-------------|
2762         //  | (0, 1) | *           | *           |
2763         //  | (0, 2) | *           | ROLLED BACK |
2764         //  | (1, 2) |             | *           |
2765         //  |--------|-------------|-------------|
2766         //
2767         // When we try to make (1,2) the commit point, we'd find (0,2) as the newest snapshot
2768         // before the commit point, but it would be invalid to mark it as the committed snapshot
2769         // since it was never committed.
2770         //
2771         // TODO SERVER-19209 We also need to clear snapshots before a resync.
2772         _dropAllSnapshots_inlock();
2773     }
2774 
2775     // Upon transitioning out of ROLLBACK, we must clear any stable optime candidates that may have
2776     // been rolled back.
2777     if (_memberState.rollback()) {
2778         // Our 'lastApplied' optime at this point should be the rollback common point. We should
2779         // remove any stable optime candidates greater than the common point.
2780         auto lastApplied = _getMyLastAppliedOpTime_inlock();
2781         // The upper bound will give us the first optime T such that T > lastApplied.
2782         auto deletePoint = _stableOpTimeCandidates.upper_bound(lastApplied);
2783         _stableOpTimeCandidates.erase(deletePoint, _stableOpTimeCandidates.end());
2784 
2785         // Ensure that no snapshots were created while we were in rollback.
2786         invariant(!_currentCommittedSnapshot);
2787     }
2788 
2789     // If we are transitioning from secondary, cancel any scheduled takeovers.
2790     if (_memberState.secondary()) {
2791         _cancelCatchupTakeover_inlock();
2792         _cancelPriorityTakeover_inlock();
2793     }
2794 
2795     // Ensure replication is running if we are no longer REMOVED.
2796     if (_memberState.removed() && !newState.arbiter()) {
2797         log() << "Scheduling a task to begin or continue replication";
2798         _scheduleWorkAt(_replExecutor->now(),
2799                         [=](const mongo::executor::TaskExecutor::CallbackArgs& cbData) {
2800                             _externalState->startThreads(_settings);
2801                             auto opCtx = cc().makeOperationContext();
2802                             _startDataReplication(opCtx.get());
2803                         });
2804     }
2805 
2806     log() << "transition to " << newState << " from " << _memberState << rsLog;
2807     // Initializes the featureCompatibilityVersion to the default value, because arbiters do not
2808     // receive the replicated version.
2809     if (newState.arbiter()) {
2810         serverGlobalParams.featureCompatibility.reset();
2811     }
2812 
2813     _memberState = newState;
2814 
2815     _cancelAndRescheduleElectionTimeout_inlock();
2816 
2817     // Notifies waiters blocked in waitForMemberState().
2818     // For testing only.
2819     _memberStateChange.notify_all();
2820 
2821     return result;
2822 }
2823 
_performPostMemberStateUpdateAction(PostMemberStateUpdateAction action)2824 void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
2825     PostMemberStateUpdateAction action) {
2826     invariant(action != kActionWinElection);
2827     switch (action) {
2828         case kActionNone:
2829             break;
2830         case kActionFollowerModeStateChange:
2831             _onFollowerModeStateChange();
2832             break;
2833         case kActionCloseAllConnections:
2834             _externalState->closeConnections();
2835             _externalState->shardingOnStepDownHook();
2836             _externalState->stopNoopWriter();
2837             break;
2838         case kActionStartSingleNodeElection:
2839             // In protocol version 1, single node replset will run an election instead of directly
2840             // calling _postWonElectionUpdateMemberState_inlock as in protocol version 0.
2841             _startElectSelfV1(TopologyCoordinator::StartElectionReason::kElectionTimeout);
2842             break;
2843         default:
2844             severe() << "Unknown post member state update action " << static_cast<int>(action);
2845             fassertFailed(26010);
2846     }
2847 }
2848 
_postWonElectionUpdateMemberState_inlock()2849 void ReplicationCoordinatorImpl::_postWonElectionUpdateMemberState_inlock() {
2850     if (isV1ElectionProtocol()) {
2851         invariant(_topCoord->getTerm() != OpTime::kUninitializedTerm);
2852         _electionId = OID::fromTerm(_topCoord->getTerm());
2853     } else {
2854         _electionId = OID::gen();
2855     }
2856 
2857     auto ts = LogicalClock::get(getServiceContext())->reserveTicks(1).asTimestamp();
2858     _topCoord->processWinElection(_electionId, ts);
2859     const PostMemberStateUpdateAction nextAction =
2860         _updateMemberStateFromTopologyCoordinator_inlock(nullptr);
2861     invariant(nextAction == kActionFollowerModeStateChange,
2862               str::stream() << "nextAction == " << static_cast<int>(nextAction));
2863     invariant(_getMemberState_inlock().primary());
2864     // Clear the sync source.
2865     _onFollowerModeStateChange();
2866     // Notify all secondaries of the election win.
2867     _restartHeartbeats_inlock();
2868     if (isV1ElectionProtocol()) {
2869         invariant(!_catchupState);
2870         _catchupState = stdx::make_unique<CatchupState>(this);
2871         _catchupState->start_inlock();
2872     } else {
2873         _enterDrainMode_inlock();
2874     }
2875 }
2876 
_onFollowerModeStateChange()2877 void ReplicationCoordinatorImpl::_onFollowerModeStateChange() {
2878     _externalState->signalApplierToChooseNewSyncSource();
2879 }
2880 
start_inlock()2881 void ReplicationCoordinatorImpl::CatchupState::start_inlock() {
2882     log() << "Entering primary catch-up mode.";
2883 
2884     // No catchup in single node replica set.
2885     if (_repl->_rsConfig.getNumMembers() == 1) {
2886         abort_inlock();
2887         return;
2888     }
2889 
2890     auto catchupTimeout = _repl->_rsConfig.getCatchUpTimeoutPeriod();
2891 
2892     // When catchUpTimeoutMillis is 0, we skip doing catchup entirely.
2893     if (catchupTimeout == ReplSetConfig::kCatchUpDisabled) {
2894         log() << "Skipping primary catchup since the catchup timeout is 0.";
2895         abort_inlock();
2896         return;
2897     }
2898 
2899     auto mutex = &_repl->_mutex;
2900     auto timeoutCB = [this, mutex](const CallbackArgs& cbData) {
2901         if (!cbData.status.isOK()) {
2902             return;
2903         }
2904         stdx::lock_guard<stdx::mutex> lk(*mutex);
2905         // Check whether the callback has been cancelled while holding mutex.
2906         if (cbData.myHandle.isCanceled()) {
2907             return;
2908         }
2909         log() << "Catchup timed out after becoming primary.";
2910         abort_inlock();
2911     };
2912 
2913     // Deal with infinity and overflow - no timeout.
2914     if (catchupTimeout == ReplSetConfig::kInfiniteCatchUpTimeout ||
2915         Date_t::max() - _repl->_replExecutor->now() <= catchupTimeout) {
2916         return;
2917     }
2918     // Schedule timeout callback.
2919     auto timeoutDate = _repl->_replExecutor->now() + catchupTimeout;
2920     auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, timeoutCB);
2921     if (!status.isOK()) {
2922         log() << "Failed to schedule catchup timeout work.";
2923         abort_inlock();
2924         return;
2925     }
2926     _timeoutCbh = status.getValue();
2927 }
2928 
abort_inlock()2929 void ReplicationCoordinatorImpl::CatchupState::abort_inlock() {
2930     invariant(_repl->_getMemberState_inlock().primary());
2931 
2932     log() << "Exited primary catch-up mode.";
2933     // Clean up its own members.
2934     if (_timeoutCbh) {
2935         _repl->_replExecutor->cancel(_timeoutCbh);
2936     }
2937     if (_waiter) {
2938         _repl->_opTimeWaiterList.remove_inlock(_waiter.get());
2939     }
2940 
2941     // Enter primary drain mode.
2942     _repl->_enterDrainMode_inlock();
2943     // Destroy the state itself.
2944     _repl->_catchupState.reset();
2945 }
2946 
signalHeartbeatUpdate_inlock()2947 void ReplicationCoordinatorImpl::CatchupState::signalHeartbeatUpdate_inlock() {
2948     auto targetOpTime = _repl->_topCoord->latestKnownOpTimeSinceHeartbeatRestart();
2949     // Haven't collected all heartbeat responses.
2950     if (!targetOpTime) {
2951         return;
2952     }
2953 
2954     // We've caught up.
2955     const auto myLastApplied = _repl->_getMyLastAppliedOpTime_inlock();
2956     if (*targetOpTime <= myLastApplied) {
2957         log() << "Caught up to the latest optime known via heartbeats after becoming primary. "
2958               << "Target optime: " << *targetOpTime << ". My Last Applied: " << myLastApplied;
2959         abort_inlock();
2960         return;
2961     }
2962 
2963     // Reset the target optime if it has changed.
2964     if (_waiter && _waiter->opTime == *targetOpTime) {
2965         return;
2966     }
2967 
2968     log() << "Heartbeats updated catchup target optime to " << *targetOpTime;
2969     if (_waiter) {
2970         _repl->_opTimeWaiterList.remove_inlock(_waiter.get());
2971     }
2972     auto targetOpTimeCB = [this, targetOpTime]() {
2973         // Double check the target time since stepdown may signal us too.
2974         const auto myLastApplied = _repl->_getMyLastAppliedOpTime_inlock();
2975         if (*targetOpTime <= myLastApplied) {
2976             log() << "Caught up to the latest known optime successfully after becoming primary. "
2977                   << "Target optime: " << *targetOpTime << ". My Last Applied: " << myLastApplied;
2978             abort_inlock();
2979         }
2980     };
2981     _waiter = stdx::make_unique<CallbackWaiter>(*targetOpTime, targetOpTimeCB);
2982     _repl->_opTimeWaiterList.add_inlock(_waiter.get());
2983 }
2984 
abortCatchupIfNeeded()2985 Status ReplicationCoordinatorImpl::abortCatchupIfNeeded() {
2986     if (!isV1ElectionProtocol()) {
2987         return Status(ErrorCodes::CommandNotSupported,
2988                       "Primary catch-up is only supported by Protocol Version 1");
2989     }
2990 
2991     stdx::lock_guard<stdx::mutex> lk(_mutex);
2992     if (_catchupState) {
2993         _catchupState->abort_inlock();
2994         return Status::OK();
2995     }
2996     return Status(ErrorCodes::IllegalOperation, "The node is not in catch-up mode.");
2997 }
2998 
_enterDrainMode_inlock()2999 void ReplicationCoordinatorImpl::_enterDrainMode_inlock() {
3000     _applierState = ApplierState::Draining;
3001     _externalState->stopProducer();
3002 }
3003 
processReplSetFresh(const ReplSetFreshArgs & args,BSONObjBuilder * resultObj)3004 Status ReplicationCoordinatorImpl::processReplSetFresh(const ReplSetFreshArgs& args,
3005                                                        BSONObjBuilder* resultObj) {
3006     stdx::lock_guard<stdx::mutex> lk(_mutex);
3007     Status result(ErrorCodes::InternalError, "didn't set status in prepareFreshResponse");
3008     _topCoord->prepareFreshResponse(args, _replExecutor->now(), resultObj, &result);
3009     return result;
3010 }
3011 
processReplSetElect(const ReplSetElectArgs & args,BSONObjBuilder * responseObj)3012 Status ReplicationCoordinatorImpl::processReplSetElect(const ReplSetElectArgs& args,
3013                                                        BSONObjBuilder* responseObj) {
3014     stdx::lock_guard<stdx::mutex> lk(_mutex);
3015     Status result = Status(ErrorCodes::InternalError, "status not set by callback");
3016     _topCoord->prepareElectResponse(args, _replExecutor->now(), responseObj, &result);
3017     return result;
3018 }
3019 
3020 ReplicationCoordinatorImpl::PostMemberStateUpdateAction
_setCurrentRSConfig_inlock(OperationContext * opCtx,const ReplSetConfig & newConfig,int myIndex)3021 ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(OperationContext* opCtx,
3022                                                        const ReplSetConfig& newConfig,
3023                                                        int myIndex) {
3024     invariant(_settings.usingReplSets());
3025     _cancelHeartbeats_inlock();
3026     _setConfigState_inlock(kConfigSteady);
3027 
3028     _topCoord->updateConfig(newConfig, myIndex, _replExecutor->now());
3029 
3030     // updateConfig() can change terms, so update our term shadow to match.
3031     _termShadow.store(_topCoord->getTerm());
3032 
3033     const ReplSetConfig oldConfig = _rsConfig;
3034     _rsConfig = newConfig;
3035     _protVersion.store(_rsConfig.getProtocolVersion());
3036 
3037     // Warn if this config has protocol version 0
3038     if (newConfig.getProtocolVersion() == 0 &&
3039         (!oldConfig.isInitialized() || oldConfig.getProtocolVersion() == 1)) {
3040         log() << startupWarningsLog;
3041         log() << "** WARNING: This replica set was configured with protocol version 0."
3042               << startupWarningsLog;
3043         log() << "**          This protocol version is deprecated and subject to be removed "
3044               << startupWarningsLog;
3045         log() << "**          in a future version." << startupWarningsLog;
3046     }
3047 
3048     // Warn if running --nojournal and writeConcernMajorityJournalDefault = true
3049     StorageEngine* storageEngine = opCtx->getServiceContext()->getGlobalStorageEngine();
3050     if (storageEngine && !storageEngine->isDurable() &&
3051         (newConfig.getWriteConcernMajorityShouldJournal() &&
3052          (!oldConfig.isInitialized() || !oldConfig.getWriteConcernMajorityShouldJournal()))) {
3053         log() << startupWarningsLog;
3054         log() << "** WARNING: This replica set node is running without journaling enabled but the "
3055               << startupWarningsLog;
3056         log() << "**          writeConcernMajorityJournalDefault option to the replica set config "
3057               << startupWarningsLog;
3058         log() << "**          is set to true. The writeConcernMajorityJournalDefault "
3059               << startupWarningsLog;
3060         log() << "**          option to the replica set config must be set to false "
3061               << startupWarningsLog;
3062         log() << "**          or w:majority write concerns will never complete."
3063               << startupWarningsLog;
3064         log() << "**          In addition, this node's memory consumption may increase until all"
3065               << startupWarningsLog;
3066         log() << "**          available free RAM is exhausted." << startupWarningsLog;
3067         log() << startupWarningsLog;
3068     }
3069 
3070     // Warn if using the in-memory (ephemeral) storage engine with
3071     // writeConcernMajorityJournalDefault = true
3072     if (storageEngine && storageEngine->isEphemeral() &&
3073         (newConfig.getWriteConcernMajorityShouldJournal() &&
3074          (!oldConfig.isInitialized() || !oldConfig.getWriteConcernMajorityShouldJournal()))) {
3075         log() << startupWarningsLog;
3076         log() << "** WARNING: This replica set node is using in-memory (ephemeral) storage with the"
3077               << startupWarningsLog;
3078         log() << "**          writeConcernMajorityJournalDefault option to the replica set config "
3079               << startupWarningsLog;
3080         log() << "**          set to true. The writeConcernMajorityJournalDefault option to the "
3081               << startupWarningsLog;
3082         log() << "**          replica set config must be set to false " << startupWarningsLog;
3083         log() << "**          or w:majority write concerns will never complete."
3084               << startupWarningsLog;
3085         log() << "**          In addition, this node's memory consumption may increase until all"
3086               << startupWarningsLog;
3087         log() << "**          available free RAM is exhausted." << startupWarningsLog;
3088         log() << startupWarningsLog;
3089     }
3090 
3091     log() << "New replica set config in use: " << _rsConfig.toBSON() << rsLog;
3092     _selfIndex = myIndex;
3093     if (_selfIndex >= 0) {
3094         log() << "This node is " << _rsConfig.getMemberAt(_selfIndex).getHostAndPort()
3095               << " in the config";
3096     } else {
3097         log() << "This node is not a member of the config";
3098     }
3099 
3100     _cancelCatchupTakeover_inlock();
3101     _cancelPriorityTakeover_inlock();
3102     _cancelAndRescheduleElectionTimeout_inlock();
3103 
3104     const PostMemberStateUpdateAction action =
3105         _updateMemberStateFromTopologyCoordinator_inlock(opCtx);
3106     if (_selfIndex >= 0) {
3107         // Don't send heartbeats if we're not in the config, if we get re-added one of the
3108         // nodes in the set will contact us.
3109         _startHeartbeats_inlock();
3110     }
3111 
3112     /**
3113      * Protocol Version upgrade and downgrade.
3114      *
3115      * Since PV upgrade resets its term to 0, in-memory states that involve terms should be
3116      * reset on either PV downgrade or upgrade unless they can be updated in both PV0 and PV1 by
3117      * data replication or heartbeats, like the last applied. Otherwise PV downgrade then upgrade
3118      * will pollute the terms with higher terms from the first PV1.
3119      *
3120      * Here we reset those in-memory states including:
3121      * - last committed optime
3122      * - election id (reset for drivers' primary discovery)
3123      * - first optime of term (used by primary)
3124      * - optime used by snapshots
3125      * - stable optime candidates.
3126      *
3127      * _replicationWaiterList and _opTimeWaiterList are used by awaitReplication() and awaitOptime()
3128      * for write/read concerns. Because we are holding the global lock here, there should be no
3129      * running client requests and these waiter lists should be empty. One exception I can think of
3130      * is the target optime of catchup, which doesn't acquire the global lock. It's potentially
3131      * problematic, but very rare.
3132      */
3133     if (oldConfig.isInitialized()) {
3134         if (oldConfig.getProtocolVersion() > newConfig.getProtocolVersion()) {
3135             // Downgrade
3136             // Reset last committed optime for all nodes. After this reset, the commit point will
3137             // not move forward on secondaries since it's only maintained on primary in PV0 and
3138             // never get propagated to secondaries.
3139             _topCoord->resetLastCommittedOpTime();
3140             // Set election id if we're primary.
3141             if (_memberState.primary()) {
3142                 invariant(newConfig.getProtocolVersion() == 0);
3143                 _electionId = OID::gen();
3144                 auto ts = LogicalClock::get(getServiceContext())->reserveTicks(1).asTimestamp();
3145                 _topCoord->setElectionInfo(_electionId, ts);
3146             }
3147 
3148             // On PV downgrade, we drop all snapshots and clear the stable optime candidates because
3149             // read majority isn't supported in PV0.
3150             //
3151             // TODO: On a reconfig in PV1, we should also drop all snapshots so we don't mistakenly
3152             // read from the wrong one, for example, if we change the meaning of the "committed"
3153             // snapshot from applied -> durable or the quorum gets changed.
3154             // We currently only do this on the primary that processes the replSetReconfig command,
3155             // but not on nodes that learn of the new config via a heartbeat.
3156             _dropAllSnapshots_inlock();
3157             // Also clear all stable snapshots. This is critical for PV downgrade and then upgrade
3158             // to make sure OpTimes before the downgrade don't interfere the second upgrade.
3159             _stableOpTimeCandidates.clear();
3160 
3161         } else if (oldConfig.getProtocolVersion() < newConfig.getProtocolVersion()) {
3162             // Upgrade
3163             if (_memberState.primary()) {
3164                 invariant(newConfig.getProtocolVersion() == 1);
3165                 // The term is only set to "uninitialized" at startup or in PV0.
3166                 invariant(_topCoord->getTerm() != OpTime::kUninitializedTerm);
3167                 _electionId = OID::fromTerm(_topCoord->getTerm());
3168                 // Allow anything to commit, because, technically, this node is also the previous
3169                 // primary.
3170                 OpTime firstOpTimeOfTerm(Timestamp(), _topCoord->getTerm());
3171                 invariantOK(_topCoord->completeTransitionToPrimary(firstOpTimeOfTerm));
3172                 auto ts = LogicalClock::get(getServiceContext())->reserveTicks(1).asTimestamp();
3173                 _topCoord->setElectionInfo(_electionId, ts);
3174             }
3175         }
3176     }
3177 
3178     // Update commit point for the primary. Called by every reconfig because the config
3179     // may change the definition of majority.
3180     //
3181     // On PV downgrade, commit point is probably still from PV1 but will advance to an OpTime with
3182     // term -1 once any write gets committed in PV0.
3183     _updateLastCommittedOpTime_inlock();
3184 
3185     return action;
3186 }
3187 
_wakeReadyWaiters_inlock()3188 void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock() {
3189     _replicationWaiterList.signalAndRemoveIf_inlock([this](Waiter* waiter) {
3190         return _doneWaitingForReplication_inlock(
3191             waiter->opTime, Timestamp(), *waiter->writeConcern);
3192     });
3193 }
3194 
processReplSetUpdatePosition(const UpdatePositionArgs & updates,long long * configVersion)3195 Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePositionArgs& updates,
3196                                                                 long long* configVersion) {
3197     stdx::unique_lock<stdx::mutex> lock(_mutex);
3198     Status status = Status::OK();
3199     bool somethingChanged = false;
3200     for (UpdatePositionArgs::UpdateIterator update = updates.updatesBegin();
3201          update != updates.updatesEnd();
3202          ++update) {
3203         status = _setLastOptime_inlock(*update, configVersion);
3204         if (!status.isOK()) {
3205             break;
3206         }
3207         somethingChanged = true;
3208     }
3209 
3210     if (somethingChanged && !_getMemberState_inlock().primary()) {
3211         lock.unlock();
3212         // Must do this outside _mutex
3213         _externalState->forwardSlaveProgress();
3214     }
3215     return status;
3216 }
3217 
processHandshake(OperationContext * opCtx,const HandshakeArgs & handshake)3218 Status ReplicationCoordinatorImpl::processHandshake(OperationContext* opCtx,
3219                                                     const HandshakeArgs& handshake) {
3220     LOG(2) << "Received handshake " << handshake.toBSON();
3221 
3222     stdx::lock_guard<stdx::mutex> lock(_mutex);
3223 
3224     if (getReplicationMode() != modeMasterSlave) {
3225         return Status(ErrorCodes::IllegalOperation,
3226                       "The handshake command is only used for master/slave replication");
3227     }
3228 
3229     auto* memberData = _topCoord->findMemberDataByRid(handshake.getRid());
3230     if (memberData) {
3231         return Status::OK();  // nothing to do
3232     }
3233 
3234     memberData = _topCoord->addSlaveMemberData(handshake.getRid());
3235     memberData->setHostAndPort(_externalState->getClientHostAndPort(opCtx));
3236 
3237     return Status::OK();
3238 }
3239 
buildsIndexes()3240 bool ReplicationCoordinatorImpl::buildsIndexes() {
3241     stdx::lock_guard<stdx::mutex> lk(_mutex);
3242     if (_selfIndex == -1) {
3243         return true;
3244     }
3245     const MemberConfig& self = _rsConfig.getMemberAt(_selfIndex);
3246     return self.shouldBuildIndexes();
3247 }
3248 
getHostsWrittenTo(const OpTime & op,bool durablyWritten)3249 std::vector<HostAndPort> ReplicationCoordinatorImpl::getHostsWrittenTo(const OpTime& op,
3250                                                                        bool durablyWritten) {
3251     stdx::lock_guard<stdx::mutex> lk(_mutex);
3252     /* skip self in master-slave mode because our own HostAndPort is unknown */
3253     const bool skipSelf = getReplicationMode() == modeMasterSlave;
3254     return _topCoord->getHostsWrittenTo(op, durablyWritten, skipSelf);
3255 }
3256 
getOtherNodesInReplSet() const3257 std::vector<HostAndPort> ReplicationCoordinatorImpl::getOtherNodesInReplSet() const {
3258     stdx::lock_guard<stdx::mutex> lk(_mutex);
3259     invariant(_settings.usingReplSets());
3260 
3261     std::vector<HostAndPort> nodes;
3262     if (_selfIndex == -1) {
3263         return nodes;
3264     }
3265 
3266     for (int i = 0; i < _rsConfig.getNumMembers(); ++i) {
3267         if (i == _selfIndex)
3268             continue;
3269 
3270         nodes.push_back(_rsConfig.getMemberAt(i).getHostAndPort());
3271     }
3272     return nodes;
3273 }
3274 
checkIfWriteConcernCanBeSatisfied(const WriteConcernOptions & writeConcern) const3275 Status ReplicationCoordinatorImpl::checkIfWriteConcernCanBeSatisfied(
3276     const WriteConcernOptions& writeConcern) const {
3277     stdx::lock_guard<stdx::mutex> lock(_mutex);
3278     return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
3279 }
3280 
_checkIfWriteConcernCanBeSatisfied_inlock(const WriteConcernOptions & writeConcern) const3281 Status ReplicationCoordinatorImpl::_checkIfWriteConcernCanBeSatisfied_inlock(
3282     const WriteConcernOptions& writeConcern) const {
3283     if (getReplicationMode() == modeNone) {
3284         return Status(ErrorCodes::NoReplicationEnabled,
3285                       "No replication enabled when checking if write concern can be satisfied");
3286     }
3287 
3288     if (getReplicationMode() == modeMasterSlave) {
3289         if (!writeConcern.wMode.empty()) {
3290             return Status(ErrorCodes::UnknownReplWriteConcern,
3291                           "Cannot use named write concern modes in master-slave");
3292         }
3293         // No way to know how many slaves there are, so assume any numeric mode is possible.
3294         return Status::OK();
3295     }
3296 
3297     invariant(getReplicationMode() == modeReplSet);
3298     return _rsConfig.checkIfWriteConcernCanBeSatisfied(writeConcern);
3299 }
3300 
getGetLastErrorDefault()3301 WriteConcernOptions ReplicationCoordinatorImpl::getGetLastErrorDefault() {
3302     stdx::lock_guard<stdx::mutex> lock(_mutex);
3303     if (_rsConfig.isInitialized()) {
3304         return _rsConfig.getDefaultWriteConcern();
3305     }
3306     return WriteConcernOptions();
3307 }
3308 
checkReplEnabledForCommand(BSONObjBuilder * result)3309 Status ReplicationCoordinatorImpl::checkReplEnabledForCommand(BSONObjBuilder* result) {
3310     if (!_settings.usingReplSets()) {
3311         if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
3312             result->append("info", "configsvr");  // for shell prompt
3313         }
3314         return Status(ErrorCodes::NoReplicationEnabled, "not running with --replSet");
3315     }
3316 
3317     if (getMemberState().startup()) {
3318         result->append("info", "run rs.initiate(...) if not yet done for the set");
3319         return Status(ErrorCodes::NotYetInitialized, "no replset config has been received");
3320     }
3321 
3322     return Status::OK();
3323 }
3324 
isReplEnabled() const3325 bool ReplicationCoordinatorImpl::isReplEnabled() const {
3326     return getReplicationMode() != modeNone;
3327 }
3328 
chooseNewSyncSource(const OpTime & lastOpTimeFetched)3329 HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOpTimeFetched) {
3330     stdx::lock_guard<stdx::mutex> lk(_mutex);
3331 
3332     HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress();
3333     // Always allow chaining while in catchup and drain mode.
3334     auto chainingPreference = _getMemberState_inlock().primary()
3335         ? TopologyCoordinator::ChainingPreference::kAllowChaining
3336         : TopologyCoordinator::ChainingPreference::kUseConfiguration;
3337     HostAndPort newSyncSource =
3338         _topCoord->chooseNewSyncSource(_replExecutor->now(), lastOpTimeFetched, chainingPreference);
3339 
3340     // If we lost our sync source, schedule new heartbeats immediately to update our knowledge
3341     // of other members's state, allowing us to make informed sync source decisions.
3342     if (newSyncSource.empty() && !oldSyncSource.empty() && _selfIndex >= 0 &&
3343         !_getMemberState_inlock().primary()) {
3344         _restartHeartbeats_inlock();
3345     }
3346 
3347     return newSyncSource;
3348 }
3349 
_unblacklistSyncSource(const executor::TaskExecutor::CallbackArgs & cbData,const HostAndPort & host)3350 void ReplicationCoordinatorImpl::_unblacklistSyncSource(
3351     const executor::TaskExecutor::CallbackArgs& cbData, const HostAndPort& host) {
3352     if (cbData.status == ErrorCodes::CallbackCanceled)
3353         return;
3354 
3355     stdx::lock_guard<stdx::mutex> lock(_mutex);
3356     _topCoord->unblacklistSyncSource(host, _replExecutor->now());
3357 }
3358 
blacklistSyncSource(const HostAndPort & host,Date_t until)3359 void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) {
3360     stdx::lock_guard<stdx::mutex> lock(_mutex);
3361     _topCoord->blacklistSyncSource(host, until);
3362     _scheduleWorkAt(until,
3363                     stdx::bind(&ReplicationCoordinatorImpl::_unblacklistSyncSource,
3364                                this,
3365                                stdx::placeholders::_1,
3366                                host));
3367 }
3368 
resetLastOpTimesFromOplog(OperationContext * opCtx,DataConsistency consistency)3369 void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opCtx,
3370                                                            DataConsistency consistency) {
3371     StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(opCtx);
3372     OpTime lastOpTime;
3373     if (!lastOpTimeStatus.isOK()) {
3374         warning() << "Failed to load timestamp of most recently applied operation; "
3375                   << lastOpTimeStatus.getStatus();
3376     } else {
3377         lastOpTime = lastOpTimeStatus.getValue();
3378     }
3379 
3380     stdx::unique_lock<stdx::mutex> lock(_mutex);
3381     bool isRollbackAllowed = true;
3382     _setMyLastAppliedOpTime_inlock(lastOpTime, isRollbackAllowed, consistency);
3383     _setMyLastDurableOpTime_inlock(lastOpTime, isRollbackAllowed);
3384     _reportUpstream_inlock(std::move(lock));
3385     // Unlocked below.
3386 
3387     _externalState->setGlobalTimestamp(opCtx->getServiceContext(), lastOpTime.getTimestamp());
3388 }
3389 
shouldChangeSyncSource(const HostAndPort & currentSource,const rpc::ReplSetMetadata & replMetadata,boost::optional<rpc::OplogQueryMetadata> oqMetadata)3390 bool ReplicationCoordinatorImpl::shouldChangeSyncSource(
3391     const HostAndPort& currentSource,
3392     const rpc::ReplSetMetadata& replMetadata,
3393     boost::optional<rpc::OplogQueryMetadata> oqMetadata) {
3394     stdx::lock_guard<stdx::mutex> lock(_mutex);
3395     return _topCoord->shouldChangeSyncSource(
3396         currentSource, replMetadata, oqMetadata, _replExecutor->now());
3397 }
3398 
_updateLastCommittedOpTime_inlock()3399 void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() {
3400     if (_topCoord->updateLastCommittedOpTime()) {
3401         _setStableTimestampForStorage_inlock();
3402     }
3403     // Wake up any threads waiting for replication that now have their replication
3404     // check satisfied.  We must do this regardless of whether we updated the lastCommittedOpTime,
3405     // as lastCommittedOpTime may be based on durable optimes whereas some waiters may be
3406     // waiting on applied (but not necessarily durable) optimes.
3407     _wakeReadyWaiters_inlock();
3408     _signalStepDownWaiterIfReady_inlock();
3409 }
3410 
_calculateStableOpTime_inlock(const std::set<OpTime> & candidates,const OpTime & commitPoint)3411 boost::optional<OpTime> ReplicationCoordinatorImpl::_calculateStableOpTime_inlock(
3412     const std::set<OpTime>& candidates, const OpTime& commitPoint) {
3413 
3414     // No optime candidates.
3415     if (candidates.empty()) {
3416         return boost::none;
3417     }
3418 
3419     auto maximumStableTimestamp = commitPoint.getTimestamp();
3420     if (_canAcceptNonLocalWrites.loadRelaxed() && _storage->supportsDocLocking(_service)) {
3421         // If the storage engine supports document level locking, then it is possible for oplog
3422         // writes to commit out of order. In that case, we don't want to set the stable timestamp
3423         // ahead of the all committed timestamp. This is not a problem for oplog application
3424         // because we only set lastApplied between batches when the all committed timestamp cannot
3425         // be behind. During oplog application the all committed timestamp can jump around since
3426         // we first write oplog entries to the oplog and then go back and apply them.
3427         //
3428         // If the all committed timestamp is less than the commit point, then we are guaranteed that
3429         // there are no stable timestamp candidates with a greater timestamp than the all committed
3430         // timestamp and a lower term than the commit point. Thus we can consider the all committed
3431         // timestamp to have the same term as the commit point. When a primary enters a new term, it
3432         // first storage-commits a 'new primary' oplog entry in the new term before accepting any
3433         // new writes. This will ensure that the all committed timestamp is in the new term before
3434         // any writes in the new term are replication committed.
3435         maximumStableTimestamp =
3436             std::min(_storage->getAllCommittedTimestamp(_service), commitPoint.getTimestamp());
3437     }
3438     const auto maximumStableOpTime = OpTime(maximumStableTimestamp, commitPoint.getTerm());
3439 
3440     // Find the greatest optime candidate that is less than or equal to the commit point.
3441     // To do this we first find the upper bound of 'commitPoint', which points to the smallest
3442     // element in 'candidates' that is greater than 'commitPoint'. We then step back one element,
3443     // which should give us the largest element in 'candidates' that is less than or equal to the
3444     // 'commitPoint'.
3445     auto upperBoundIter = candidates.upper_bound(maximumStableOpTime);
3446 
3447     // All optime candidates are greater than the commit point.
3448     if (upperBoundIter == candidates.begin()) {
3449         return boost::none;
3450     }
3451     // There is a valid stable optime.
3452     else {
3453         return *std::prev(upperBoundIter);
3454     }
3455 }
3456 
_cleanupStableOpTimeCandidates(std::set<OpTime> * candidates,OpTime stableOpTime)3457 void ReplicationCoordinatorImpl::_cleanupStableOpTimeCandidates(std::set<OpTime>* candidates,
3458                                                                 OpTime stableOpTime) {
3459     // Discard optime candidates earlier than the current stable optime, since we don't need
3460     // them anymore. To do this, we find the lower bound of the 'stableOpTime' which is the first
3461     // element that is greater than or equal to the 'stableOpTime'. Then we discard everything up
3462     // to but not including this lower bound i.e. 'deletePoint'.
3463     auto deletePoint = candidates->lower_bound(stableOpTime);
3464 
3465     // Delete the entire range of unneeded optimes.
3466     candidates->erase(candidates->begin(), deletePoint);
3467 }
3468 
calculateStableOpTime_forTest(const std::set<OpTime> & candidates,const OpTime & commitPoint)3469 boost::optional<OpTime> ReplicationCoordinatorImpl::calculateStableOpTime_forTest(
3470     const std::set<OpTime>& candidates, const OpTime& commitPoint) {
3471     stdx::lock_guard<stdx::mutex> lk(_mutex);
3472     return _calculateStableOpTime_inlock(candidates, commitPoint);
3473 }
cleanupStableOpTimeCandidates_forTest(std::set<OpTime> * candidates,OpTime stableOpTime)3474 void ReplicationCoordinatorImpl::cleanupStableOpTimeCandidates_forTest(std::set<OpTime>* candidates,
3475                                                                        OpTime stableOpTime) {
3476     _cleanupStableOpTimeCandidates(candidates, stableOpTime);
3477 }
3478 
getStableOpTimeCandidates_forTest()3479 std::set<OpTime> ReplicationCoordinatorImpl::getStableOpTimeCandidates_forTest() {
3480     stdx::unique_lock<stdx::mutex> lk(_mutex);
3481     return _stableOpTimeCandidates;
3482 }
3483 
getStableOpTime_forTest()3484 boost::optional<OpTime> ReplicationCoordinatorImpl::getStableOpTime_forTest() {
3485     return _getStableOpTime_inlock();
3486 }
3487 
_getStableOpTime_inlock()3488 boost::optional<OpTime> ReplicationCoordinatorImpl::_getStableOpTime_inlock() {
3489     auto commitPoint = _topCoord->getLastCommittedOpTime();
3490     if (_currentCommittedSnapshot) {
3491         auto snapshotOpTime = *_currentCommittedSnapshot;
3492         invariant(snapshotOpTime.getTimestamp() <= commitPoint.getTimestamp());
3493         invariant(snapshotOpTime <= commitPoint);
3494     }
3495 
3496     // Compute the current stable optime.
3497     auto stableOpTime = _calculateStableOpTime_inlock(_stableOpTimeCandidates, commitPoint);
3498     if (stableOpTime) {
3499         // By definition, the stable optime should never be greater than the commit point.
3500         invariant(stableOpTime->getTimestamp() <= commitPoint.getTimestamp());
3501         invariant(*stableOpTime <= commitPoint);
3502     }
3503 
3504     return stableOpTime;
3505 }
3506 
_setStableTimestampForStorage_inlock()3507 void ReplicationCoordinatorImpl::_setStableTimestampForStorage_inlock() {
3508 
3509     // Get the current stable optime.
3510     auto stableOpTime = _getStableOpTime_inlock();
3511 
3512     // If there is a valid stable optime, set it for the storage engine, and then remove any
3513     // old, unneeded stable optime candidates.
3514     if (stableOpTime) {
3515         LOG(2) << "Setting replication's stable optime to " << stableOpTime.value();
3516 
3517         if (!testingSnapshotBehaviorInIsolation) {
3518             // Update committed snapshot and wake up any threads waiting on read concern or
3519             // write concern.
3520             _updateCommittedSnapshot_inlock(stableOpTime.get());
3521             // Update the stable timestamp for the storage engine.
3522             _storage->setStableTimestamp(getServiceContext(), stableOpTime->getTimestamp());
3523         }
3524         _cleanupStableOpTimeCandidates(&_stableOpTimeCandidates, stableOpTime.get());
3525     }
3526 }
3527 
advanceCommitPoint(const OpTime & committedOpTime)3528 void ReplicationCoordinatorImpl::advanceCommitPoint(const OpTime& committedOpTime) {
3529     stdx::unique_lock<stdx::mutex> lk(_mutex);
3530     _advanceCommitPoint_inlock(committedOpTime);
3531 }
3532 
_advanceCommitPoint_inlock(const OpTime & committedOpTime)3533 void ReplicationCoordinatorImpl::_advanceCommitPoint_inlock(const OpTime& committedOpTime) {
3534     if (_topCoord->advanceLastCommittedOpTime(committedOpTime)) {
3535         if (_getMemberState_inlock().arbiter()) {
3536             // Arbiters do not store replicated data, so we consider their data trivially
3537             // consistent.
3538             _setMyLastAppliedOpTime_inlock(committedOpTime, false, DataConsistency::Consistent);
3539         }
3540 
3541         _setStableTimestampForStorage_inlock();
3542         // Even if we have no new snapshot, we need to notify waiters that the commit point moved.
3543         _externalState->notifyOplogMetadataWaiters(committedOpTime);
3544     }
3545 }
3546 
getLastCommittedOpTime() const3547 OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const {
3548     stdx::unique_lock<stdx::mutex> lk(_mutex);
3549     return _topCoord->getLastCommittedOpTime();
3550 }
3551 
processReplSetRequestVotes(OperationContext * opCtx,const ReplSetRequestVotesArgs & args,ReplSetRequestVotesResponse * response)3552 Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
3553     OperationContext* opCtx,
3554     const ReplSetRequestVotesArgs& args,
3555     ReplSetRequestVotesResponse* response) {
3556     if (!isV1ElectionProtocol()) {
3557         return {ErrorCodes::BadValue, "not using election protocol v1"};
3558     }
3559 
3560     auto termStatus = updateTerm(opCtx, args.getTerm());
3561     if (!termStatus.isOK() && termStatus.code() != ErrorCodes::StaleTerm)
3562         return termStatus;
3563 
3564     {
3565         stdx::lock_guard<stdx::mutex> lk(_mutex);
3566 
3567         // We should only enter terminal shutdown from global terminal exit.  In that case, rather
3568         // than voting in a term we don't plan to stay alive in, refuse to vote.
3569         if (_inTerminalShutdown) {
3570             return Status(ErrorCodes::ShutdownInProgress, "In the process of shutting down");
3571         }
3572 
3573         _topCoord->processReplSetRequestVotes(args, response);
3574     }
3575 
3576     if (!args.isADryRun() && response->getVoteGranted()) {
3577         LastVote lastVote{args.getTerm(), args.getCandidateIndex()};
3578 
3579         Status status = _externalState->storeLocalLastVoteDocument(opCtx, lastVote);
3580         if (!status.isOK()) {
3581             error() << "replSetRequestVotes failed to store LastVote document; " << status;
3582             return status;
3583         }
3584     }
3585     return Status::OK();
3586 }
3587 
prepareReplMetadata(const BSONObj & metadataRequestObj,const OpTime & lastOpTimeFromClient,BSONObjBuilder * builder) const3588 void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequestObj,
3589                                                      const OpTime& lastOpTimeFromClient,
3590                                                      BSONObjBuilder* builder) const {
3591 
3592     bool hasReplSetMetadata = metadataRequestObj.hasField(rpc::kReplSetMetadataFieldName);
3593     bool hasOplogQueryMetadata = metadataRequestObj.hasField(rpc::kOplogQueryMetadataFieldName);
3594     // Don't take any locks if we do not need to.
3595     if (!hasReplSetMetadata && !hasOplogQueryMetadata) {
3596         return;
3597     }
3598 
3599     // Avoid retrieving Rollback ID if we do not need it for _prepareOplogQueryMetadata_inlock().
3600     int rbid = -1;
3601     if (hasOplogQueryMetadata) {
3602         rbid = _replicationProcess->getRollbackID();
3603         invariant(-1 != rbid);
3604     }
3605 
3606     stdx::lock_guard<stdx::mutex> lk(_mutex);
3607 
3608     if (hasReplSetMetadata) {
3609         _prepareReplSetMetadata_inlock(lastOpTimeFromClient, builder);
3610     }
3611 
3612     if (hasOplogQueryMetadata) {
3613         _prepareOplogQueryMetadata_inlock(rbid, builder);
3614     }
3615 }
3616 
_prepareReplSetMetadata_inlock(const OpTime & lastOpTimeFromClient,BSONObjBuilder * builder) const3617 void ReplicationCoordinatorImpl::_prepareReplSetMetadata_inlock(const OpTime& lastOpTimeFromClient,
3618                                                                 BSONObjBuilder* builder) const {
3619     OpTime lastVisibleOpTime =
3620         std::max(lastOpTimeFromClient, _getCurrentCommittedSnapshotOpTime_inlock());
3621     auto metadata = _topCoord->prepareReplSetMetadata(lastVisibleOpTime);
3622     metadata.writeToMetadata(builder).transitional_ignore();
3623 }
3624 
_prepareOplogQueryMetadata_inlock(int rbid,BSONObjBuilder * builder) const3625 void ReplicationCoordinatorImpl::_prepareOplogQueryMetadata_inlock(int rbid,
3626                                                                    BSONObjBuilder* builder) const {
3627     _topCoord->prepareOplogQueryMetadata(rbid).writeToMetadata(builder).transitional_ignore();
3628 }
3629 
isV1ElectionProtocol() const3630 bool ReplicationCoordinatorImpl::isV1ElectionProtocol() const {
3631     return _protVersion.load() == 1;
3632 }
3633 
getWriteConcernMajorityShouldJournal()3634 bool ReplicationCoordinatorImpl::getWriteConcernMajorityShouldJournal() {
3635     return getConfig().getWriteConcernMajorityShouldJournal();
3636 }
3637 
getWriteConcernMajorityShouldJournal_inlock() const3638 bool ReplicationCoordinatorImpl::getWriteConcernMajorityShouldJournal_inlock() const {
3639     return _rsConfig.getWriteConcernMajorityShouldJournal();
3640 }
3641 
processHeartbeatV1(const ReplSetHeartbeatArgsV1 & args,ReplSetHeartbeatResponse * response)3642 Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
3643                                                       ReplSetHeartbeatResponse* response) {
3644     {
3645         stdx::lock_guard<stdx::mutex> lock(_mutex);
3646         if (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
3647             return Status(ErrorCodes::NotYetInitialized,
3648                           "Received heartbeat while still initializing replication system");
3649         }
3650     }
3651 
3652     Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse");
3653     stdx::lock_guard<stdx::mutex> lk(_mutex);
3654 
3655     auto senderHost(args.getSenderHost());
3656     const Date_t now = _replExecutor->now();
3657     result = _topCoord->prepareHeartbeatResponseV1(now, args, _settings.ourSetName(), response);
3658 
3659     if ((result.isOK() || result == ErrorCodes::InvalidReplicaSetConfig) && _selfIndex < 0) {
3660         // If this node does not belong to the configuration it knows about, send heartbeats
3661         // back to any node that sends us a heartbeat, in case one of those remote nodes has
3662         // a configuration that contains us.  Chances are excellent that it will, since that
3663         // is the only reason for a remote node to send this node a heartbeat request.
3664         if (!senderHost.empty() && _seedList.insert(senderHost).second) {
3665             _scheduleHeartbeatToTarget_inlock(senderHost, -1, now);
3666         }
3667     } else if (result.isOK() && response->getConfigVersion() < args.getConfigVersion()) {
3668         // Schedule a heartbeat to the sender to fetch the new config.
3669         // We cannot cancel the enqueued heartbeat, but either this one or the enqueued heartbeat
3670         // will trigger reconfig, which cancels and reschedules all heartbeats.
3671         if (args.hasSender()) {
3672             int senderIndex = _rsConfig.findMemberIndexByHostAndPort(senderHost);
3673             _scheduleHeartbeatToTarget_inlock(senderHost, senderIndex, now);
3674         }
3675     } else if (result.isOK()) {
3676         // Update liveness for sending node.
3677         auto* memberData = _topCoord->findMemberDataByMemberId(args.getSenderId());
3678         if (!memberData) {
3679             return result;
3680         }
3681         memberData->updateLiveness(_replExecutor->now());
3682     }
3683     return result;
3684 }
3685 
summarizeAsHtml(ReplSetHtmlSummary * output)3686 void ReplicationCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) {
3687     stdx::lock_guard<stdx::mutex> lock(_mutex);
3688 
3689     // TODO(dannenberg) consider putting both optimes into the htmlsummary.
3690     output->setSelfOptime(_getMyLastAppliedOpTime_inlock());
3691     output->setSelfUptime(time(0) - serverGlobalParams.started);
3692     output->setNow(_replExecutor->now());
3693 
3694     _topCoord->summarizeAsHtml(output);
3695 }
3696 
getTerm()3697 long long ReplicationCoordinatorImpl::getTerm() {
3698     // Note: no mutex acquisition here, as we are reading an Atomic variable.
3699     return _termShadow.load();
3700 }
3701 
updateTerm_forTest(long long term,TopologyCoordinator::UpdateTermResult * updateResult)3702 EventHandle ReplicationCoordinatorImpl::updateTerm_forTest(
3703     long long term, TopologyCoordinator::UpdateTermResult* updateResult) {
3704     stdx::lock_guard<stdx::mutex> lock(_mutex);
3705 
3706     EventHandle finishEvh;
3707     finishEvh = _updateTerm_inlock(term, updateResult);
3708     return finishEvh;
3709 }
3710 
updateTerm(OperationContext * opCtx,long long term)3711 Status ReplicationCoordinatorImpl::updateTerm(OperationContext* opCtx, long long term) {
3712     // Term is only valid if we are replicating.
3713     if (getReplicationMode() != modeReplSet) {
3714         return {ErrorCodes::BadValue, "cannot supply 'term' without active replication"};
3715     }
3716 
3717     if (!isV1ElectionProtocol()) {
3718         // Do not update if not in V1 protocol.
3719         return Status::OK();
3720     }
3721 
3722     // Check we haven't acquired any lock, because potential stepdown needs global lock.
3723     dassert(!opCtx->lockState()->isLocked());
3724     TopologyCoordinator::UpdateTermResult updateTermResult;
3725     EventHandle finishEvh;
3726 
3727     {
3728         stdx::lock_guard<stdx::mutex> lock(_mutex);
3729         finishEvh = _updateTerm_inlock(term, &updateTermResult);
3730     }
3731 
3732     // Wait for potential stepdown to finish.
3733     if (finishEvh.isValid()) {
3734         _replExecutor->waitForEvent(finishEvh);
3735     }
3736     if (updateTermResult == TopologyCoordinator::UpdateTermResult::kUpdatedTerm ||
3737         updateTermResult == TopologyCoordinator::UpdateTermResult::kTriggerStepDown) {
3738         return {ErrorCodes::StaleTerm, "Replication term of this node was stale; retry query"};
3739     }
3740 
3741     return Status::OK();
3742 }
3743 
_updateTerm_inlock(long long term,TopologyCoordinator::UpdateTermResult * updateTermResult)3744 EventHandle ReplicationCoordinatorImpl::_updateTerm_inlock(
3745     long long term, TopologyCoordinator::UpdateTermResult* updateTermResult) {
3746     if (!isV1ElectionProtocol()) {
3747         LOG(3) << "Cannot update term in election protocol version 0";
3748         return EventHandle();
3749     }
3750 
3751     auto now = _replExecutor->now();
3752     TopologyCoordinator::UpdateTermResult localUpdateTermResult = _topCoord->updateTerm(term, now);
3753     {
3754         if (localUpdateTermResult == TopologyCoordinator::UpdateTermResult::kUpdatedTerm) {
3755             _termShadow.store(term);
3756             _cancelPriorityTakeover_inlock();
3757             _cancelAndRescheduleElectionTimeout_inlock();
3758         }
3759     }
3760 
3761     if (updateTermResult) {
3762         *updateTermResult = localUpdateTermResult;
3763     }
3764 
3765     if (localUpdateTermResult == TopologyCoordinator::UpdateTermResult::kTriggerStepDown) {
3766         if (!_pendingTermUpdateDuringStepDown || *_pendingTermUpdateDuringStepDown < term) {
3767             _pendingTermUpdateDuringStepDown = term;
3768         }
3769         if (_topCoord->prepareForUnconditionalStepDown()) {
3770             log() << "stepping down from primary, because a new term has begun: " << term;
3771             return _stepDownStart();
3772         } else {
3773             LOG(2) << "Updated term but not triggering stepdown because we are already in the "
3774                       "process of stepping down";
3775         }
3776     }
3777     return EventHandle();
3778 }
3779 
reserveSnapshotName(OperationContext * opCtx)3780 Timestamp ReplicationCoordinatorImpl::reserveSnapshotName(OperationContext* opCtx) {
3781     Timestamp reservedName;
3782     if (getReplicationMode() == Mode::modeReplSet) {
3783         invariant(opCtx->lockState()->isLocked());
3784         if (getMemberState().primary()) {
3785             // Use the current optime on the node, for primary nodes.
3786             reservedName = LogicalClock::get(getServiceContext())->getClusterTime().asTimestamp();
3787         } else {
3788             // Use lastApplied time, for secondary nodes.
3789             reservedName = getMyLastAppliedOpTime().getTimestamp();
3790         }
3791     } else {
3792         // All snapshots are the same for a standalone node.
3793         reservedName = Timestamp();
3794     }
3795     // This was just in case the snapshot name was different from the lastOp in the client.
3796     ReplClientInfo::forClient(opCtx->getClient()).setLastSnapshot(reservedName);
3797     return reservedName;
3798 }
3799 
waitUntilSnapshotCommitted(OperationContext * opCtx,const Timestamp & untilSnapshot)3800 void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* opCtx,
3801                                                             const Timestamp& untilSnapshot) {
3802     stdx::unique_lock<stdx::mutex> lock(_mutex);
3803 
3804     uassert(ErrorCodes::NotYetInitialized,
3805             "Cannot use snapshots until replica set is finished initializing.",
3806             _rsConfigState != kConfigUninitialized && _rsConfigState != kConfigInitiating);
3807     while (!_currentCommittedSnapshot ||
3808            _currentCommittedSnapshot->getTimestamp() < untilSnapshot) {
3809         opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock);
3810     }
3811 }
3812 
getNumUncommittedSnapshots()3813 size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() {
3814     return _uncommittedSnapshotsSize.load();
3815 }
3816 
3817 MONGO_FP_DECLARE(disableSnapshotting);
3818 
_updateCommittedSnapshot_inlock(const OpTime & newCommittedSnapshot)3819 void ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock(
3820     const OpTime& newCommittedSnapshot) {
3821     if (testingSnapshotBehaviorInIsolation) {
3822         return;
3823     }
3824 
3825     // If we are in ROLLBACK state, do not set any new _currentCommittedSnapshot, as it will be
3826     // cleared at the end of rollback anyway.
3827     if (_memberState.rollback()) {
3828         log() << "Not updating committed snapshot because we are in rollback";
3829         return;
3830     }
3831 
3832     invariant(!newCommittedSnapshot.isNull());
3833     invariant(newCommittedSnapshot.getTimestamp() <=
3834               _topCoord->getLastCommittedOpTime().getTimestamp());
3835 
3836     // The new committed snapshot should be >= the current snapshot.
3837     if (_currentCommittedSnapshot) {
3838         invariant(newCommittedSnapshot >= _currentCommittedSnapshot);
3839     }
3840 
3841     if (MONGO_FAIL_POINT(disableSnapshotting))
3842         return;
3843     _currentCommittedSnapshot = newCommittedSnapshot;
3844     _currentCommittedSnapshotCond.notify_all();
3845 
3846     _externalState->updateCommittedSnapshot(newCommittedSnapshot);
3847 
3848     // Wake up any threads waiting for read concern or write concern.
3849     _wakeReadyWaiters_inlock();
3850 }
3851 
dropAllSnapshots()3852 void ReplicationCoordinatorImpl::dropAllSnapshots() {
3853     stdx::lock_guard<stdx::mutex> lock(_mutex);
3854     _dropAllSnapshots_inlock();
3855 }
3856 
_dropAllSnapshots_inlock()3857 void ReplicationCoordinatorImpl::_dropAllSnapshots_inlock() {
3858     _currentCommittedSnapshot = boost::none;
3859     _externalState->dropAllSnapshots();
3860 }
3861 
waitForElectionFinish_forTest()3862 void ReplicationCoordinatorImpl::waitForElectionFinish_forTest() {
3863     if (_electionFinishedEvent.isValid()) {
3864         _replExecutor->waitForEvent(_electionFinishedEvent);
3865     }
3866 }
3867 
waitForElectionDryRunFinish_forTest()3868 void ReplicationCoordinatorImpl::waitForElectionDryRunFinish_forTest() {
3869     if (_electionDryRunFinishedEvent.isValid()) {
3870         _replExecutor->waitForEvent(_electionDryRunFinishedEvent);
3871     }
3872 }
3873 
_resetElectionInfoOnProtocolVersionUpgrade(OperationContext * opCtx,const ReplSetConfig & oldConfig,const ReplSetConfig & newConfig)3874 void ReplicationCoordinatorImpl::_resetElectionInfoOnProtocolVersionUpgrade(
3875     OperationContext* opCtx, const ReplSetConfig& oldConfig, const ReplSetConfig& newConfig) {
3876 
3877     // On protocol version upgrade, reset last vote as if I just learned the term 0 from other
3878     // nodes.
3879     if (!oldConfig.isInitialized() ||
3880         oldConfig.getProtocolVersion() >= newConfig.getProtocolVersion()) {
3881         return;
3882     }
3883     invariant(newConfig.getProtocolVersion() == 1);
3884 
3885     const LastVote lastVote{OpTime::kInitialTerm, -1};
3886     fassert(40445, _externalState->storeLocalLastVoteDocument(opCtx, lastVote));
3887 }
3888 
_scheduleWorkAt(Date_t when,const CallbackFn & work)3889 CallbackHandle ReplicationCoordinatorImpl::_scheduleWorkAt(Date_t when, const CallbackFn& work) {
3890     auto cbh = _replExecutor->scheduleWorkAt(when, [work](const CallbackArgs& args) {
3891         if (args.status == ErrorCodes::CallbackCanceled) {
3892             return;
3893         }
3894         work(args);
3895     });
3896     if (cbh == ErrorCodes::ShutdownInProgress) {
3897         return {};
3898     }
3899     return fassertStatusOK(28800, cbh);
3900 }
3901 
_makeEvent()3902 EventHandle ReplicationCoordinatorImpl::_makeEvent() {
3903     auto eventResult = this->_replExecutor->makeEvent();
3904     if (eventResult.getStatus() == ErrorCodes::ShutdownInProgress) {
3905         return EventHandle();
3906     }
3907     fassert(28825, eventResult.getStatus());
3908     return eventResult.getValue();
3909 }
3910 
populateUnsetWriteConcernOptionsSyncMode(WriteConcernOptions wc)3911 WriteConcernOptions ReplicationCoordinatorImpl::populateUnsetWriteConcernOptionsSyncMode(
3912     WriteConcernOptions wc) {
3913 
3914     WriteConcernOptions writeConcern(wc);
3915     if (writeConcern.syncMode == WriteConcernOptions::SyncMode::UNSET) {
3916         if (writeConcern.wMode == WriteConcernOptions::kMajority &&
3917             getWriteConcernMajorityShouldJournal()) {
3918             writeConcern.syncMode = WriteConcernOptions::SyncMode::JOURNAL;
3919         } else {
3920             writeConcern.syncMode = WriteConcernOptions::SyncMode::NONE;
3921         }
3922     }
3923     return writeConcern;
3924 }
3925 
_wrapAsCallbackFn(const stdx::function<void ()> & work)3926 CallbackFn ReplicationCoordinatorImpl::_wrapAsCallbackFn(const stdx::function<void()>& work) {
3927     return [work](const CallbackArgs& cbData) {
3928         if (cbData.status == ErrorCodes::CallbackCanceled) {
3929             return;
3930         }
3931 
3932         work();
3933     };
3934 }
3935 
stepUpIfEligible(bool skipDryRun)3936 Status ReplicationCoordinatorImpl::stepUpIfEligible(bool skipDryRun) {
3937     if (!isV1ElectionProtocol()) {
3938         return Status(ErrorCodes::CommandNotSupported,
3939                       "Step-up command is only supported by Protocol Version 1");
3940     }
3941 
3942     auto reason = skipDryRun ? TopologyCoordinator::StartElectionReason::kStepUpRequestSkipDryRun
3943                              : TopologyCoordinator::StartElectionReason::kStepUpRequest;
3944     _startElectSelfIfEligibleV1(reason);
3945 
3946     EventHandle finishEvent;
3947     {
3948         stdx::lock_guard<stdx::mutex> lk(_mutex);
3949         finishEvent = _electionFinishedEvent;
3950     }
3951     if (finishEvent.isValid()) {
3952         _replExecutor->waitForEvent(finishEvent);
3953     }
3954     auto state = getMemberState();
3955     if (state.primary()) {
3956         return Status::OK();
3957     }
3958     return Status(ErrorCodes::CommandFailed, "Election failed.");
3959 }
3960 
getIndexPrefetchConfig() const3961 ReplSettings::IndexPrefetchConfig ReplicationCoordinatorImpl::getIndexPrefetchConfig() const {
3962     stdx::lock_guard<stdx::mutex> lock(_indexPrefetchMutex);
3963     return _indexPrefetchConfig;
3964 }
3965 
setIndexPrefetchConfig(const ReplSettings::IndexPrefetchConfig cfg)3966 void ReplicationCoordinatorImpl::setIndexPrefetchConfig(
3967     const ReplSettings::IndexPrefetchConfig cfg) {
3968     stdx::lock_guard<stdx::mutex> lock(_indexPrefetchMutex);
3969     _indexPrefetchConfig = cfg;
3970 }
3971 
_cancelElectionIfNeeded_inlock()3972 executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::_cancelElectionIfNeeded_inlock() {
3973     if (_topCoord->getRole() != TopologyCoordinator::Role::kCandidate) {
3974         return {};
3975     }
3976     if (isV1ElectionProtocol()) {
3977         invariant(_voteRequester);
3978         _voteRequester->cancel();
3979     } else {
3980         invariant(_freshnessChecker);
3981         _freshnessChecker->cancel();
3982         if (_electCmdRunner) {
3983             _electCmdRunner->cancel();
3984         }
3985     }
3986     return _electionFinishedEvent;
3987 }
3988 
_nextRandomInt64_inlock(int64_t limit)3989 int64_t ReplicationCoordinatorImpl::_nextRandomInt64_inlock(int64_t limit) {
3990     return _random.nextInt64(limit);
3991 }
3992 
3993 }  // namespace repl
3994 }  // namespace mongo
3995