/**
 * Copyright (C) 2018-present MongoDB, Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the Server Side Public License, version 1,
 * as published by MongoDB, Inc.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * Server Side Public License for more details.
 *
 * You should have received a copy of the Server Side Public License
 * along with this program. If not, see
 * <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the Server Side Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication

#include "mongo/platform/basic.h"

#include "mongo/db/repl/replication_coordinator_impl.h"

#include <algorithm>
#include <limits>

#include "mongo/base/status.h"
#include "mongo/client/fetcher.h"
#include "mongo/db/audit.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/logical_time.h"
#include "mongo/db/logical_time_validator.h"
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/repl/always_allow_non_local_writes.h"
#include "mongo/db/repl/check_quorum_for_config_change.h"
#include "mongo/db/repl/data_replicator_external_state_initial_sync.h"
#include "mongo/db/repl/elect_cmd_runner.h"
#include "mongo/db/repl/freshness_checker.h"
#include "mongo/db/repl/handshake_args.h"
#include "mongo/db/repl/is_master_response.h"
#include "mongo/db/repl/last_vote.h"
#include "mongo/db/repl/member_data.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_set_config_checks.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
#include "mongo/db/repl/repl_set_heartbeat_response.h"
#include "mongo/db/repl/repl_set_html_summary.h"
#include "mongo/db/repl/repl_set_request_votes_args.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/rslog.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/repl/vote_requester.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/write_concern.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/network_interface.h"
#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/stacktrace.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"

namespace mongo {
namespace repl {

MONGO_FP_DECLARE(stepdownHangBeforePerformingPostMemberStateUpdateActions);
MONGO_FP_DECLARE(transitionToPrimaryHangBeforeTakingGlobalExclusiveLock);

using CallbackArgs = executor::TaskExecutor::CallbackArgs;
using CallbackFn = executor::TaskExecutor::CallbackFn;
using CallbackHandle = executor::TaskExecutor::CallbackHandle;
using EventHandle = executor::TaskExecutor::EventHandle;
using executor::NetworkInterface;
using NextAction = Fetcher::NextAction;

namespace {

const char kLocalDB[] = "local";

MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncAttempts, int, 10);

MONGO_EXPORT_SERVER_PARAMETER(enableElectionHandoff, bool, true);

// Number of seconds between noop writer writes.
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(periodicNoopIntervalSecs, int, 10);

MONGO_INITIALIZER(periodicNoopIntervalSecs)(InitializerContext*) {
    if (periodicNoopIntervalSecs <= 0) {
        return Status(ErrorCodes::BadValue,
                      str::stream() << "Periodic noop interval must be greater than 0 seconds: "
                                    << periodicNoopIntervalSecs);
    } else if (periodicNoopIntervalSecs > 10) {
        return Status(ErrorCodes::BadValue,
                      str::stream()
                          << "Periodic noop interval must be less than or equal to 10 seconds: "
                          << periodicNoopIntervalSecs);
    }
    return Status::OK();
}

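// Acquires the lock referenced by 'lk' if it is not already held, then invokes 'fn' while the
// lock is held.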
void lockAndCall(stdx::unique_lock<stdx::mutex>* lk, const stdx::function<void()>& fn) {
    if (!lk->owns_lock()) {
        lk->lock();
    }
    fn();
}

/**
 * Implements the force-reconfig behavior of incrementing config version by a large random
 * number.
 */
BSONObj incrementConfigVersionByRandom(BSONObj config) {
    BSONObjBuilder builder;
    for (BSONObjIterator iter(config); iter.more(); iter.next()) {
        BSONElement elem = *iter;
        if (elem.fieldNameStringData() == ReplSetConfig::kVersionFieldName && elem.isNumber()) {
            std::unique_ptr<SecureRandom> generator(SecureRandom::create());
            const int random = std::abs(static_cast<int>(generator->nextInt64()) % 100000);
            builder.appendIntOrLL(ReplSetConfig::kVersionFieldName,
                                  elem.numberLong() + 10000 + random);
        } else {
            builder.append(elem);
        }
    }
    return builder.obj();
}

// This is a special flag that allows for testing of snapshot behavior by skipping the replication
// related checks and isolating the storage/query side of snapshotting.
// SERVER-31304 rename this parameter to something more appropriate.
bool testingSnapshotBehaviorInIsolation = false;
ExportedServerParameter<bool, ServerParameterType::kStartupOnly> TestingSnapshotBehaviorInIsolation(
    ServerParameterSet::getGlobal(),
    "testingSnapshotBehaviorInIsolation",
    &testingSnapshotBehaviorInIsolation);

}  // namespace

ReplicationCoordinatorImpl::Waiter::Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern)
    : opTime(std::move(_opTime)), writeConcern(_writeConcern) {}

BSONObj ReplicationCoordinatorImpl::Waiter::toBSON() const {
    BSONObjBuilder bob;
    bob.append("opTime", opTime.toBSON());
    if (writeConcern) {
        bob.append("writeConcern", writeConcern->toBSON());
    }
    return bob.obj();
}

std::string ReplicationCoordinatorImpl::Waiter::toString() const {
    return toBSON().toString();
}


ReplicationCoordinatorImpl::ThreadWaiter::ThreadWaiter(OpTime _opTime,
                                                       const WriteConcernOptions* _writeConcern,
                                                       stdx::condition_variable* _condVar)
    : Waiter(_opTime, _writeConcern), condVar(_condVar) {}

void ReplicationCoordinatorImpl::ThreadWaiter::notify_inlock() {
    invariant(condVar);
    condVar->notify_all();
}

ReplicationCoordinatorImpl::CallbackWaiter::CallbackWaiter(OpTime _opTime,
                                                           FinishFunc _finishCallback)
    : Waiter(_opTime, nullptr), finishCallback(std::move(_finishCallback)) {}

void ReplicationCoordinatorImpl::CallbackWaiter::notify_inlock() {
    invariant(finishCallback);
    finishCallback();
}


class ReplicationCoordinatorImpl::WaiterGuard {
public:
    /**
     * Constructor takes the list of waiters and enqueues itself on the list, removing itself
     * in the destructor.
     *
     * Usually waiters will be signaled and removed when their criteria are satisfied, but
     * wait_until() with timeout may signal waiters earlier and this guard will remove the waiter
     * properly.
     *
     * _list is guarded by ReplicationCoordinatorImpl::_mutex, thus it is illegal to construct one
     * of these without holding _mutex.
     */
    WaiterGuard(WaiterList* list, Waiter* waiter) : _list(list), _waiter(waiter) {
        list->add_inlock(_waiter);
    }

    ~WaiterGuard() {
        _list->remove_inlock(_waiter);
    }

private:
    WaiterList* _list;
    Waiter* _waiter;
};

void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) {
    _list.push_back(waiter);
}

void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveIf_inlock(
    stdx::function<bool(WaiterType)> func) {
    // Only advance iterator when the element doesn't match.
    for (auto it = _list.begin(); it != _list.end();) {
        if (!func(*it)) {
            ++it;
            continue;
        }

        WaiterType waiter = std::move(*it);
        if (it == std::prev(_list.end())) {
            // Iterator will be invalid after erasing the last element, so set it to the
            // next one (i.e. end()).
            it = _list.erase(it);
        } else {
            // Iterator is still valid after pop_back().
            std::swap(*it, _list.back());
            _list.pop_back();
        }

        // It's important to call notify() after the waiter has been removed from the list
        // since notify() might remove the waiter itself.
        waiter->notify_inlock();
    }
}

void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveAll_inlock() {
    std::vector<WaiterType> list = std::move(_list);
    // Call notify() after removing the waiters from the list.
    for (auto& waiter : list) {
        waiter->notify_inlock();
    }
}

bool ReplicationCoordinatorImpl::WaiterList::remove_inlock(WaiterType waiter) {
    auto it = std::find(_list.begin(), _list.end(), waiter);
    if (it == _list.end()) {
        return false;
    }
    std::swap(*it, _list.back());
    _list.pop_back();
    return true;
}

namespace {
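// Maps the startup ReplSettings to the coordinator's replication mode: replica set, legacy
// master/slave, or none.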
ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings& settings) {
    if (settings.usingReplSets()) {
        return ReplicationCoordinator::modeReplSet;
    }
    if (settings.isMaster() || settings.isSlave()) {
        return ReplicationCoordinator::modeMasterSlave;
    }
    return ReplicationCoordinator::modeNone;
}

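// Builds the InitialSyncerOptions used to construct an InitialSyncer, wiring its optime
// getters/setters and sync source selection back to the ReplicationCoordinator and its
// external state.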
InitialSyncerOptions createInitialSyncerOptions(
    ReplicationCoordinator* replCoord, ReplicationCoordinatorExternalState* externalState) {
    InitialSyncerOptions options;
    options.getMyLastOptime = [replCoord]() { return replCoord->getMyLastAppliedOpTime(); };
    options.setMyLastOptime = [replCoord, externalState](
        const OpTime& opTime, ReplicationCoordinator::DataConsistency consistency) {
        replCoord->setMyLastAppliedOpTimeForward(opTime, consistency);
        externalState->setGlobalTimestamp(replCoord->getServiceContext(), opTime.getTimestamp());
    };
    options.resetOptimes = [replCoord]() { replCoord->resetMyLastOpTimes(); };
    options.getSlaveDelay = [replCoord]() { return replCoord->getSlaveDelaySecs(); };
    options.syncSourceSelector = replCoord;
    options.batchLimits = externalState->getInitialSyncBatchLimits();
    options.oplogFetcherMaxFetcherRestarts =
        externalState->getOplogFetcherInitialSyncMaxFetcherRestarts();
    return options;
}
}  // namespace

ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
    ServiceContext* service,
    const ReplSettings& settings,
    std::unique_ptr<ReplicationCoordinatorExternalState> externalState,
    std::unique_ptr<executor::TaskExecutor> executor,
    std::unique_ptr<TopologyCoordinator> topCoord,
    ReplicationProcess* replicationProcess,
    StorageInterface* storage,
    int64_t prngSeed)
    : _service(service),
      _settings(settings),
      _replMode(getReplicationModeFromSettings(settings)),
      _topCoord(std::move(topCoord)),
      _replExecutor(std::move(executor)),
      _externalState(std::move(externalState)),
      _inShutdown(false),
      _memberState(MemberState::RS_STARTUP),
      _rsConfigState(kConfigPreStart),
      _selfIndex(-1),
      _sleptLastElection(false),
      _canAcceptNonLocalWrites(!(settings.usingReplSets() || settings.isSlave())),
      _canServeNonLocalReads(0U),
      _replicationProcess(replicationProcess),
      _storage(storage),
      _random(prngSeed) {

    _termShadow.store(OpTime::kUninitializedTerm);

    invariant(_service);

    if (!isReplEnabled()) {
        return;
    }

    _externalState->setupNoopWriter(Seconds(periodicNoopIntervalSecs));
}

ReplicationCoordinatorImpl::~ReplicationCoordinatorImpl() = default;

void ReplicationCoordinatorImpl::waitForStartUpComplete_forTest() {
    _waitForStartUpComplete();
}

void ReplicationCoordinatorImpl::_waitForStartUpComplete() {
    CallbackHandle handle;
    {
        stdx::unique_lock<stdx::mutex> lk(_mutex);
        while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
            _rsConfigStateChange.wait(lk);
        }
        handle = _finishLoadLocalConfigCbh;
    }
    if (handle.isValid()) {
        _replExecutor->wait(handle);
    }
}

ReplSetConfig ReplicationCoordinatorImpl::getReplicaSetConfig_forTest() {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    return _rsConfig;
}

Date_t ReplicationCoordinatorImpl::getElectionTimeout_forTest() const {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    if (!_handleElectionTimeoutCbh.isValid()) {
        return Date_t();
    }
    return _handleElectionTimeoutWhen;
}

Milliseconds ReplicationCoordinatorImpl::getRandomizedElectionOffset_forTest() {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    return _getRandomizedElectionOffset_inlock();
}

boost::optional<Date_t> ReplicationCoordinatorImpl::getPriorityTakeover_forTest() const {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    if (!_priorityTakeoverCbh.isValid()) {
        return boost::none;
    }
    return _priorityTakeoverWhen;
}

boost::optional<Date_t> ReplicationCoordinatorImpl::getCatchupTakeover_forTest() const {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    if (!_catchupTakeoverCbh.isValid()) {
        return boost::none;
    }
    return _catchupTakeoverWhen;
}

executor::TaskExecutor::CallbackHandle ReplicationCoordinatorImpl::getCatchupTakeoverCbh_forTest()
    const {
    return _catchupTakeoverCbh;
}

OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() const {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    return _getCurrentCommittedSnapshotOpTime_inlock();
}

OpTime ReplicationCoordinatorImpl::_getCurrentCommittedSnapshotOpTime_inlock() const {
    if (_currentCommittedSnapshot) {
        return _currentCommittedSnapshot.get();
    }
    return OpTime();
}

LogicalTime ReplicationCoordinatorImpl::_getCurrentCommittedLogicalTime_inlock() const {
    return LogicalTime(_getCurrentCommittedSnapshotOpTime_inlock().getTimestamp());
}

void ReplicationCoordinatorImpl::appendDiagnosticBSON(mongo::BSONObjBuilder* bob) {
    BSONObjBuilder eBuilder(bob->subobjStart("executor"));
    _replExecutor->appendDiagnosticBSON(&eBuilder);
}

void ReplicationCoordinatorImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) const {
    _replExecutor->appendConnectionStats(stats);
}

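// Loads the locally stored last-vote document, Rollback ID, and replica set config at startup.
// Returns true if the local config document is missing, in which case startup() marks the
// configuration uninitialized; otherwise schedules _finishLoadLocalConfig on the executor and
// returns false.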
bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx) {
    _replicationProcess->getConsistencyMarkers()->initializeMinValidDocument(opCtx);

    fassert(51240, _externalState->createLocalLastVoteCollection(opCtx));

    StatusWith<LastVote> lastVote = _externalState->loadLocalLastVoteDocument(opCtx);
    if (!lastVote.isOK()) {
        severe() << "Error loading local voted for document at startup; " << lastVote.getStatus();
        fassertFailedNoTrace(40367);
    }
    if (lastVote.getValue().getTerm() == OpTime::kInitialTerm) {
        // This log line is checked in unit tests.
        log() << "Did not find local initialized voted for document at startup.";
    }
    {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        _topCoord->loadLastVote(lastVote.getValue());
    }

    // Check that we have a local Rollback ID. If we do not have one, create one.
    auto status = _replicationProcess->refreshRollbackID(opCtx);
    if (!status.isOK()) {
        if (status == ErrorCodes::NamespaceNotFound) {
            log() << "Did not find local Rollback ID document at startup. Creating one.";
            auto initializingStatus = _replicationProcess->initializeRollbackID(opCtx);
            fassertStatusOK(40424, initializingStatus);
        } else {
            severe() << "Error loading local Rollback ID document at startup; " << status;
            fassertFailedNoTrace(40428);
        }
    }

    StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(opCtx);
    if (!cfg.isOK()) {
        log() << "Did not find local replica set configuration document at startup; "
              << cfg.getStatus();
        return true;
    }
    ReplSetConfig localConfig;
    status = localConfig.initialize(cfg.getValue());
    if (!status.isOK()) {
        error() << "Locally stored replica set configuration does not parse; See "
                   "http://www.mongodb.org/dochub/core/recover-replica-set-from-invalid-config "
                   "for information on how to recover from this. Got \""
                << status << "\" while parsing " << cfg.getValue();
        fassertFailedNoTrace(28545);
    }

    // Read the last op from the oplog after cleaning up any partially applied batches.
    _replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx);
    auto lastOpTimeStatus = _externalState->loadLastOpTime(opCtx);

    // Use a callback here, because _finishLoadLocalConfig calls isSelf(), which requires
    // that the server's networking layer be up and running and accepting connections, which
    // doesn't happen until startReplication finishes.
    auto handle =
        _replExecutor->scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_finishLoadLocalConfig,
                                               this,
                                               stdx::placeholders::_1,
                                               localConfig,
                                               lastOpTimeStatus,
                                               lastVote));
    if (handle == ErrorCodes::ShutdownInProgress) {
        handle = CallbackHandle{};
    }
    fassertStatusOK(40446, handle);
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    _finishLoadLocalConfigCbh = std::move(handle.getValue());

    return false;
}

void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
    const executor::TaskExecutor::CallbackArgs& cbData,
    const ReplSetConfig& localConfig,
    const StatusWith<OpTime>& lastOpTimeStatus,
    const StatusWith<LastVote>& lastVoteStatus) {
    if (!cbData.status.isOK()) {
        LOG(1) << "Loading local replica set configuration failed due to " << cbData.status;
        return;
    }

    StatusWith<int> myIndex =
        validateConfigForStartUp(_externalState.get(), localConfig, getServiceContext());
    if (!myIndex.isOK()) {
        if (myIndex.getStatus() == ErrorCodes::NodeNotFound ||
            myIndex.getStatus() == ErrorCodes::DuplicateKey) {
            warning() << "Locally stored replica set configuration does not have a valid entry "
                         "for the current node; waiting for reconfig or remote heartbeat; Got \""
                      << myIndex.getStatus() << "\" while validating " << localConfig.toBSON();
            myIndex = StatusWith<int>(-1);
        } else {
            error() << "Locally stored replica set configuration is invalid; See "
                       "http://www.mongodb.org/dochub/core/recover-replica-set-from-invalid-config"
                       " for information on how to recover from this. Got \""
                    << myIndex.getStatus() << "\" while validating " << localConfig.toBSON();
            fassertFailedNoTrace(28544);
        }
    }

    if (localConfig.getReplSetName() != _settings.ourSetName()) {
        warning() << "Local replica set configuration document reports set name of "
                  << localConfig.getReplSetName() << ", but command line reports "
                  << _settings.ourSetName() << "; waiting for reconfig or remote heartbeat";
        myIndex = StatusWith<int>(-1);
    }

    if (serverGlobalParams.enableMajorityReadConcern && localConfig.getNumMembers() == 3 &&
        localConfig.getNumDataBearingMembers() == 2) {
        log() << startupWarningsLog;
        log() << "** WARNING: This replica set has a Primary-Secondary-Arbiter architecture, but "
                 "readConcern:majority is enabled "
              << startupWarningsLog;
        log() << "** for this node. This is not a recommended configuration. Please see "
              << startupWarningsLog;
        log() << "** https://dochub.mongodb.org/core/psa-disable-rc-majority-3.6"
              << startupWarningsLog;
        log() << startupWarningsLog;
    }

    // Do not check the optime if this node is an arbiter.
    bool isArbiter =
        myIndex.getValue() != -1 && localConfig.getMemberAt(myIndex.getValue()).isArbiter();
    OpTime lastOpTime;
    if (!isArbiter) {
        if (!lastOpTimeStatus.isOK()) {
            warning() << "Failed to load timestamp of most recently applied operation: "
                      << lastOpTimeStatus.getStatus();
        } else {
            lastOpTime = lastOpTimeStatus.getValue();
        }
    } else {
        // The node is an arbiter, hence it will not need the logical clock for external
        // operations.
        LogicalClock::get(getServiceContext())->setEnabled(false);
        auto validator = LogicalTimeValidator::get(getServiceContext());
        if (validator) {
            validator->stopKeyManager();
        }
    }

    long long term = OpTime::kUninitializedTerm;
    if (localConfig.getProtocolVersion() == 1) {
        // Restore the current term according to the terms of last oplog entry and last vote.
        // The initial term of OpTime() is 0.
        term = lastOpTime.getTerm();
        if (lastVoteStatus.isOK()) {
            long long lastVoteTerm = lastVoteStatus.getValue().getTerm();
            if (term < lastVoteTerm) {
                term = lastVoteTerm;
            }
        }
    }

    auto opCtx = cc().makeOperationContext();
    auto consistency = DataConsistency::Inconsistent;
    if (!lastOpTime.isNull()) {

        // If we have an oplog, it is still possible that our data is not in a consistent state. For
        // example, if we are starting up after a crash following a post-rollback RECOVERING state.
        // To detect this, we see if our last optime is >= the 'minValid' optime, which
        // should be persistent across node crashes.
        OpTime minValid = _replicationProcess->getConsistencyMarkers()->getMinValid(opCtx.get());
        consistency =
            (lastOpTime >= minValid) ? DataConsistency::Consistent : DataConsistency::Inconsistent;
    }

    stdx::unique_lock<stdx::mutex> lock(_mutex);
    invariant(_rsConfigState == kConfigStartingUp);
    const PostMemberStateUpdateAction action =
        _setCurrentRSConfig_inlock(opCtx.get(), localConfig, myIndex.getValue());

    // Set our last applied and durable optimes to the top of the oplog, if we have one.
    if (!lastOpTime.isNull()) {
        bool isRollbackAllowed = false;
        _setMyLastAppliedOpTime_inlock(lastOpTime, isRollbackAllowed, consistency);
        _setMyLastDurableOpTime_inlock(lastOpTime, isRollbackAllowed);
        _reportUpstream_inlock(std::move(lock));  // unlocks _mutex.
    } else {
        lock.unlock();
    }

    _externalState->setGlobalTimestamp(getServiceContext(), lastOpTime.getTimestamp());
    {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        // Step down is impossible, so we don't need to wait for the returned event.
        _updateTerm_inlock(term);
    }
    LOG(1) << "Current term is now " << term;
    if (action == kActionWinElection) {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        _postWonElectionUpdateMemberState_inlock();
    } else {
        _performPostMemberStateUpdateAction(action);
    }

    if (!isArbiter && myIndex.getValue() != -1) {
        _externalState->startThreads(_settings);
        _startDataReplication(opCtx.get());
    }
}

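// Shuts down any in-progress initial sync (outside the mutex) and tells the external state to
// stop steady-state replication. A no-op when steady-state replication was never started.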
void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* opCtx) {
    std::shared_ptr<InitialSyncer> initialSyncerCopy;
    {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        if (!_startedSteadyStateReplication) {
            return;
        }

        _startedSteadyStateReplication = false;
        _initialSyncer.swap(initialSyncerCopy);
    }
    if (initialSyncerCopy) {
        LOG(1)
            << "ReplicationCoordinatorImpl::_stopDataReplication calling InitialSyncer::shutdown.";
        const auto status = initialSyncerCopy->shutdown();
        if (!status.isOK()) {
            warning() << "InitialSyncer shutdown failed: " << status;
        }
        initialSyncerCopy.reset();
        // Do not return here, fall through.
    }
    LOG(1) << "ReplicationCoordinatorImpl::_stopDataReplication calling "
              "ReplCoordExtState::stopDataReplication.";
    _externalState->stopDataReplication(opCtx);
}

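// Starts data replication exactly once. If the node already has data (and no initial sync flag
// is set), it transitions to RECOVERING and begins steady-state replication; otherwise it kicks
// off an initial sync, whose completion callback performs that transition.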
void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx,
                                                       stdx::function<void()> startCompleted) {
    stdx::unique_lock<stdx::mutex> lk(_mutex);
    if (_startedSteadyStateReplication) {
        return;
    }

    _startedSteadyStateReplication = true;

    // Check to see if we need to do an initial sync.
    const auto lastOpTime = _getMyLastAppliedOpTime_inlock();
    const auto needsInitialSync =
        lastOpTime.isNull() || _externalState->isInitialSyncFlagSet(opCtx);
    if (!needsInitialSync) {
        // Start steady replication, since we already have data.
        // ReplSetConfig has been installed, so it's either in STARTUP2 or REMOVED.
        auto memberState = _getMemberState_inlock();
        invariant(memberState.startup2() || memberState.removed());

        lk.unlock();
        invariantOK(setFollowerMode(MemberState::RS_RECOVERING));
        _externalState->startSteadyStateReplication(opCtx, this);
        return;
    }

    // Do initial sync.
    if (!_externalState->getTaskExecutor()) {
        log() << "not running initial sync during test.";
        return;
    }

    auto onCompletion = [this, startCompleted](const StatusWith<OpTimeWithHash>& status) {
        {
            stdx::lock_guard<stdx::mutex> lock(_mutex);
            if (status == ErrorCodes::CallbackCanceled) {
                log() << "Initial Sync has been cancelled: " << status.getStatus();
                return;
            } else if (!status.isOK()) {
                if (_inShutdown) {
                    log() << "Initial Sync failed during shutdown due to " << status.getStatus();
                    return;
                } else {
                    error() << "Initial sync failed, shutting down now. Restart the server "
                               "to attempt a new initial sync.";
                    fassertFailedWithStatusNoTrace(40088, status.getStatus());
                }
            }

            const auto lastApplied = status.getValue();
            _setMyLastAppliedOpTime_inlock(lastApplied.opTime, false, DataConsistency::Consistent);
        }

        // Clear maintenance mode.
        while (getMaintenanceMode()) {
            setMaintenanceMode(false).transitional_ignore();
        }

        if (startCompleted) {
            startCompleted();
        }
        // Repair local db (to compact it).
        auto opCtxHolder = cc().makeOperationContext();
        uassertStatusOK(_externalState->runRepairOnLocalDB(opCtxHolder.get()));
        // Because initial sync completed, we can only be in STARTUP2, not REMOVED.
        // Transition from STARTUP2 to RECOVERING and start the producer and the applier.
        invariant(getMemberState().startup2());
        invariantOK(setFollowerMode(MemberState::RS_RECOVERING));
        _externalState->startSteadyStateReplication(opCtxHolder.get(), this);
    };

    std::shared_ptr<InitialSyncer> initialSyncerCopy;
    try {
        {
            initialSyncerCopy = std::make_shared<InitialSyncer>(
                createInitialSyncerOptions(this, _externalState.get()),
                stdx::make_unique<DataReplicatorExternalStateInitialSync>(this,
                                                                          _externalState.get()),
                _storage,
                _replicationProcess,
                onCompletion);
            _initialSyncer = initialSyncerCopy;
        }
        // InitialSyncer::startup() must be called outside lock because it uses features (eg.
        // setting the initial sync flag) which depend on the ReplicationCoordinatorImpl.
        lk.unlock();
        uassertStatusOK(initialSyncerCopy->startup(opCtx, numInitialSyncAttempts.load()));
    } catch (...) {
        auto status = exceptionToStatus();
        log() << "Initial Sync failed to start: " << status;
        if (ErrorCodes::CallbackCanceled == status || ErrorCodes::isShutdownError(status.code())) {
            return;
        }
        fassertFailedWithStatusNoTrace(40354, status);
    }
}

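// Entry point called at server startup. Records this node's RID, starts the executor, and
// begins loading the locally stored replica set configuration; master/slave deployments are
// delegated to the external state instead.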
void ReplicationCoordinatorImpl::startup(OperationContext* opCtx) {
    if (!isReplEnabled()) {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        _setConfigState_inlock(kConfigReplicationDisabled);
        return;
    }

    {
        OID rid = _externalState->ensureMe(opCtx);

        stdx::lock_guard<stdx::mutex> lk(_mutex);
        fassert(18822, !_inShutdown);
        _setConfigState_inlock(kConfigStartingUp);
        _myRID = rid;
        _topCoord->getMyMemberData()->setRid(rid);
    }

    if (!_settings.usingReplSets()) {
        // Must be Master/Slave
        invariant(_settings.isMaster() || _settings.isSlave());
        _externalState->startMasterSlave(opCtx);
        return;
    }

    _replExecutor->startup();

    {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        _topCoord->setStorageEngineSupportsReadCommitted(
            _externalState->isReadCommittedSupportedByStorageEngine(opCtx));
    }

    bool doneLoadingConfig = _startLoadLocalConfig(opCtx);
    if (doneLoadingConfig) {
        // If we were not done loading the config, the config state would instead be set by
        // _finishLoadLocalConfig.
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        invariant(!_rsConfig.isInitialized());
        _setConfigState_inlock(kConfigUninitialized);
    }
}

void ReplicationCoordinatorImpl::enterTerminalShutdown() {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    _inTerminalShutdown = true;
}

void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) {
    // Shutdown must:
    // * prevent new threads from blocking in awaitReplication
    // * wake up all existing threads blocking in awaitReplication
    // * shut down and join the execution resources it owns.

    if (!_settings.usingReplSets()) {
        return;
    }

    log() << "shutting down replication subsystems";

    // Used to shut down outside of the lock.
    std::shared_ptr<InitialSyncer> initialSyncerCopy;
    {
        stdx::unique_lock<stdx::mutex> lk(_mutex);
        fassert(28533, !_inShutdown);
        _inShutdown = true;
        if (_rsConfigState == kConfigPreStart) {
            warning() << "ReplicationCoordinatorImpl::shutdown() called before "
                         "startup() finished. Shutting down without cleaning up the "
                         "replication system";
            return;
        }
        if (_rsConfigState == kConfigStartingUp) {
            // Wait until we are finished starting up, so that we can cleanly shut everything down.
            lk.unlock();
            _waitForStartUpComplete();
            lk.lock();
            fassert(18823, _rsConfigState != kConfigStartingUp);
        }
        _replicationWaiterList.signalAndRemoveAll_inlock();
        _opTimeWaiterList.signalAndRemoveAll_inlock();
        _currentCommittedSnapshotCond.notify_all();
        _initialSyncer.swap(initialSyncerCopy);
        _stepDownWaiters.notify_all();
    }


    // joining the replication executor is blocking so it must be run outside of the mutex
    if (initialSyncerCopy) {
        LOG(1) << "ReplicationCoordinatorImpl::shutdown calling InitialSyncer::shutdown.";
        const auto status = initialSyncerCopy->shutdown();
        if (!status.isOK()) {
            warning() << "InitialSyncer shutdown failed: " << status;
        }
        initialSyncerCopy->join();
        initialSyncerCopy.reset();
    }
    _externalState->shutdown(opCtx);
    _replExecutor->shutdown();
    _replExecutor->join();
}

const ReplSettings& ReplicationCoordinatorImpl::getSettings() const {
    return _settings;
}

ReplicationCoordinator::Mode ReplicationCoordinatorImpl::getReplicationMode() const {
    return _replMode;
}

MemberState ReplicationCoordinatorImpl::getMemberState() const {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    return _getMemberState_inlock();
}

MemberState ReplicationCoordinatorImpl::_getMemberState_inlock() const {
    return _memberState;
}

Status ReplicationCoordinatorImpl::waitForMemberState(MemberState expectedState,
                                                      Milliseconds timeout) {
    if (timeout < Milliseconds(0)) {
        return Status(ErrorCodes::BadValue, "Timeout duration cannot be negative");
    }

    stdx::unique_lock<stdx::mutex> lk(_mutex);
    auto pred = [this, expectedState]() { return _memberState == expectedState; };
    if (!_memberStateChange.wait_for(lk, timeout.toSystemDuration(), pred)) {
        return Status(ErrorCodes::ExceededTimeLimit,
                      str::stream() << "Timed out waiting for state to become "
                                    << expectedState.toString()
                                    << ". Current state is "
                                    << _memberState.toString());
    }
    return Status::OK();
}

Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    invariant(_rsConfig.isInitialized());
    if (_selfIndex == -1) {
        // We aren't currently in the set. Return 0 seconds so we can clear out the applier's
        // queue of work.
        return Seconds(0);
    }
    return _rsConfig.getMemberAt(_selfIndex).getSlaveDelay();
}

void ReplicationCoordinatorImpl::clearSyncSourceBlacklist() {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    _topCoord->clearSyncSourceBlacklist();
}

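// Transitions this node to the given follower state. Returns NotSecondary if this node is
// currently the leader, or ElectionInProgress if a concurrent election had to be canceled; in
// the latter case the caller is expected to retry.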
Status ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState) {
    stdx::unique_lock<stdx::mutex> lk(_mutex);
    if (newState == _topCoord->getMemberState()) {
        return Status::OK();
    }
    if (_topCoord->getRole() == TopologyCoordinator::Role::kLeader) {
        return Status(ErrorCodes::NotSecondary,
                      "Cannot set follower mode when node is currently the leader");
    }

    if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) {
        // We were a candidate, which means _topCoord believed us to be in state RS_SECONDARY, and
        // we know that newState != RS_SECONDARY because we would have returned early, above, if
        // the old and new states were equal. So, try again after the election is over to
        // finish setting the follower mode. We cannot wait for the election to finish here as we
        // may be holding a global X lock, so we return a bad status and rely on the caller to
        // retry.
        return Status(ErrorCodes::ElectionInProgress,
                      str::stream() << "Cannot set follower mode to " << newState.toString()
                                    << " because we are in the middle of running an election");
    }

    _topCoord->setFollowerMode(newState.s);

    const PostMemberStateUpdateAction action =
        _updateMemberStateFromTopologyCoordinator_inlock(nullptr);

    if (action == kActionWinElection) {
        _postWonElectionUpdateMemberState_inlock();
    } else {
        lk.unlock();
        _performPostMemberStateUpdateAction(action);
    }

    return Status::OK();
}

ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState() {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    return _applierState;
}

void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx,
                                                     long long termWhenBufferIsEmpty) {
    // This logic is a little complicated in order to avoid acquiring the global exclusive lock
    // unnecessarily. This is important because the applier may call signalDrainComplete()
    // whenever it wants, not only when the ReplicationCoordinator is expecting it.
    //
    // The steps are:
    // 1.) Check to see if we're waiting for this signal. If not, return early.
    // 2.) Otherwise, release the mutex while acquiring the global exclusive lock,
    //     since that might take a while (NB there's a deadlock cycle otherwise, too).
    // 3.) Re-check to see if we've somehow left drain mode. If we have not, clear
    //     producer and applier's states, set the flag allowing non-local database writes and
    //     drop the mutex. At this point, no writes can occur from other threads, due to the
    //     global exclusive lock.
    // 4.) Drop all temp collections, and log the drops to the oplog.
    // 5.) Log transition to primary in the oplog and set that OpTime as the floor for what we will
    //     consider to be committed.
    // 6.) Drop the global exclusive lock.
    //
    // Because replicatable writes are forbidden while in drain mode, and we don't exit drain
    // mode until we have the global exclusive lock, which forbids all other threads from making
    // writes, we know that from the time that _canAcceptNonLocalWrites is set until
    // this method returns, no external writes will be processed. This is important so that a new
    // temp collection isn't introduced on the new primary before we drop all the temp collections.

    // When we go to drop all temp collections, we must replicate the drops.
    invariant(opCtx->writesAreReplicated());

    stdx::unique_lock<stdx::mutex> lk(_mutex);
    if (_applierState != ApplierState::Draining) {
        return;
    }
    lk.unlock();

    _externalState->onDrainComplete(opCtx);

    if (MONGO_FAIL_POINT(transitionToPrimaryHangBeforeTakingGlobalExclusiveLock)) {
        log() << "transition to primary - "
                 "transitionToPrimaryHangBeforeTakingGlobalExclusiveLock fail point enabled. "
                 "Blocking until fail point is disabled.";
        while (MONGO_FAIL_POINT(transitionToPrimaryHangBeforeTakingGlobalExclusiveLock)) {
            mongo::sleepsecs(1);
            {
                stdx::lock_guard<stdx::mutex> lock(_mutex);
                if (_inShutdown) {
                    break;
                }
            }
        }
    }

    Lock::GlobalWrite globalWriteLock(opCtx);
    lk.lock();

    // Exit drain mode only if we're actually in draining mode, the apply buffer is empty in the
    // current term, and we're allowed to become the write master.
    if (_applierState != ApplierState::Draining ||
        !_topCoord->canCompleteTransitionToPrimary(termWhenBufferIsEmpty)) {
        return;
    }
    _applierState = ApplierState::Stopped;

    invariant(_getMemberState_inlock().primary());
    invariant(!_canAcceptNonLocalWrites.loadRelaxed());

    {
        lk.unlock();
        AllowNonLocalWritesBlock writesAllowed(opCtx);
        OpTime firstOpTime = _externalState->onTransitionToPrimary(opCtx, isV1ElectionProtocol());
        lk.lock();

        auto status = _topCoord->completeTransitionToPrimary(firstOpTime);
        if (status.code() == ErrorCodes::PrimarySteppedDown) {
            log() << "Transition to primary failed" << causedBy(status);
            return;
        }
        invariantOK(status);
    }

    // Must calculate the commit level again because firstOpTimeOfMyTerm wasn't set when we logged
    // our election in onTransitionToPrimary(), above.
    _updateLastCommittedOpTime_inlock();

    // Update _canAcceptNonLocalWrites
    _updateMemberStateFromTopologyCoordinator_inlock(opCtx);

    log() << "transition to primary complete; database writes are now permitted" << rsLog;
    _drainFinishedCond.notify_all();
    _externalState->startNoopWriter(_getMyLastAppliedOpTime_inlock());
}

Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) {
    if (timeout < Milliseconds(0)) {
        return Status(ErrorCodes::BadValue, "Timeout duration cannot be negative");
    }

    stdx::unique_lock<stdx::mutex> lk(_mutex);
    auto pred = [this]() { return _applierState != ApplierState::Draining; };
    if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) {
        return Status(ErrorCodes::ExceededTimeLimit,
                      "Timed out waiting to finish draining applier buffer");
    }

    return Status::OK();
}

void ReplicationCoordinatorImpl::signalUpstreamUpdater() {
    _externalState->forwardSlaveProgress();
}

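// Records an old-style replication progress update from a member identified by RID and
// recomputes the last committed optime. Only valid in master/slave mode.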
Status ReplicationCoordinatorImpl::setLastOptimeForSlave(const OID& rid, const Timestamp& ts) {
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    massert(28576,
            "Received an old style replication progress update, which is only used for Master/"
            "Slave replication now, but this node is not using Master/Slave replication. "
            "This is likely caused by an old (pre-2.6) member syncing from this node.",
            getReplicationMode() == modeMasterSlave);

    // term == -1 for master-slave
    OpTime opTime(ts, OpTime::kUninitializedTerm);
    MemberData* memberData = _topCoord->findMemberDataByRid(rid);
    if (memberData) {
        memberData->advanceLastAppliedOpTime(opTime, _replExecutor->now());
    } else {
        auto* memberData = _topCoord->addSlaveMemberData(rid);
        memberData->setLastAppliedOpTime(opTime, _replExecutor->now());
    }
    _updateLastCommittedOpTime_inlock();
    return Status::OK();
}

void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) {
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    _topCoord->setMyHeartbeatMessage(_replExecutor->now(), msg);
}

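// Advances this node's last applied optime only if 'opTime' is ahead of the current value.
// When the optime does not move forward, a consistent single-vote primary may still advance
// the stable timestamp, which can lag behind due to the all committed timestamp.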
void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opTime,
                                                               DataConsistency consistency) {
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    auto myLastAppliedOpTime = _getMyLastAppliedOpTime_inlock();
    if (opTime > myLastAppliedOpTime) {
        _setMyLastAppliedOpTime_inlock(opTime, false, consistency);
        _reportUpstream_inlock(std::move(lock));
    } else {
        if (opTime != myLastAppliedOpTime) {
            // In pv1, oplog entries are ordered by non-decreasing term and strictly increasing
            // timestamp. So, in pv1, it's not possible for us to get an opTime with a lower term
            // and a timestamp higher than or equal to our current lastAppliedOpTime.
            invariant(opTime.getTerm() == OpTime::kUninitializedTerm ||
                      myLastAppliedOpTime.getTerm() == OpTime::kUninitializedTerm ||
                      opTime.getTimestamp() < myLastAppliedOpTime.getTimestamp());
        }

        if (consistency == DataConsistency::Consistent && _canAcceptNonLocalWrites.loadRelaxed() &&
            _rsConfig.getWriteMajority() == 1) {
            // Single vote primaries may have a lagged stable timestamp due to paring back the
            // stable timestamp to the all committed timestamp.
            _setStableTimestampForStorage_inlock();
        }
    }
}

void ReplicationCoordinatorImpl::setMyLastDurableOpTimeForward(const OpTime& opTime) {
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    if (opTime > _getMyLastDurableOpTime_inlock()) {
        _setMyLastDurableOpTime_inlock(opTime, false);
        _reportUpstream_inlock(std::move(lock));
    }
}

void ReplicationCoordinatorImpl::setMyLastAppliedOpTime(const OpTime& opTime) {
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    // The optime passed to this function is required to represent a consistent database state.
    _setMyLastAppliedOpTime_inlock(opTime, false, DataConsistency::Consistent);
    _reportUpstream_inlock(std::move(lock));
}

void ReplicationCoordinatorImpl::setMyLastDurableOpTime(const OpTime& opTime) {
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    _setMyLastDurableOpTime_inlock(opTime, false);
    _reportUpstream_inlock(std::move(lock));
}

void ReplicationCoordinatorImpl::resetMyLastOpTimes() {
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    _resetMyLastOpTimes_inlock();
    _reportUpstream_inlock(std::move(lock));
}

void ReplicationCoordinatorImpl::_resetMyLastOpTimes_inlock() {
    LOG(1) << "resetting durable/applied optimes.";
    // Reset to uninitialized OpTime
    bool isRollbackAllowed = true;
    _setMyLastAppliedOpTime_inlock(OpTime(), isRollbackAllowed, DataConsistency::Inconsistent);
    _setMyLastDurableOpTime_inlock(OpTime(), isRollbackAllowed);
    _stableOpTimeCandidates.clear();
}

void ReplicationCoordinatorImpl::_reportUpstream_inlock(stdx::unique_lock<stdx::mutex> lock) {
    invariant(lock.owns_lock());

    if (getReplicationMode() != modeReplSet) {
        return;
    }

    if (_getMemberState_inlock().primary()) {
        return;
    }

    lock.unlock();

    _externalState->forwardSlaveProgress();  // Must do this outside _mutex
}

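// Core helper that records a new last applied optime in the topology coordinator, signals any
// optime waiters, and maintains the set of stable optime candidates used to compute the stable
// timestamp for the storage engine.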
void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& opTime,
                                                                bool isRollbackAllowed,
                                                                DataConsistency consistency) {
    auto* myMemberData = _topCoord->getMyMemberData();
    auto myLastAppliedOpTime = myMemberData->getLastAppliedOpTime();

    if (!(isRollbackAllowed || opTime == myLastAppliedOpTime)) {
        invariant(opTime > myLastAppliedOpTime);
        // In pv1, oplog entries are ordered by non-decreasing term and strictly increasing
        // timestamp. So, in pv1, it's not possible for us to get an opTime with a higher term
        // and a timestamp less than or equal to our current lastAppliedOpTime.
        invariant(opTime.getTerm() == OpTime::kUninitializedTerm ||
                  myLastAppliedOpTime.getTerm() == OpTime::kUninitializedTerm ||
                  opTime.getTimestamp() > myLastAppliedOpTime.getTimestamp());
    }
    myMemberData->setLastAppliedOpTime(opTime, _replExecutor->now());
    // If we are using applied times to calculate the commit level, update it now.
    if (!_rsConfig.getWriteConcernMajorityShouldJournal()) {
        _updateLastCommittedOpTime_inlock();
    }

    // Signal anyone waiting on optime changes.
    _opTimeWaiterList.signalAndRemoveIf_inlock(
        [opTime](Waiter* waiter) { return waiter->opTime <= opTime; });


    // Note that master-slave mode has no automatic failover, and so rollbacks never occur.
    // Additionally, the commit point for a master-slave set will never advance, since it doesn't
    // use any consensus protocol. Since the set of stable optime candidates can only get cleaned
    // up when the commit point advances, we should refrain from updating stable optime candidates
    // in master-slave mode, to keep the candidates list from growing unbounded.
    if (opTime.isNull() || getReplicationMode() != Mode::modeReplSet) {
        return;
    }

    // On PV0 secondaries, we do not track the commit point, so we do not use the commit point to
    // set the stable timestamp. Instead, we always set the stable timestamp to the last applied
    // write, to ensure that timestamp history can be purged. This means we do not need to track
    // stable optime candidates (and we should not track them, since we only purge the candidate
    // list when setting the stable timestamp due to the commit point advancing).
    const bool opTimeIsStableCandidate = isV1ElectionProtocol() || _memberState.primary();

    // Add the new applied optime to the list of stable optime candidates and then set the last
    // stable optime. Stable optimes are used to determine the last optime that it is safe to revert
    // the database to, in the event of a rollback via the 'recover to timestamp' method. If we are
    // setting our applied optime to a value that doesn't represent a consistent database state, we
    // should not add it to the set of stable optime candidates. For example, if we are in
    // RECOVERING after a rollback using the 'rollbackViaRefetch' algorithm, we will be inconsistent
    // until we reach the 'minValid' optime.
    if (consistency == DataConsistency::Consistent) {
        invariant(opTime.getTimestamp().getInc() > 0,
                  str::stream() << "Impossible optime received: " << opTime.toString());
        if (opTimeIsStableCandidate) {
            _stableOpTimeCandidates.insert(opTime);
        }
        // If we are lagged behind the commit optime, set a new stable timestamp here.
        if (opTime <= _topCoord->getLastCommittedOpTime()) {
            _setStableTimestampForStorage_inlock();
        }
    } else if (_getMemberState_inlock().startup2()) {
        // The oplog application phase of initial sync starts timestamping writes, causing
        // WiredTiger to pin this data in memory. Advancing the oldest timestamp in step with the
        // last applied optime here will permit WiredTiger to evict this data as it sees fit.
        _service->getGlobalStorageEngine()->setOldestTimestamp(opTime.getTimestamp());
    }
    if (!opTimeIsStableCandidate) {
        auto storageEngine = _service->getGlobalStorageEngine();
        if (storageEngine) {
            auto newOldestTimestamp = opTime.getTimestamp();
            if (!newOldestTimestamp.isNull()) {
                _storage->setStableTimestamp(getServiceContext(), newOldestTimestamp);
            }
        }
    }
}

void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime,
                                                                bool isRollbackAllowed) {
    auto* myMemberData = _topCoord->getMyMemberData();
    invariant(isRollbackAllowed || myMemberData->getLastDurableOpTime() <= opTime);
    myMemberData->setLastDurableOpTime(opTime, _replExecutor->now());
    // If we are using durable times to calculate the commit level, update it now.
    if (_rsConfig.getWriteConcernMajorityShouldJournal()) {
        _updateLastCommittedOpTime_inlock();
    }
}

OpTime ReplicationCoordinatorImpl::getMyLastAppliedOpTime() const {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    return _getMyLastAppliedOpTime_inlock();
}

OpTime ReplicationCoordinatorImpl::getMyLastDurableOpTime() const {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    return _getMyLastDurableOpTime_inlock();
}

Status ReplicationCoordinatorImpl::_validateReadConcern(OperationContext* opCtx,
                                                        const ReadConcernArgs& readConcern) {
    // We should never wait for replication if we are holding any locks, because doing so can
    // potentially block for a long time while doing network activity.
    if (opCtx->lockState()->isLocked()) {
        return {ErrorCodes::IllegalOperation,
                "Waiting for replication not allowed while holding a lock"};
    }

    if (readConcern.getArgsClusterTime() &&
        readConcern.getLevel() != ReadConcernLevel::kMajorityReadConcern &&
        readConcern.getLevel() != ReadConcernLevel::kLocalReadConcern) {
        return {ErrorCodes::BadValue,
                "Only readConcern level 'majority' or 'local' is allowed when specifying "
                "afterClusterTime"};
    }

    if (readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern &&
        !_externalState->isReadCommittedSupportedByStorageEngine(opCtx)) {
        return {ErrorCodes::ReadConcernMajorityNotEnabled,
                "Majority read concern requested, but it is not supported by the storage engine."};
    }

    return Status::OK();
}

Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* opCtx,
                                                          const ReadConcernArgs& readConcern) {
    auto verifyStatus = _validateReadConcern(opCtx, readConcern);
    if (!verifyStatus.isOK()) {
        return verifyStatus;
    }

    // nothing to wait for
    if (!readConcern.getArgsClusterTime() && !readConcern.getArgsOpTime()) {
        return Status::OK();
    }

    return waitUntilOpTimeForReadUntil(opCtx, readConcern, boost::none);
}

Status ReplicationCoordinatorImpl::waitUntilOpTimeForReadUntil(OperationContext* opCtx,
                                                               const ReadConcernArgs& readConcern,
                                                               boost::optional<Date_t> deadline) {
    if (getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) {
        // For master/slave and standalone nodes, readAfterOpTime is not supported, so we return an
        // error. However, we consider all writes "committed" and can treat MajorityReadConcern as
        // LocalReadConcern, which is immediately satisfied since there is no OpTime to wait for.
        return {ErrorCodes::NotAReplicaSet,
                "node needs to be a replica set member to use read concern"};
    }

    if (_rsConfigState == kConfigUninitialized || _rsConfigState == kConfigInitiating) {
        return {ErrorCodes::NotYetInitialized,
                "Cannot use non-local read concern until replica set is finished initializing."};
    }

    if (readConcern.getArgsClusterTime()) {
        return _waitUntilClusterTimeForRead(opCtx, readConcern, deadline);
    } else {
        return _waitUntilOpTimeForReadDeprecated(opCtx, readConcern);
    }
}

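// Blocks until either the current committed snapshot (for majority reads) or this node's last
// applied optime (for local reads) reaches 'targetOpTime', the optional deadline expires, the
// operation is interrupted, or shutdown begins.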
Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx,
                                                    bool isMajorityReadConcern,
                                                    OpTime targetOpTime,
                                                    boost::optional<Date_t> deadline) {
    if (!isMajorityReadConcern) {
        // This assumes the read concern is "local" level.
        // We need to wait for all committed writes to be visible, even in the oplog (which uses
        // special visibility rules).
        _externalState->waitForAllEarlierOplogWritesToBeVisible(opCtx);
    }

    stdx::unique_lock<stdx::mutex> lock(_mutex);

    if (isMajorityReadConcern && !_externalState->snapshotsEnabled()) {
        return {ErrorCodes::CommandNotSupported,
                "Current storage engine does not support majority readConcerns"};
    }

    auto getCurrentOpTime = [this, isMajorityReadConcern]() {
        return isMajorityReadConcern ? _getCurrentCommittedSnapshotOpTime_inlock()
                                     : _getMyLastAppliedOpTime_inlock();
    };

    if (isMajorityReadConcern && targetOpTime > getCurrentOpTime()) {
        LOG(1) << "waitUntilOpTime: waiting for optime:" << targetOpTime
               << " to be in a snapshot -- current snapshot: " << getCurrentOpTime();
    }

    while (targetOpTime > getCurrentOpTime()) {
        if (_inShutdown) {
            return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
        }

        // If we are doing a majority read concern we only need to wait for a new snapshot.
        if (isMajorityReadConcern) {
1358 // Wait for a snapshot that meets our needs (< targetOpTime).
1359 LOG(3) << "waitUntilOpTime: waiting for a new snapshot until " << opCtx->getDeadline();
1360
1361 auto waitStatus =
1362 opCtx->waitForConditionOrInterruptNoAssert(_currentCommittedSnapshotCond, lock);
1363 if (!waitStatus.isOK()) {
1364 return waitStatus.withContext(str::stream()
1365 << "Error waiting for snapshot not less than "
1366 << targetOpTime.toString()
1367 << ", current relevant optime is "
1368 << getCurrentOpTime().toString()
1369 << ".");
1370 }
1371 LOG(3) << "Got notified of new snapshot: " << _currentCommittedSnapshot->toString();
1372 continue;
1373 }
1374
1375 // We just need to wait for the opTime to catch up to what we need (not majority RC).
1376 stdx::condition_variable condVar;
1377 ThreadWaiter waiter(targetOpTime, nullptr, &condVar);
1378 WaiterGuard guard(&_opTimeWaiterList, &waiter);
1379
1380 LOG(3) << "waitUntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime "
1381 << waiter << " until " << opCtx->getDeadline();
1382
1383 auto waitStatus = Status::OK();
1384 if (deadline) {
1385 auto waitUntilStatus =
1386 opCtx->waitForConditionOrInterruptNoAssertUntil(condVar, lock, *deadline);
1387 waitStatus = waitUntilStatus.getStatus();
1388 } else {
1389 waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock);
1390 }
1391
1392 if (!waitStatus.isOK()) {
1393 return waitStatus.withContext(str::stream() << "Error waiting for optime "
1394 << targetOpTime.toString()
1395 << ", current relevant optime is "
1396 << getCurrentOpTime().toString()
1397 << ".");
1398 }
1399
1400 // If deadline is set no need to wait until the targetTime time is reached.
1401 if (deadline) {
1402 return Status::OK();
1403 }
1404 }
1405
1406 return Status::OK();
1407 }
1408
_waitUntilClusterTimeForRead(OperationContext * opCtx,const ReadConcernArgs & readConcern,boost::optional<Date_t> deadline)1409 Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead(OperationContext* opCtx,
1410 const ReadConcernArgs& readConcern,
1411 boost::optional<Date_t> deadline) {
1412 auto clusterTime = *readConcern.getArgsClusterTime();
1413 invariant(clusterTime != LogicalTime::kUninitialized);
1414
1415 // convert clusterTime to opTime so it can be used by the _opTimeWaiterList for wait on
1416 // readConcern level local.
1417 auto targetOpTime = OpTime(clusterTime.asTimestamp(), OpTime::kUninitializedTerm);
1418 invariant(!readConcern.getArgsOpTime());
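    // Editorial note (an assumption about OpTime ordering in this codebase): OpTime comparisons
    // are timestamp-major, so a target built with kUninitializedTerm is considered reached once
    // the timestamp is reached, regardless of the term of the optime we catch up to.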

    const bool isMajorityReadConcern =
        readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern;

    return _waitUntilOpTime(opCtx, isMajorityReadConcern, targetOpTime, deadline);
}

// TODO: remove when SERVER-29729 is done
Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
    OperationContext* opCtx, const ReadConcernArgs& readConcern) {
    const bool isMajorityReadConcern =
        readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern;

    const auto targetOpTime = readConcern.getArgsOpTime().value_or(OpTime());
    return _waitUntilOpTime(opCtx, isMajorityReadConcern, targetOpTime);
}

OpTime ReplicationCoordinatorImpl::_getMyLastAppliedOpTime_inlock() const {
    return _topCoord->getMyLastAppliedOpTime();
}

OpTime ReplicationCoordinatorImpl::_getMyLastDurableOpTime_inlock() const {
    return _topCoord->getMyLastDurableOpTime();
}

Status ReplicationCoordinatorImpl::setLastDurableOptime_forTest(long long cfgVer,
                                                                long long memberId,
                                                                const OpTime& opTime) {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    invariant(getReplicationMode() == modeReplSet);

    const UpdatePositionArgs::UpdateInfo update(OpTime(), opTime, cfgVer, memberId);
    long long configVersion;
    const auto status = _setLastOptime_inlock(update, &configVersion);
    _updateLastCommittedOpTime_inlock();
    return status;
}

Status ReplicationCoordinatorImpl::setLastAppliedOptime_forTest(long long cfgVer,
                                                                long long memberId,
                                                                const OpTime& opTime) {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    invariant(getReplicationMode() == modeReplSet);

    const UpdatePositionArgs::UpdateInfo update(opTime, OpTime(), cfgVer, memberId);
    long long configVersion;
    const auto status = _setLastOptime_inlock(update, &configVersion);
    _updateLastCommittedOpTime_inlock();
    return status;
}

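// For illustration only (document shape assumed, not taken from this file): the UpdateInfo
// entries processed below typically originate from a replSetUpdatePosition command such as
//   { replSetUpdatePosition: 1, optimes: [ { memberId: 1, cfgver: 2,
//     appliedOpTime: { ts: <Timestamp>, t: 3 }, durableOpTime: { ts: <Timestamp>, t: 3 } } ] }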
Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArgs::UpdateInfo& args,
                                                         long long* configVersion) {
    if (_selfIndex == -1) {
        // Ignore updates when we're in state REMOVED.
        return Status(ErrorCodes::NotMasterOrSecondary,
                      "Received replSetUpdatePosition command but we are in state REMOVED");
    }
    invariant(getReplicationMode() == modeReplSet);

    if (args.memberId < 0) {
        std::string errmsg = str::stream()
            << "Received replSetUpdatePosition for node with memberId " << args.memberId
            << " which is negative and therefore invalid";
        LOG(1) << errmsg;
        return Status(ErrorCodes::NodeNotFound, errmsg);
    }

    if (args.memberId == _rsConfig.getMemberAt(_selfIndex).getId()) {
        // Do not let remote nodes tell us what our optime is.
        return Status::OK();
    }

    LOG(2) << "received notification that node with memberID " << args.memberId
           << " in config with version " << args.cfgver
           << " has reached optime: " << args.appliedOpTime
           << " and is durable through: " << args.durableOpTime;

    if (args.cfgver != _rsConfig.getConfigVersion()) {
        std::string errmsg = str::stream()
            << "Received replSetUpdatePosition for node with memberId " << args.memberId
            << " whose config version of " << args.cfgver << " doesn't match our config version of "
            << _rsConfig.getConfigVersion();
        LOG(1) << errmsg;
        *configVersion = _rsConfig.getConfigVersion();
        return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg);
    }

    auto* memberData = _topCoord->findMemberDataByMemberId(args.memberId);
    if (!memberData) {
        invariant(!_rsConfig.findMemberByID(args.memberId));

        std::string errmsg = str::stream()
            << "Received replSetUpdatePosition for node with memberId " << args.memberId
            << " which doesn't exist in our config";
        LOG(1) << errmsg;
        return Status(ErrorCodes::NodeNotFound, errmsg);
    }

    invariant(args.memberId == memberData->getMemberId());

    LOG(3) << "Node with memberID " << args.memberId << " currently has optime "
           << memberData->getLastAppliedOpTime() << " durable through "
           << memberData->getLastDurableOpTime() << "; updating to optime " << args.appliedOpTime
           << " and durable through " << args.durableOpTime;

    auto now(_replExecutor->now());
    bool advancedOpTime = memberData->advanceLastAppliedOpTime(args.appliedOpTime, now);
    advancedOpTime =
        memberData->advanceLastDurableOpTime(args.durableOpTime, now) || advancedOpTime;

    // Only update committed optime if the remote optimes increased.
    if (advancedOpTime) {
        _updateLastCommittedOpTime_inlock();
    }

    _cancelAndRescheduleLivenessUpdate_inlock(args.memberId);
    return Status::OK();
}

bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock(
    const OpTime& opTime, Timestamp minSnapshot, const WriteConcernOptions& writeConcern) {
    // The syncMode cannot be unset.
    invariant(writeConcern.syncMode != WriteConcernOptions::SyncMode::UNSET);
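    // If the write concern can never be satisfied with the current configuration (for example,
    // an unknown wMode), report "done waiting" so the caller stops blocking;
    // _awaitReplication_inlock re-runs this satisfiability check and surfaces the error.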
    Status status = _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
    if (!status.isOK()) {
        return true;
    }

    const bool useDurableOpTime = writeConcern.syncMode == WriteConcernOptions::SyncMode::JOURNAL;

    if (writeConcern.wMode.empty()) {
        return _topCoord->haveNumNodesReachedOpTime(
            opTime, writeConcern.wNumNodes, useDurableOpTime);
    }

    StringData patternName;
    if (writeConcern.wMode == WriteConcernOptions::kMajority) {
        if (_externalState->snapshotsEnabled() && !testingSnapshotBehaviorInIsolation) {
            // Make sure we have a valid "committed" snapshot up to the needed optime.
            if (!_currentCommittedSnapshot) {
                return false;
            }

            // Wait for the "current" snapshot to advance to/past the opTime.
            const auto haveSnapshot = (_currentCommittedSnapshot >= opTime &&
                                       _currentCommittedSnapshot->getTimestamp() >= minSnapshot);
            if (!haveSnapshot) {
                LOG(1) << "Required snapshot optime: " << opTime << " is not yet part of the "
                       << "current 'committed' snapshot: " << *_currentCommittedSnapshot;
                return false;
            }

            // Fall through to also wait for "majority" replication.
        }
        // Wait for replication to a majority of voters. This is still needed for j:true when
        // writeConcernMajorityShouldJournal is false, since the committed snapshot above tracks
        // the applied (not durable) optime.
        patternName = ReplSetConfig::kMajorityWriteConcernModeName;
    } else {
        patternName = writeConcern.wMode;
    }

    StatusWith<ReplSetTagPattern> tagPattern = _rsConfig.findCustomWriteMode(patternName);
    if (!tagPattern.isOK()) {
        return true;
    }
    return _topCoord->haveTaggedNodesReachedOpTime(opTime, tagPattern.getValue(), useDurableOpTime);
}

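// For illustration only (a sketch under the assumption that StatusAndDuration carries a 'status'
// and a 'duration' member, as the return statements below suggest): a typical caller waits on
// its last optime and then checks both fields, e.g.
//   auto result = replCoord->awaitReplication(opCtx, lastOpTime, writeConcern);
//   uassertStatusOK(result.status);  // result.duration records how long the wait took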
ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplication(
    OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern) {
    Timer timer;
    WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern);
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    auto status = _awaitReplication_inlock(&lock, opCtx, opTime, Timestamp(), fixedWriteConcern);
    return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())};
}

ReplicationCoordinator::StatusAndDuration
ReplicationCoordinatorImpl::awaitReplicationOfLastOpForClient(
    OperationContext* opCtx, const WriteConcernOptions& writeConcern) {
    Timer timer;
    WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern);
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    const auto& clientInfo = ReplClientInfo::forClient(opCtx->getClient());
    auto status = _awaitReplication_inlock(
        &lock, opCtx, clientInfo.getLastOp(), clientInfo.getLastSnapshot(), fixedWriteConcern);
    return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())};
}

Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
    stdx::unique_lock<stdx::mutex>* lock,
    OperationContext* opCtx,
    const OpTime& opTime,
    Timestamp minSnapshot,
    const WriteConcernOptions& writeConcern) {

    // We should never wait for replication if we are holding any locks, because this can
    // potentially block for a long time while doing network activity.
    if (opCtx->lockState()->isLocked()) {
        return {ErrorCodes::IllegalOperation,
                "Waiting for replication not allowed while holding a lock"};
    }

    const Mode replMode = getReplicationMode();
    if (replMode == modeNone) {
        // No replication check needed (validated above).
        return Status::OK();
    }

    if (replMode == modeMasterSlave && writeConcern.wMode == WriteConcernOptions::kMajority) {
        // With master/slave, majority is equivalent to w=1.
        return Status::OK();
    }

    if (opTime.isNull() && minSnapshot == Timestamp()) {
        // If waiting for the empty optime, always say it's been replicated.
        return Status::OK();
    }

    auto checkForStepDown = [&]() -> Status {
        if (replMode == modeReplSet && !_memberState.primary()) {
            return {ErrorCodes::PrimarySteppedDown,
                    "Primary stepped down while waiting for replication"};
        }

        if (opTime.getTerm() != _topCoord->getTerm()) {
            return {
                ErrorCodes::PrimarySteppedDown,
                str::stream() << "Term changed from " << opTime.getTerm() << " to "
                              << _topCoord->getTerm()
                              << " while waiting for replication, indicating that this node must "
                                 "have stepped down."};
        }

        if (_topCoord->isSteppingDown()) {
            return {ErrorCodes::PrimarySteppedDown,
                    "Received stepdown request while waiting for replication"};
        }
        return Status::OK();
    };

    Status stepdownStatus = checkForStepDown();
    if (!stepdownStatus.isOK()) {
        return stepdownStatus;
    }

    auto interruptStatus = opCtx->checkForInterruptNoAssert();
    if (!interruptStatus.isOK()) {
        return interruptStatus;
    }

    if (writeConcern.wMode.empty()) {
        if (writeConcern.wNumNodes < 1) {
            return Status::OK();
        } else if (writeConcern.wNumNodes == 1 && _getMyLastAppliedOpTime_inlock() >= opTime) {
            return Status::OK();
        }
    }

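    // Compute the wait deadline: an explicit wDeadline takes precedence; otherwise a deadline is
    // derived from wTimeout, padded by the clock's precision; kNoTimeout means wait forever.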
    auto clockSource = opCtx->getServiceContext()->getPreciseClockSource();
    const auto wTimeoutDate = [&]() -> const Date_t {
        if (writeConcern.wDeadline != Date_t::max()) {
            return writeConcern.wDeadline;
        }
        if (writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) {
            return Date_t::max();
        }
        return clockSource->now() + clockSource->getPrecision() +
            Milliseconds{writeConcern.wTimeout};
    }();

    // Must hold _mutex before constructing the waiter, as it will modify _replicationWaiterList.
    stdx::condition_variable condVar;
    ThreadWaiter waiter(opTime, &writeConcern, &condVar);
    WaiterGuard guard(&_replicationWaiterList, &waiter);
    while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) {

        if (_inShutdown) {
            return {ErrorCodes::ShutdownInProgress, "Replication is being shut down"};
        }

        auto status = opCtx->waitForConditionOrInterruptNoAssertUntil(condVar, *lock, wTimeoutDate);
        if (!status.isOK()) {
            return status.getStatus();
        }

        if (status.getValue() == stdx::cv_status::timeout) {
            if (Command::testCommandsEnabled) {
                // Log the state of the replica set on timeout to help with diagnosis.
                BSONObjBuilder progress;
                _topCoord->fillMemberData(&progress);
                log() << "Replication failed for write concern: " << writeConcern.toBSON()
                      << ", waitInfo: " << waiter << ", opID: " << opCtx->getOpID()
                      << ", progress: " << progress.done();
            }
            return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"};
        }

        stepdownStatus = checkForStepDown();
        if (!stepdownStatus.isOK()) {
            return stepdownStatus;
        }
    }

    return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
}

void ReplicationCoordinatorImpl::waitForStepDownAttempt_forTest() {
    stdx::unique_lock<stdx::mutex> lk(_mutex);
    while (!_topCoord->isSteppingDown()) {
        _stepDownWaiters.wait(lk);
    }
}

Status ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
                                            const bool force,
                                            const Milliseconds& waitTime,
                                            const Milliseconds& stepdownTime) {

    const Date_t startTime = _replExecutor->now();
    const Date_t stepDownUntil = startTime + stepdownTime;
    const Date_t waitUntil = startTime + waitTime;

    if (!getMemberState().primary()) {
        // Note this check is inherently racy - it's always possible for the node to
        // stepdown from some other path before we acquire the global exclusive lock. This check
        // is just to try to save us from acquiring the global X lock unnecessarily.
        return {ErrorCodes::NotMaster, "not primary so can't step down"};
    }

    auto globalLock = stdx::make_unique<Lock::GlobalLock>(
        opCtx, MODE_X, durationCount<Milliseconds>(stepdownTime), Lock::GlobalLock::EnqueueOnly());

    // We've requested the global exclusive lock which will stop new operations from coming in,
    // but existing operations could take a long time to finish, so kill all user operations
    // to help us get the global lock faster.
    _externalState->killAllUserOperations(opCtx);

    globalLock->waitForLock(durationCount<Milliseconds>(stepdownTime));
    if (!globalLock->isLocked()) {
        return {ErrorCodes::ExceededTimeLimit,
                "Could not acquire the global exclusive lock within the amount of time "
                "specified that we should step down for"};
    }

    stdx::unique_lock<stdx::mutex> lk(_mutex);

    auto status = opCtx->checkForInterruptNoAssert();
    if (!status.isOK()) {
        return status;
    }

    const long long termAtStart = _topCoord->getTerm();

    auto statusWithAbortFn = _topCoord->prepareForStepDownAttempt();
    if (!statusWithAbortFn.isOK()) {
        // This will cause us to fail if we're already in the process of stepping down.
        // It is also possible to get here even if we're done stepping down via another path,
        // and this will also elicit a failure from this call.
        return statusWithAbortFn.getStatus();
    }
    const auto& abortFn = statusWithAbortFn.getValue();

    // Wake up threads blocked in waitForStepDownAttempt_forTest.
    _stepDownWaiters.notify_all();

    // Update _canAcceptNonLocalWrites from the TopologyCoordinator now that we're in the middle
    // of a stepdown attempt. This will prevent us from accepting writes so that if our stepdown
    // attempt fails later we can release the global lock and go to sleep to allow secondaries to
    // catch up without allowing new writes in.
    auto action = _updateMemberStateFromTopologyCoordinator_inlock(opCtx);
    invariant(action == PostMemberStateUpdateAction::kActionNone);
    invariant(!_canAcceptNonLocalWrites.loadRelaxed());

    // Make sure that we leave _canAcceptNonLocalWrites in the proper state.
    auto updateMemberState = [&] {
        invariant(lk.owns_lock());
        invariant(opCtx->lockState()->isW());

        auto action = _updateMemberStateFromTopologyCoordinator_inlock(opCtx);
        // Seems unlikely, but handle kActionWinElection in case some surprising sequence leads
        // here.
        if (action == kActionWinElection) {
            _postWonElectionUpdateMemberState_inlock();
            lk.unlock();
        } else {
            lk.unlock();

            if (MONGO_FAIL_POINT(stepdownHangBeforePerformingPostMemberStateUpdateActions)) {
                log() << "stepping down from primary - "
                         "stepdownHangBeforePerformingPostMemberStateUpdateActions fail point "
                         "enabled. Blocking until fail point is disabled.";
                while (MONGO_FAIL_POINT(stepdownHangBeforePerformingPostMemberStateUpdateActions)) {
                    mongo::sleepsecs(1);
                    {
                        stdx::lock_guard<stdx::mutex> lock(_mutex);
                        if (_inShutdown) {
                            break;
                        }
                    }
                }
            }

            _performPostMemberStateUpdateAction(action);
        }
    };
    ScopeGuard onExitGuard = MakeGuard([&] {
        abortFn();
        updateMemberState();
    });

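    // Retry loop: each failed attemptStepDown releases the global lock so secondaries can read
    // the oplog and catch up, waits to be signaled about replication progress, then re-acquires
    // the lock and tries again until the attempt succeeds or the deadlines expire.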
    try {

        while (!_topCoord->attemptStepDown(
            termAtStart, _replExecutor->now(), waitUntil, stepDownUntil, force)) {

            // The stepdown attempt failed. Now release the global lock to allow secondaries
            // to read the oplog, then wait until enough secondaries are caught up for us to
            // finish stepdown.
            globalLock.reset();
            invariant(!opCtx->lockState()->isLocked());

            // Make sure we re-acquire the global lock before returning so that we're always
            // holding the global lock when the onExitGuard set up earlier runs.
            ON_BLOCK_EXIT([&] {
                // Need to release _mutex before re-acquiring the global lock to preserve lock
                // acquisition order rules.
                lk.unlock();

                // Need to re-acquire the global lock before re-attempting stepdown. We use no
                // timeout here even though that means the lock acquisition could take longer
                // than the stepdown window. If that happens, the call to attemptStepDown
                // immediately after will error. Since we'll need the global lock no matter what
                // to clean up a failed stepdown attempt, we might as well spend whatever time we
                // need to acquire it now.
                globalLock.reset(new Lock::GlobalLock(opCtx, MODE_X, UINT_MAX));
                invariant(globalLock->isLocked());
                lk.lock();
            });

            // We ignore the case where waitForConditionOrInterruptUntil returns
            // stdx::cv_status::timeout because in that case coming back around the loop and
            // calling attemptStepDown again will cause attemptStepDown to return
            // ExceededTimeLimit with the proper error message.
            opCtx->waitForConditionOrInterruptUntil(
                _stepDownWaiters, lk, std::min(stepDownUntil, waitUntil));
        }
    } catch (const DBException& e) {
        return e.toStatus();
    }

    // Stepdown success!
    onExitGuard.Dismiss();
    updateMemberState();

    // Schedule work to (potentially) step back up once the stepdown period has ended.
    _scheduleWorkAt(
        stepDownUntil,
        stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing, this, stdx::placeholders::_1));

    // If election handoff is enabled, schedule a step-up immediately instead of waiting for the
    // election timeout to expire. ReplSetStepUp is only supported in PV1.
    if (isV1ElectionProtocol() && !force && enableElectionHandoff.load()) {
        _performElectionHandoff();
    }
    return Status::OK();
}

void ReplicationCoordinatorImpl::_signalStepDownWaiterIfReady_inlock() {
    if (_topCoord->isSteppingDown() && _topCoord->isSafeToStepDown()) {
        _stepDownWaiters.notify_all();
    }
}

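// Election handoff: after a clean stepdown, proactively ask the candidate chosen by
// chooseElectionHandoffCandidate() to run for primary right away by sending it replSetStepUp
// with skipDryRun, instead of waiting for an election timeout to expire somewhere in the set.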
void ReplicationCoordinatorImpl::_performElectionHandoff() {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    auto candidateIndex = _topCoord->chooseElectionHandoffCandidate();

    if (candidateIndex < 0) {
        log() << "Could not find node to hand off election to.";
        return;
    }

    auto target = _rsConfig.getMemberAt(candidateIndex).getHostAndPort();
    executor::RemoteCommandRequest request(
        target, "admin", BSON("replSetStepUp" << 1 << "skipDryRun" << true), nullptr);
    log() << "Handing off election to " << target;

    auto callbackHandleSW = _replExecutor->scheduleRemoteCommand(
        request, [target](const executor::TaskExecutor::RemoteCommandCallbackArgs& callbackData) {
            auto status = callbackData.response.status;

            if (status.isOK()) {
                LOG(1) << "replSetStepUp request to " << target << " succeeded with response -- "
                       << callbackData.response.data;
            } else {
                log() << "replSetStepUp request to " << target << " failed due to " << status;
            }
        });

    auto callbackHandleStatus = callbackHandleSW.getStatus();
    if (!callbackHandleStatus.isOK()) {
        error() << "Failed to schedule replSetStepUp request to " << target
                << " for election handoff: " << callbackHandleStatus;
    }
}

void ReplicationCoordinatorImpl::_handleTimePassing(
    const executor::TaskExecutor::CallbackArgs& cbData) {
    if (!cbData.status.isOK()) {
        return;
    }

    // For election protocol v1, call _startElectSelfIfEligibleV1 to avoid racing against other
    // elections caused by events like election timeout and replSetStepUp.
    if (isV1ElectionProtocol()) {
        _startElectSelfIfEligibleV1(
            TopologyCoordinator::StartElectionReason::kSingleNodePromptElection);
        return;
    }

    bool wonSingleNodeElection = [this]() {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        return _topCoord->becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(_replExecutor->now());
    }();

    if (wonSingleNodeElection) {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        _postWonElectionUpdateMemberState_inlock();
    }
}

bool ReplicationCoordinatorImpl::isMasterForReportingPurposes() {
    if (_settings.usingReplSets()) {
        stdx::lock_guard<stdx::mutex> lock(_mutex);
        if (getReplicationMode() == modeReplSet && _getMemberState_inlock().primary()) {
            return true;
        }
        return false;
    }

    if (!_settings.isSlave())
        return true;

    // TODO(dannenberg) replAllDead is bad and should be removed when master slave is removed
    if (replAllDead) {
        return false;
    }

    if (_settings.isMaster()) {
        // If running with --master --slave, allow.
        return true;
    }

    return false;
}


bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase(OperationContext* opCtx,
                                                            StringData dbName) {
    // The answer isn't meaningful unless we hold the global lock.
    invariant(opCtx->lockState()->isLocked());
    return canAcceptWritesForDatabase_UNSAFE(opCtx, dbName);
}

bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase_UNSAFE(OperationContext* opCtx,
                                                                   StringData dbName) {
    // _canAcceptNonLocalWrites is always true for standalone nodes, always false for nodes
    // started with --slave, and adjusted based on primary+drain state in replica sets.
    //
    // That is, standalone nodes, non-slave nodes and drained replica set primaries can always
    // accept writes. Similarly, writes are always permitted to the "local" database. Finally,
    // in the event that a node is started with --slave and --master, we allow writes unless the
    // master/slave system has set the replAllDead flag.
    if (_canAcceptNonLocalWrites.loadRelaxed() || alwaysAllowNonLocalWrites(opCtx)) {
        return true;
    }
    if (dbName == kLocalDB) {
        return true;
    }
    return !replAllDead && _settings.isMaster();
}

bool ReplicationCoordinatorImpl::canAcceptWritesFor(OperationContext* opCtx,
                                                    const NamespaceString& ns) {
    invariant(opCtx->lockState()->isLocked());
    return canAcceptWritesFor_UNSAFE(opCtx, ns);
}

bool ReplicationCoordinatorImpl::canAcceptWritesFor_UNSAFE(OperationContext* opCtx,
                                                           const NamespaceString& ns) {
    StringData dbName = ns.db();
    bool canWriteToDB = canAcceptWritesForDatabase_UNSAFE(opCtx, dbName);

    if (!canWriteToDB && !ns.isSystemDotProfile()) {
        return false;
    }

    // Even if we think we can write to the database, we need to make sure we're not trying
    // to write to the oplog while in ROLLBACK. If we can accept non-local writes (i.e. we're
    // PRIMARY), then we must not be in ROLLBACK. This check is redundant with the check of
    // _memberState below, but since it can be done without locking, we do it as an optimization.
    if (_canAcceptNonLocalWrites.loadRelaxed() || alwaysAllowNonLocalWrites(opCtx)) {
        return true;
    }

    if (!ns.isOplog()) {
        return true;
    }

    stdx::lock_guard<stdx::mutex> lock(_mutex);
    if (_memberState.rollback()) {
        return false;
    }
    return true;
}

Status ReplicationCoordinatorImpl::checkCanServeReadsFor(OperationContext* opCtx,
                                                         const NamespaceString& ns,
                                                         bool slaveOk) {
    invariant(opCtx->lockState()->isLocked());
    return checkCanServeReadsFor_UNSAFE(opCtx, ns, slaveOk);
}

Status ReplicationCoordinatorImpl::checkCanServeReadsFor_UNSAFE(OperationContext* opCtx,
                                                                const NamespaceString& ns,
                                                                bool slaveOk) {
    auto client = opCtx->getClient();
    bool isPrimaryOrSecondary = _canServeNonLocalReads.loadRelaxed();

    // Oplog reads are not allowed during STARTUP state, but we make an exception for internal
    // reads. Internal reads are required for cleaning up unfinished apply batches.
    if (!isPrimaryOrSecondary && getReplicationMode() == modeReplSet && ns.isOplog()) {
        stdx::lock_guard<stdx::mutex> lock(_mutex);
        if ((_memberState.startup() && client->isFromUserConnection()) || _memberState.startup2() ||
            _memberState.rollback()) {
            return Status{ErrorCodes::NotMasterOrSecondary,
                          "Oplog collection reads are not allowed while in the rollback or "
                          "startup state."};
        }
    }

    if (client->isInDirectClient()) {
        return Status::OK();
    }
    if (canAcceptWritesFor_UNSAFE(opCtx, ns)) {
        return Status::OK();
    }
    if (getReplicationMode() == modeMasterSlave) {
        return Status::OK();
    }
    if (slaveOk) {
        if (isPrimaryOrSecondary) {
            return Status::OK();
        }
        return Status(ErrorCodes::NotMasterOrSecondary,
                      "not master or secondary; cannot currently read from this replSet member");
    }
    return Status(ErrorCodes::NotMasterNoSlaveOk, "not master and slaveOk=false");
}

bool ReplicationCoordinatorImpl::isInPrimaryOrSecondaryState() const {
    return _canServeNonLocalReads.loadRelaxed();
}

bool ReplicationCoordinatorImpl::shouldRelaxIndexConstraints(OperationContext* opCtx,
                                                             const NamespaceString& ns) {
    return !canAcceptWritesFor(opCtx, ns);
}

OID ReplicationCoordinatorImpl::getElectionId() {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    return _electionId;
}

OID ReplicationCoordinatorImpl::getMyRID() const {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    return _getMyRID_inlock();
}

OID ReplicationCoordinatorImpl::_getMyRID_inlock() const {
    return _myRID;
}

int ReplicationCoordinatorImpl::getMyId() const {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    return _getMyId_inlock();
}

int ReplicationCoordinatorImpl::_getMyId_inlock() const {
    const MemberConfig& self = _rsConfig.getMemberAt(_selfIndex);
    return self.getId();
}

Status ReplicationCoordinatorImpl::resyncData(OperationContext* opCtx, bool waitUntilCompleted) {
    _stopDataReplication(opCtx);
    auto finishedEvent = uassertStatusOK(_replExecutor->makeEvent());
    stdx::function<void()> f;
    if (waitUntilCompleted)
        f = [&finishedEvent, this]() { _replExecutor->signalEvent(finishedEvent); };

    {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        _resetMyLastOpTimes_inlock();
    }
    // Unlock before calling _startDataReplication().
    _startDataReplication(opCtx, f);
    if (waitUntilCompleted) {
        _replExecutor->waitForEvent(finishedEvent);
    }
    return Status::OK();
}

StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand() const {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    return _topCoord->prepareReplSetUpdatePositionCommand(
        _getCurrentCommittedSnapshotOpTime_inlock());
}

Status ReplicationCoordinatorImpl::processReplSetGetStatus(
    BSONObjBuilder* response, ReplSetGetStatusResponseStyle responseStyle) {

    BSONObj initialSyncProgress;
    if (responseStyle == ReplSetGetStatusResponseStyle::kInitialSync) {
        std::shared_ptr<InitialSyncer> initialSyncerCopy;
        {
            stdx::lock_guard<stdx::mutex> lk(_mutex);
            initialSyncerCopy = _initialSyncer;
        }

        // getInitialSyncProgress must be called outside the ReplicationCoordinatorImpl::_mutex
        // lock. Otherwise it might deadlock with InitialSyncer::_multiApplierCallback, which
        // first acquires InitialSyncer::_mutex and then ReplicationCoordinatorImpl::_mutex.
        if (initialSyncerCopy) {
            initialSyncProgress = initialSyncerCopy->getInitialSyncProgress();
        }
    }

    stdx::lock_guard<stdx::mutex> lk(_mutex);
    Status result(ErrorCodes::InternalError, "didn't set status in prepareStatusResponse");
    _topCoord->prepareStatusResponse(
        TopologyCoordinator::ReplSetStatusArgs{
            _replExecutor->now(),
            static_cast<unsigned>(time(0) - serverGlobalParams.started),
            _getCurrentCommittedSnapshotOpTime_inlock(),
            initialSyncProgress},
        response,
        &result);
    return result;
}

void ReplicationCoordinatorImpl::fillIsMasterForReplSet(
    IsMasterResponse* response, const SplitHorizon::Parameters& horizonParams) {
    invariant(getSettings().usingReplSets());

    stdx::lock_guard<stdx::mutex> lk(_mutex);
    _topCoord->fillIsMasterForReplSet(response, horizonParams);

    OpTime lastOpTime = _getMyLastAppliedOpTime_inlock();
    response->setLastWrite(lastOpTime, lastOpTime.getTimestamp().getSecs());
    if (_currentCommittedSnapshot) {
        response->setLastMajorityWrite(_currentCommittedSnapshot.get(),
                                       _currentCommittedSnapshot->getTimestamp().getSecs());
    }

    if (response->isMaster() && !_canAcceptNonLocalWrites.loadRelaxed()) {
        // Report that we are secondary to ismaster callers until drain completes.
        response->setIsMaster(false);
        response->setIsSecondary(true);
    }

    if (_inShutdown) {
        response->setIsMaster(false);
        response->setIsSecondary(false);
    }
}

void ReplicationCoordinatorImpl::appendSlaveInfoData(BSONObjBuilder* result) {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    _topCoord->fillMemberData(result);
}

ReplSetConfig ReplicationCoordinatorImpl::getConfig() const {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    return _rsConfig;
}

void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result) {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    result->append("config", _rsConfig.toBSON());
}

void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {
    EventHandle evh;

    {
        stdx::lock_guard<stdx::mutex> lock(_mutex);
        evh = _processReplSetMetadata_inlock(replMetadata);
    }

    if (evh) {
        _replExecutor->waitForEvent(evh);
    }
}

void ReplicationCoordinatorImpl::cancelAndRescheduleElectionTimeout() {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    _cancelAndRescheduleElectionTimeout_inlock();
}

EventHandle ReplicationCoordinatorImpl::_processReplSetMetadata_inlock(
    const rpc::ReplSetMetadata& replMetadata) {
    if (replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) {
        return EventHandle();
    }
    return _updateTerm_inlock(replMetadata.getTerm());
}

bool ReplicationCoordinatorImpl::getMaintenanceMode() {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    return _topCoord->getMaintenanceCount() > 0;
}

Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) {
    if (getReplicationMode() != modeReplSet) {
        return Status(ErrorCodes::NoReplicationEnabled,
                      "can only set maintenance mode on replica set members");
    }

    stdx::unique_lock<stdx::mutex> lk(_mutex);
    if (_topCoord->getRole() == TopologyCoordinator::Role::kCandidate) {
        return Status(ErrorCodes::NotSecondary, "currently running for election");
    }

    if (_getMemberState_inlock().primary()) {
        return Status(ErrorCodes::NotSecondary, "primaries can't modify maintenance mode");
    }

    int curMaintenanceCalls = _topCoord->getMaintenanceCount();
    if (activate) {
        log() << "going into maintenance mode with " << curMaintenanceCalls
              << " other maintenance mode tasks in progress" << rsLog;
        _topCoord->adjustMaintenanceCountBy(1);
    } else if (curMaintenanceCalls > 0) {
        invariant(_topCoord->getRole() == TopologyCoordinator::Role::kFollower);

        _topCoord->adjustMaintenanceCountBy(-1);

        log() << "leaving maintenance mode (" << curMaintenanceCalls - 1
              << " other maintenance mode tasks ongoing)" << rsLog;
    } else {
        warning() << "Attempted to leave maintenance mode but it is not currently active";
        return Status(ErrorCodes::OperationFailed, "already out of maintenance mode");
    }

    const PostMemberStateUpdateAction action =
        _updateMemberStateFromTopologyCoordinator_inlock(nullptr);

    if (action == kActionWinElection) {
        _postWonElectionUpdateMemberState_inlock();
    } else {
        lk.unlock();
        _performPostMemberStateUpdateAction(action);
    }

    return Status::OK();
}

Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* opCtx,
                                                          const HostAndPort& target,
                                                          BSONObjBuilder* resultObj) {
    Status result(ErrorCodes::InternalError, "didn't set status in prepareSyncFromResponse");
    auto doResync = false;
    {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        _topCoord->prepareSyncFromResponse(target, resultObj, &result);
        // If we are in the middle of an initial sync, do a resync.
        doResync = result.isOK() && _initialSyncer && _initialSyncer->isActive();
    }

    if (doResync) {
        return resyncData(opCtx, false);
    }

    return result;
}

Status ReplicationCoordinatorImpl::processReplSetFreeze(int secs, BSONObjBuilder* resultObj) {
    auto result = [=]() {
        stdx::lock_guard<stdx::mutex> lock(_mutex);
        return _topCoord->prepareFreezeResponse(_replExecutor->now(), secs, resultObj);
    }();
    if (!result.isOK()) {
        return result.getStatus();
    }

    if (TopologyCoordinator::PrepareFreezeResponseResult::kSingleNodeSelfElect ==
        result.getValue()) {
        if (isV1ElectionProtocol()) {
            // For election protocol v1, call _startElectSelfIfEligibleV1 to avoid racing against
            // other elections caused by events like election timeout and replSetStepUp.
            _startElectSelfIfEligibleV1(
                TopologyCoordinator::StartElectionReason::kSingleNodePromptElection);
        } else {
            // If we just unfroze and ended our stepdown period and we are a one node replica set,
            // the topology coordinator will have gone into the candidate role to signal that we
            // need to elect ourself.
            _performPostMemberStateUpdateAction(kActionWinElection);
        }
    }

    return Status::OK();
}

Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs& args,
                                                    ReplSetHeartbeatResponse* response) {
    {
        stdx::lock_guard<stdx::mutex> lock(_mutex);
        if (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
            return Status(ErrorCodes::NotYetInitialized,
                          "Received heartbeat while still initializing replication system");
        }
    }

    auto senderHost(args.getSenderHost());

    stdx::lock_guard<stdx::mutex> lk(_mutex);

    const Date_t now = _replExecutor->now();
    Status result =
        _topCoord->prepareHeartbeatResponse(now, args, _settings.ourSetName(), response);
    if ((result.isOK() || result == ErrorCodes::InvalidReplicaSetConfig) && _selfIndex < 0) {
        // If this node does not belong to the configuration it knows about, send heartbeats
        // back to any node that sends us a heartbeat, in case one of those remote nodes has
        // a configuration that contains us. Chances are excellent that it will, since that
        // is the only reason for a remote node to send this node a heartbeat request.
        if (!senderHost.empty() && _seedList.insert(senderHost).second) {
            _scheduleHeartbeatToTarget_inlock(senderHost, -1, now);
        }
    } else if (result.isOK() && response->getConfigVersion() < args.getConfigVersion()) {
        // Schedule a heartbeat to the sender to fetch the new config.
        // We cannot cancel the enqueued heartbeat, but either this one or the enqueued heartbeat
        // will trigger reconfig, which cancels and reschedules all heartbeats.
        if (args.hasSenderHost()) {
            int senderIndex = _rsConfig.findMemberIndexByHostAndPort(senderHost);
            _scheduleHeartbeatToTarget_inlock(senderHost, senderIndex, now);
        }
    }
    return result;
}

Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCtx,
                                                          const ReplSetReconfigArgs& args,
                                                          BSONObjBuilder* resultObj) {
    log() << "replSetReconfig admin command received from client; new config: "
          << args.newConfigObj;

    stdx::unique_lock<stdx::mutex> lk(_mutex);

    while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
        _rsConfigStateChange.wait(lk);
    }

    switch (_rsConfigState) {
        case kConfigSteady:
            break;
        case kConfigUninitialized:
            return Status(ErrorCodes::NotYetInitialized,
                          "Node not yet initialized; use the replSetInitiate command");
        case kConfigReplicationDisabled:
            invariant(
                false);  // should be unreachable due to !_settings.usingReplSets() check above
        case kConfigInitiating:
        case kConfigReconfiguring:
        case kConfigHBReconfiguring:
            return Status(ErrorCodes::ConfigurationInProgress,
                          "Cannot run replSetReconfig because the node is currently updating "
                          "its configuration");
        default:
            severe() << "Unexpected _rsConfigState " << int(_rsConfigState);
            fassertFailed(18914);
    }

    invariant(_rsConfig.isInitialized());

    if (!args.force && !_getMemberState_inlock().primary()) {
        return Status(ErrorCodes::NotMaster,
                      str::stream()
                          << "replSetReconfig should only be run on PRIMARY, but my state is "
                          << _getMemberState_inlock().toString()
                          << "; use the \"force\" argument to override");
    }

    _setConfigState_inlock(kConfigReconfiguring);
    ScopeGuard configStateGuard = MakeGuard(
        lockAndCall,
        &lk,
        stdx::bind(&ReplicationCoordinatorImpl::_setConfigState_inlock, this, kConfigSteady));

    ReplSetConfig oldConfig = _rsConfig;
    lk.unlock();

    ReplSetConfig newConfig;
    BSONObj newConfigObj = args.newConfigObj;
    if (args.force) {
        newConfigObj = incrementConfigVersionByRandom(newConfigObj);
    }
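    // A force reconfig bumps the config version by a random amount (see
    // incrementConfigVersionByRandom above); the intent, as generally understood, is to make
    // concurrent force reconfigs on different nodes unlikely to collide on the same version.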

    BSONObj oldConfigObj = oldConfig.toBSON();
    audit::logReplSetReconfig(opCtx->getClient(), &oldConfigObj, &newConfigObj);

    Status status = newConfig.initialize(
        newConfigObj, oldConfig.getProtocolVersion() == 1, oldConfig.getReplicaSetId());

    if (!status.isOK()) {
        error() << "replSetReconfig got " << status << " while parsing " << newConfigObj;
        return Status(ErrorCodes::InvalidReplicaSetConfig, status.reason());
    }
    if (newConfig.getReplSetName() != _settings.ourSetName()) {
        str::stream errmsg;
        errmsg << "Attempting to reconfigure a replica set with name " << newConfig.getReplSetName()
               << ", but command line reports " << _settings.ourSetName() << "; rejecting";
        error() << std::string(errmsg);
        return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg);
    }

    StatusWith<int> myIndex = validateConfigForReconfig(
        _externalState.get(), oldConfig, newConfig, opCtx->getServiceContext(), args.force);
    if (!myIndex.isOK()) {
        error() << "replSetReconfig got " << myIndex.getStatus() << " while validating "
                << newConfigObj;
        return Status(ErrorCodes::NewReplicaSetConfigurationIncompatible,
                      myIndex.getStatus().reason());
    }

    log() << "replSetReconfig config object with " << newConfig.getNumMembers()
          << " members parses ok";

    if (!args.force) {
        status = checkQuorumForReconfig(
            _replExecutor.get(), newConfig, myIndex.getValue(), _topCoord->getTerm());
        if (!status.isOK()) {
            error() << "replSetReconfig failed; " << status;
            return status;
        }
    }

    status = _externalState->storeLocalConfigDocument(opCtx, newConfig.toBSON());
    if (!status.isOK()) {
        error() << "replSetReconfig failed to store config document; " << status;
        return status;
    }

    auto reconfigFinished = uassertStatusOK(_replExecutor->makeEvent());
    uassertStatusOK(
        _replExecutor->scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig,
                                               this,
                                               stdx::placeholders::_1,
                                               newConfig,
                                               args.force,
                                               myIndex.getValue(),
                                               reconfigFinished)));
    configStateGuard.Dismiss();
    _replExecutor->waitForEvent(reconfigFinished);
    return Status::OK();
}

void ReplicationCoordinatorImpl::_finishReplSetReconfig(
    const executor::TaskExecutor::CallbackArgs& cbData,
    const ReplSetConfig& newConfig,
    const bool isForceReconfig,
    int myIndex,
    const executor::TaskExecutor::EventHandle& finishedEvent) {

    if (cbData.status == ErrorCodes::CallbackCanceled) {
        return;
    }
    auto opCtx = cc().makeOperationContext();
    boost::optional<Lock::GlobalWrite> globalExclusiveLock;
    if (isForceReconfig) {
        // Since it's a force reconfig, the primary node may not be electable after the
        // configuration change. In case we are that primary node, finish the reconfig under the
        // global lock, so that the step down occurs safely.
        globalExclusiveLock.emplace(opCtx.get());
    }
    stdx::unique_lock<stdx::mutex> lk(_mutex);

    invariant(_rsConfigState == kConfigReconfiguring);
    invariant(_rsConfig.isInitialized());

    // Do not conduct an election during a reconfig, as the node may not be electable
    // post-reconfig.
    if (auto electionFinishedEvent = _cancelElectionIfNeeded_inlock()) {
        // Wait for the election to complete and the node's Role to be set to follower.
        _replExecutor
            ->onEvent(electionFinishedEvent,
                      stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig,
                                 this,
                                 stdx::placeholders::_1,
                                 newConfig,
                                 isForceReconfig,
                                 myIndex,
                                 finishedEvent))
            .status_with_transitional_ignore();
        return;
    }

    const ReplSetConfig oldConfig = _rsConfig;
    const PostMemberStateUpdateAction action =
        _setCurrentRSConfig_inlock(opCtx.get(), newConfig, myIndex);

    // On a reconfig we drop all snapshots so we don't mistakenly read from the wrong one.
    // For example, if we change the meaning of the "committed" snapshot from applied -> durable.
    _dropAllSnapshots_inlock();

    lk.unlock();
    _resetElectionInfoOnProtocolVersionUpgrade(opCtx.get(), oldConfig, newConfig);
    if (action == kActionWinElection) {
        lk.lock();
        _postWonElectionUpdateMemberState_inlock();
        lk.unlock();
    } else {
        _performPostMemberStateUpdateAction(action);
    }

    _replExecutor->signalEvent(finishedEvent);
}

Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCtx,
                                                          const BSONObj& configObj,
                                                          BSONObjBuilder* resultObj) {
    log() << "replSetInitiate admin command received from client";

    const auto replEnabled = _settings.usingReplSets();
    stdx::unique_lock<stdx::mutex> lk(_mutex);
    if (!replEnabled) {
        return Status(ErrorCodes::NoReplicationEnabled, "server is not running with --replSet");
    }
    while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
        _rsConfigStateChange.wait(lk);
    }

    if (_rsConfigState != kConfigUninitialized) {
        resultObj->append("info", "try querying local.system.replset to see current configuration");
        return Status(ErrorCodes::AlreadyInitialized, "already initialized");
    }
    invariant(!_rsConfig.isInitialized());
    _setConfigState_inlock(kConfigInitiating);

    ScopeGuard configStateGuard = MakeGuard(
        lockAndCall,
        &lk,
        stdx::bind(
            &ReplicationCoordinatorImpl::_setConfigState_inlock, this, kConfigUninitialized));
    lk.unlock();

    ReplSetConfig newConfig;
    Status status = newConfig.initializeForInitiate(configObj, true);
    if (!status.isOK()) {
        error() << "replSet initiate got " << status << " while parsing " << configObj;
        return Status(ErrorCodes::InvalidReplicaSetConfig, status.reason());
    }
    if (newConfig.getReplSetName() != _settings.ourSetName()) {
        str::stream errmsg;
        errmsg << "Attempting to initiate a replica set with name " << newConfig.getReplSetName()
               << ", but command line reports " << _settings.ourSetName() << "; rejecting";
        error() << std::string(errmsg);
        return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg);
    }

    StatusWith<int> myIndex =
        validateConfigForInitiate(_externalState.get(), newConfig, opCtx->getServiceContext());
    if (!myIndex.isOK()) {
        error() << "replSet initiate got " << myIndex.getStatus() << " while validating "
                << configObj;
        return Status(ErrorCodes::InvalidReplicaSetConfig, myIndex.getStatus().reason());
    }

    log() << "replSetInitiate config object with " << newConfig.getNumMembers()
          << " members parses ok";

    // In pv1, the TopologyCoordinator has not set the term yet. It will be set to kInitialTerm
    // if the initiate succeeds, so we pass that here.
    status = checkQuorumForInitiate(
        _replExecutor.get(),
        newConfig,
        myIndex.getValue(),
        newConfig.getProtocolVersion() == 1 ? OpTime::kInitialTerm : OpTime::kUninitializedTerm);

    if (!status.isOK()) {
        error() << "replSetInitiate failed; " << status;
        return status;
    }

    status = _externalState->initializeReplSetStorage(opCtx, newConfig.toBSON());
    if (!status.isOK()) {
        error() << "replSetInitiate failed to store config document or create the oplog; "
                << status;
        return status;
    }

    _replicationProcess->getConsistencyMarkers()->initializeMinValidDocument(opCtx);

    auto lastAppliedOpTime = getMyLastAppliedOpTime();

    // Since the JournalListener has not yet been set up, we must manually set our durableOpTime.
    setMyLastDurableOpTime(lastAppliedOpTime);

    // Set the initial data timestamp on the storage engine so it can assign a timestamp to data
    // on disk. We do this after writing the "initiating set" oplog entry.
    _storage->setInitialDataTimestamp(getServiceContext(), lastAppliedOpTime.getTimestamp());

    _finishReplSetInitiate(opCtx, newConfig, myIndex.getValue());

    // A configuration passed to replSetInitiate() with the current node as an arbiter
    // will fail validation with a "replSet initiate got ... while validating" reason.
    invariant(!newConfig.getMemberAt(myIndex.getValue()).isArbiter());
    _externalState->startThreads(_settings);
    _startDataReplication(opCtx);

    configStateGuard.Dismiss();
    return Status::OK();
}

void ReplicationCoordinatorImpl::_finishReplSetInitiate(OperationContext* opCtx,
                                                        const ReplSetConfig& newConfig,
                                                        int myIndex) {
    stdx::unique_lock<stdx::mutex> lk(_mutex);
    invariant(_rsConfigState == kConfigInitiating);
    invariant(!_rsConfig.isInitialized());
    auto action = _setCurrentRSConfig_inlock(opCtx, newConfig, myIndex);
    if (action == kActionWinElection) {
        _postWonElectionUpdateMemberState_inlock();
    } else {
        lk.unlock();
        _performPostMemberStateUpdateAction(action);
    }
}

void ReplicationCoordinatorImpl::_setConfigState_inlock(ConfigState newState) {
    if (newState != _rsConfigState) {
        _rsConfigState = newState;
        _rsConfigStateChange.notify_all();
    }
}

ReplicationCoordinatorImpl::PostMemberStateUpdateAction
ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock(
    OperationContext* opCtx) {
    {
        // We have to do this check even if our current and target state are the same, as we
        // might have just failed a stepdown attempt and thus are staying in PRIMARY state but
        // restoring our ability to accept writes.
        bool canAcceptWrites = _topCoord->canAcceptWrites();
        if (canAcceptWrites != _canAcceptNonLocalWrites.loadRelaxed()) {
            // We must be holding the global X lock to change _canAcceptNonLocalWrites.
            invariant(opCtx);
            invariant(opCtx->lockState()->isW());
        }
        _canAcceptNonLocalWrites.store(canAcceptWrites);
    }

    const MemberState newState = _topCoord->getMemberState();
    if (newState == _memberState) {
        if (_topCoord->getRole() == TopologyCoordinator::Role::kCandidate) {
            invariant(_rsConfig.getNumMembers() == 1 && _selfIndex == 0 &&
                      _rsConfig.getMemberAt(0).isElectable());
            if (isV1ElectionProtocol()) {
                // Start election in protocol version 1.
                return kActionStartSingleNodeElection;
            }
            return kActionWinElection;
        }
        return kActionNone;
    }

    PostMemberStateUpdateAction result;
    if (_memberState.primary() || newState.removed() || newState.rollback()) {
        // Wake up any threads blocked in awaitReplication, close connections, etc.
        _replicationWaiterList.signalAndRemoveAll_inlock();
        // Wake up the optime waiter that is waiting for primary catch-up to finish.
        _opTimeWaiterList.signalAndRemoveAll_inlock();
        // If there are any pending stepdown command requests, wake them up.
        _stepDownWaiters.notify_all();

        // _canAcceptNonLocalWrites should already be set above.
        invariant(!_canAcceptNonLocalWrites.loadRelaxed());

        serverGlobalParams.validateFeaturesAsMaster.store(false);
        result = kActionCloseAllConnections;
    } else {
        result = kActionFollowerModeStateChange;
    }

    // Exit catchup mode if we're in it, and enable the replication producer and applier on
    // stepdown.
    if (_memberState.primary()) {
        if (_catchupState) {
            _catchupState->abort_inlock();
        }
        _applierState = ApplierState::Running;
        _externalState->startProducerIfStopped();
    }

    if (_memberState.secondary() && !newState.primary()) {
        // Switching out of SECONDARY, but not to PRIMARY.
        _canServeNonLocalReads.store(0U);
    } else if (!_memberState.primary() && newState.secondary()) {
        // Switching into SECONDARY, but not from PRIMARY.
        _canServeNonLocalReads.store(1U);
    }
2738
2739 if (newState.secondary() && _topCoord->getRole() == TopologyCoordinator::Role::kCandidate) {
2740 // When transitioning to SECONDARY, the only way for _topCoord to report the candidate
2741 // role is if the configuration represents a single-node replica set. In that case, the
2742 // overriding requirement is to elect this singleton node primary.
2743 invariant(_rsConfig.getNumMembers() == 1 && _selfIndex == 0 &&
2744 _rsConfig.getMemberAt(0).isElectable());
2745 if (isV1ElectionProtocol()) {
2746 // Start election in protocol version 1
2747 result = kActionStartSingleNodeElection;
2748 } else {
2749 result = kActionWinElection;
2750 }
2751 }
2752
2753 if (newState.rollback()) {
2754 // When we start rollback, we need to drop all snapshots since we may need to create
2755 // out-of-order snapshots. This would be necessary even if the SnapshotName was completely
2756 // monotonically increasing because we don't necessarily have a snapshot of every write.
2757 // If we didn't drop all snapshots on rollback it could lead to the following situation:
2758 //
2759 // |--------|-------------|-------------|
2760 // | OpTime | HasSnapshot | Committed |
2761 // |--------|-------------|-------------|
2762 // | (0, 1) | * | * |
2763 // | (0, 2) | * | ROLLED BACK |
2764 // | (1, 2) | | * |
2765 // |--------|-------------|-------------|
2766 //
2767 // When we try to make (1,2) the commit point, we'd find (0,2) as the newest snapshot
2768 // before the commit point, but it would be invalid to mark it as the committed snapshot
2769 // since it was never committed.
2770 //
2771 // TODO SERVER-19209 We also need to clear snapshots before a resync.
2772 _dropAllSnapshots_inlock();
2773 }
2774
2775 // Upon transitioning out of ROLLBACK, we must clear any stable optime candidates that may have
2776 // been rolled back.
2777 if (_memberState.rollback()) {
2778 // Our 'lastApplied' optime at this point should be the rollback common point. We should
2779 // remove any stable optime candidates greater than the common point.
2780 auto lastApplied = _getMyLastAppliedOpTime_inlock();
2781 // The upper bound will give us the first optime T such that T > lastApplied.
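        // For illustration (hypothetical values, written as (term, timestamp)): if lastApplied
        // is (1, 5) and the candidate set is {(1, 4), (1, 5), (1, 6)}, the upper bound points at
        // (1, 6), so only that rolled-back candidate is erased below.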
        auto deletePoint = _stableOpTimeCandidates.upper_bound(lastApplied);
        _stableOpTimeCandidates.erase(deletePoint, _stableOpTimeCandidates.end());

        // Ensure that no snapshots were created while we were in rollback.
        invariant(!_currentCommittedSnapshot);
    }

    // If we are transitioning from secondary, cancel any scheduled takeovers.
    if (_memberState.secondary()) {
        _cancelCatchupTakeover_inlock();
        _cancelPriorityTakeover_inlock();
    }

    // Ensure replication is running if we are no longer REMOVED.
    if (_memberState.removed() && !newState.arbiter()) {
        log() << "Scheduling a task to begin or continue replication";
        _scheduleWorkAt(_replExecutor->now(),
                        [=](const mongo::executor::TaskExecutor::CallbackArgs& cbData) {
                            _externalState->startThreads(_settings);
                            auto opCtx = cc().makeOperationContext();
                            _startDataReplication(opCtx.get());
                        });
    }

    log() << "transition to " << newState << " from " << _memberState << rsLog;
    // Initializes the featureCompatibilityVersion to the default value, because arbiters do not
    // receive the replicated version.
    if (newState.arbiter()) {
        serverGlobalParams.featureCompatibility.reset();
    }

    _memberState = newState;

    _cancelAndRescheduleElectionTimeout_inlock();

    // Notifies waiters blocked in waitForMemberState().
    // For testing only.
    _memberStateChange.notify_all();

    return result;
}

void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
    PostMemberStateUpdateAction action) {
    invariant(action != kActionWinElection);
    switch (action) {
        case kActionNone:
            break;
        case kActionFollowerModeStateChange:
            _onFollowerModeStateChange();
            break;
        case kActionCloseAllConnections:
            _externalState->closeConnections();
            _externalState->shardingOnStepDownHook();
            _externalState->stopNoopWriter();
            break;
        case kActionStartSingleNodeElection:
            // In protocol version 1, single node replset will run an election instead of directly
            // calling _postWonElectionUpdateMemberState_inlock as in protocol version 0.
            _startElectSelfV1(TopologyCoordinator::StartElectionReason::kElectionTimeout);
            break;
        default:
            severe() << "Unknown post member state update action " << static_cast<int>(action);
            fassertFailed(26010);
    }
}

void ReplicationCoordinatorImpl::_postWonElectionUpdateMemberState_inlock() {
    if (isV1ElectionProtocol()) {
        invariant(_topCoord->getTerm() != OpTime::kUninitializedTerm);
        _electionId = OID::fromTerm(_topCoord->getTerm());
    } else {
        _electionId = OID::gen();
    }

    auto ts = LogicalClock::get(getServiceContext())->reserveTicks(1).asTimestamp();
    _topCoord->processWinElection(_electionId, ts);
    const PostMemberStateUpdateAction nextAction =
        _updateMemberStateFromTopologyCoordinator_inlock(nullptr);
    invariant(nextAction == kActionFollowerModeStateChange,
              str::stream() << "nextAction == " << static_cast<int>(nextAction));
    invariant(_getMemberState_inlock().primary());
    // Clear the sync source.
    _onFollowerModeStateChange();
    // Notify all secondaries of the election win.
    _restartHeartbeats_inlock();
    if (isV1ElectionProtocol()) {
        invariant(!_catchupState);
        _catchupState = stdx::make_unique<CatchupState>(this);
        _catchupState->start_inlock();
    } else {
        _enterDrainMode_inlock();
    }
}

void ReplicationCoordinatorImpl::_onFollowerModeStateChange() {
    _externalState->signalApplierToChooseNewSyncSource();
}

void ReplicationCoordinatorImpl::CatchupState::start_inlock() {
    log() << "Entering primary catch-up mode.";

    // No catchup in single node replica set.
    if (_repl->_rsConfig.getNumMembers() == 1) {
        abort_inlock();
        return;
    }

    auto catchupTimeout = _repl->_rsConfig.getCatchUpTimeoutPeriod();

    // When catchUpTimeoutMillis is 0, we skip doing catchup entirely.
    if (catchupTimeout == ReplSetConfig::kCatchUpDisabled) {
        log() << "Skipping primary catchup since the catchup timeout is 0.";
        abort_inlock();
        return;
    }

    auto mutex = &_repl->_mutex;
    auto timeoutCB = [this, mutex](const CallbackArgs& cbData) {
        if (!cbData.status.isOK()) {
            return;
        }
        stdx::lock_guard<stdx::mutex> lk(*mutex);
        // Check whether the callback has been cancelled while holding mutex.
        if (cbData.myHandle.isCanceled()) {
            return;
        }
        log() << "Catchup timed out after becoming primary.";
        abort_inlock();
    };
    // Deal with infinity and overflow: in either case, no timeout is scheduled.
    if (catchupTimeout == ReplSetConfig::kInfiniteCatchUpTimeout ||
        Date_t::max() - _repl->_replExecutor->now() <= catchupTimeout) {
        return;
    }
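    // For illustration (hypothetical values): with a 10-second catchup timeout and now() only
    // 5 seconds away from Date_t::max(), the check above holds, so we return early rather than
    // overflow the deadline arithmetic below.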
    // Schedule timeout callback.
    auto timeoutDate = _repl->_replExecutor->now() + catchupTimeout;
    auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, timeoutCB);
    if (!status.isOK()) {
        log() << "Failed to schedule catchup timeout work.";
        abort_inlock();
        return;
    }
    _timeoutCbh = status.getValue();
}

void ReplicationCoordinatorImpl::CatchupState::abort_inlock() {
    invariant(_repl->_getMemberState_inlock().primary());

    log() << "Exited primary catch-up mode.";
    // Clean up its own members.
    if (_timeoutCbh) {
        _repl->_replExecutor->cancel(_timeoutCbh);
    }
    if (_waiter) {
        _repl->_opTimeWaiterList.remove_inlock(_waiter.get());
    }

    // Enter primary drain mode.
    _repl->_enterDrainMode_inlock();
    // Destroy the state itself.
    _repl->_catchupState.reset();
}

void ReplicationCoordinatorImpl::CatchupState::signalHeartbeatUpdate_inlock() {
    auto targetOpTime = _repl->_topCoord->latestKnownOpTimeSinceHeartbeatRestart();
    // Haven't collected all heartbeat responses.
    if (!targetOpTime) {
        return;
    }

    // We've caught up.
    const auto myLastApplied = _repl->_getMyLastAppliedOpTime_inlock();
    if (*targetOpTime <= myLastApplied) {
        log() << "Caught up to the latest optime known via heartbeats after becoming primary. "
              << "Target optime: " << *targetOpTime << ". My Last Applied: " << myLastApplied;
        abort_inlock();
        return;
    }

    // Reset the target optime if it has changed.
    if (_waiter && _waiter->opTime == *targetOpTime) {
        return;
    }

    log() << "Heartbeats updated catchup target optime to " << *targetOpTime;
    if (_waiter) {
        _repl->_opTimeWaiterList.remove_inlock(_waiter.get());
    }
    auto targetOpTimeCB = [this, targetOpTime]() {
        // Double check the target time since stepdown may signal us too.
        const auto myLastApplied = _repl->_getMyLastAppliedOpTime_inlock();
        if (*targetOpTime <= myLastApplied) {
            log() << "Caught up to the latest known optime successfully after becoming primary. "
                  << "Target optime: " << *targetOpTime << ". My Last Applied: " << myLastApplied;
            abort_inlock();
        }
    };
    _waiter = stdx::make_unique<CallbackWaiter>(*targetOpTime, targetOpTimeCB);
    _repl->_opTimeWaiterList.add_inlock(_waiter.get());
}

Status ReplicationCoordinatorImpl::abortCatchupIfNeeded() {
    if (!isV1ElectionProtocol()) {
        return Status(ErrorCodes::CommandNotSupported,
                      "Primary catch-up is only supported by Protocol Version 1");
    }

    stdx::lock_guard<stdx::mutex> lk(_mutex);
    if (_catchupState) {
        _catchupState->abort_inlock();
        return Status::OK();
    }
    return Status(ErrorCodes::IllegalOperation, "The node is not in catch-up mode.");
}

void ReplicationCoordinatorImpl::_enterDrainMode_inlock() {
    _applierState = ApplierState::Draining;
    _externalState->stopProducer();
}

Status ReplicationCoordinatorImpl::processReplSetFresh(const ReplSetFreshArgs& args,
                                                       BSONObjBuilder* resultObj) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    Status result(ErrorCodes::InternalError, "didn't set status in prepareFreshResponse");
    _topCoord->prepareFreshResponse(args, _replExecutor->now(), resultObj, &result);
    return result;
}

Status ReplicationCoordinatorImpl::processReplSetElect(const ReplSetElectArgs& args,
                                                       BSONObjBuilder* responseObj) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    Status result = Status(ErrorCodes::InternalError, "status not set by callback");
    _topCoord->prepareElectResponse(args, _replExecutor->now(), responseObj, &result);
    return result;
}

ReplicationCoordinatorImpl::PostMemberStateUpdateAction
ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(OperationContext* opCtx,
                                                       const ReplSetConfig& newConfig,
                                                       int myIndex) {
    invariant(_settings.usingReplSets());
    _cancelHeartbeats_inlock();
    _setConfigState_inlock(kConfigSteady);

    _topCoord->updateConfig(newConfig, myIndex, _replExecutor->now());

    // updateConfig() can change terms, so update our term shadow to match.
    _termShadow.store(_topCoord->getTerm());

    const ReplSetConfig oldConfig = _rsConfig;
    _rsConfig = newConfig;
    _protVersion.store(_rsConfig.getProtocolVersion());

    // Warn if this config has protocol version 0
    if (newConfig.getProtocolVersion() == 0 &&
        (!oldConfig.isInitialized() || oldConfig.getProtocolVersion() == 1)) {
        log() << startupWarningsLog;
        log() << "** WARNING: This replica set was configured with protocol version 0."
              << startupWarningsLog;
        log() << "**          This protocol version is deprecated and subject to be removed "
              << startupWarningsLog;
        log() << "**          in a future version." << startupWarningsLog;
    }

    // Warn if running --nojournal and writeConcernMajorityJournalDefault = true
    StorageEngine* storageEngine = opCtx->getServiceContext()->getGlobalStorageEngine();
    if (storageEngine && !storageEngine->isDurable() &&
        (newConfig.getWriteConcernMajorityShouldJournal() &&
         (!oldConfig.isInitialized() || !oldConfig.getWriteConcernMajorityShouldJournal()))) {
        log() << startupWarningsLog;
        log() << "** WARNING: This replica set node is running without journaling enabled but the "
              << startupWarningsLog;
        log() << "**          writeConcernMajorityJournalDefault option to the replica set config "
              << startupWarningsLog;
        log() << "**          is set to true. The writeConcernMajorityJournalDefault "
              << startupWarningsLog;
        log() << "**          option to the replica set config must be set to false "
              << startupWarningsLog;
        log() << "**          or w:majority write concerns will never complete."
              << startupWarningsLog;
        log() << "**          In addition, this node's memory consumption may increase until all"
              << startupWarningsLog;
        log() << "**          available free RAM is exhausted." << startupWarningsLog;
        log() << startupWarningsLog;
    }

    // Warn if using the in-memory (ephemeral) storage engine with
    // writeConcernMajorityJournalDefault = true
    if (storageEngine && storageEngine->isEphemeral() &&
        (newConfig.getWriteConcernMajorityShouldJournal() &&
         (!oldConfig.isInitialized() || !oldConfig.getWriteConcernMajorityShouldJournal()))) {
        log() << startupWarningsLog;
        log() << "** WARNING: This replica set node is using in-memory (ephemeral) storage with the"
              << startupWarningsLog;
        log() << "**          writeConcernMajorityJournalDefault option to the replica set config "
              << startupWarningsLog;
        log() << "**          set to true. The writeConcernMajorityJournalDefault option to the "
              << startupWarningsLog;
        log() << "**          replica set config must be set to false " << startupWarningsLog;
        log() << "**          or w:majority write concerns will never complete."
              << startupWarningsLog;
        log() << "**          In addition, this node's memory consumption may increase until all"
              << startupWarningsLog;
        log() << "**          available free RAM is exhausted." << startupWarningsLog;
        log() << startupWarningsLog;
    }

    log() << "New replica set config in use: " << _rsConfig.toBSON() << rsLog;
    _selfIndex = myIndex;
    if (_selfIndex >= 0) {
        log() << "This node is " << _rsConfig.getMemberAt(_selfIndex).getHostAndPort()
              << " in the config";
    } else {
        log() << "This node is not a member of the config";
    }

    _cancelCatchupTakeover_inlock();
    _cancelPriorityTakeover_inlock();
    _cancelAndRescheduleElectionTimeout_inlock();

    const PostMemberStateUpdateAction action =
        _updateMemberStateFromTopologyCoordinator_inlock(opCtx);
    if (_selfIndex >= 0) {
        // Don't send heartbeats if we're not in the config; if we get re-added, one of the
        // nodes in the set will contact us.
        _startHeartbeats_inlock();
    }

    /**
     * Protocol Version upgrade and downgrade.
     *
     * Since PV upgrade resets its term to 0, in-memory states that involve terms should be
     * reset on either PV downgrade or upgrade unless they can be updated in both PV0 and PV1 by
     * data replication or heartbeats, like the last applied. Otherwise PV downgrade then upgrade
     * will pollute the terms with higher terms from the first PV1.
     *
     * Here we reset those in-memory states including:
     * - last committed optime
     * - election id (reset for drivers' primary discovery)
     * - first optime of term (used by primary)
     * - optime used by snapshots
     * - stable optime candidates.
     *
     * _replicationWaiterList and _opTimeWaiterList are used by awaitReplication() and awaitOptime()
     * for write/read concerns. Because we are holding the global lock here, there should be no
     * running client requests and these waiter lists should be empty. One exception I can think of
     * is the target optime of catchup, which doesn't acquire the global lock. It's potentially
     * problematic, but very rare.
     */
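    // For illustration (hypothetical terms): a node that reached term 5 during its first PV1
    // period and is then downgraded to PV0 will restart PV1 at term 0 on the next upgrade. Any
    // term-bearing state still stamped with term 5 would dominate optime comparisons in the new
    // PV1 period, which is why the states listed above are reset here.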
    if (oldConfig.isInitialized()) {
        if (oldConfig.getProtocolVersion() > newConfig.getProtocolVersion()) {
            // Downgrade
            // Reset last committed optime for all nodes. After this reset, the commit point will
            // not move forward on secondaries since it's only maintained on the primary in PV0
            // and never gets propagated to secondaries.
            _topCoord->resetLastCommittedOpTime();
            // Set election id if we're primary.
            if (_memberState.primary()) {
                invariant(newConfig.getProtocolVersion() == 0);
                _electionId = OID::gen();
                auto ts = LogicalClock::get(getServiceContext())->reserveTicks(1).asTimestamp();
                _topCoord->setElectionInfo(_electionId, ts);
            }

            // On PV downgrade, we drop all snapshots and clear the stable optime candidates
            // because read majority isn't supported in PV0.
            //
            // TODO: On a reconfig in PV1, we should also drop all snapshots so we don't mistakenly
            // read from the wrong one, for example, if we change the meaning of the "committed"
            // snapshot from applied -> durable or the quorum gets changed.
            // We currently only do this on the primary that processes the replSetReconfig command,
            // but not on nodes that learn of the new config via a heartbeat.
            _dropAllSnapshots_inlock();
            // Also clear all stable optime candidates. This is critical for a PV downgrade
            // followed by an upgrade, to make sure OpTimes from before the downgrade don't
            // interfere with the second upgrade.
            _stableOpTimeCandidates.clear();

        } else if (oldConfig.getProtocolVersion() < newConfig.getProtocolVersion()) {
            // Upgrade
            if (_memberState.primary()) {
                invariant(newConfig.getProtocolVersion() == 1);
                // The term is only set to "uninitialized" at startup or in PV0.
                invariant(_topCoord->getTerm() != OpTime::kUninitializedTerm);
                _electionId = OID::fromTerm(_topCoord->getTerm());
                // Allow anything to commit, because, technically, this node is also the previous
                // primary.
                OpTime firstOpTimeOfTerm(Timestamp(), _topCoord->getTerm());
                invariantOK(_topCoord->completeTransitionToPrimary(firstOpTimeOfTerm));
                auto ts = LogicalClock::get(getServiceContext())->reserveTicks(1).asTimestamp();
                _topCoord->setElectionInfo(_electionId, ts);
            }
        }
    }

    // Update commit point for the primary. Called by every reconfig because the config
    // may change the definition of majority.
    //
    // On PV downgrade, commit point is probably still from PV1 but will advance to an OpTime with
    // term -1 once any write gets committed in PV0.
    _updateLastCommittedOpTime_inlock();

    return action;
}

void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock() {
    _replicationWaiterList.signalAndRemoveIf_inlock([this](Waiter* waiter) {
        return _doneWaitingForReplication_inlock(
            waiter->opTime, Timestamp(), *waiter->writeConcern);
    });
}

Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePositionArgs& updates,
                                                                long long* configVersion) {
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    Status status = Status::OK();
    bool somethingChanged = false;
    for (UpdatePositionArgs::UpdateIterator update = updates.updatesBegin();
         update != updates.updatesEnd();
         ++update) {
        status = _setLastOptime_inlock(*update, configVersion);
        if (!status.isOK()) {
            break;
        }
        somethingChanged = true;
    }

    if (somethingChanged && !_getMemberState_inlock().primary()) {
        lock.unlock();
        // Must do this outside _mutex
        _externalState->forwardSlaveProgress();
    }
    return status;
}

Status ReplicationCoordinatorImpl::processHandshake(OperationContext* opCtx,
                                                    const HandshakeArgs& handshake) {
    LOG(2) << "Received handshake " << handshake.toBSON();

    stdx::lock_guard<stdx::mutex> lock(_mutex);

    if (getReplicationMode() != modeMasterSlave) {
        return Status(ErrorCodes::IllegalOperation,
                      "The handshake command is only used for master/slave replication");
    }

    auto* memberData = _topCoord->findMemberDataByRid(handshake.getRid());
    if (memberData) {
        return Status::OK();  // nothing to do
    }

    memberData = _topCoord->addSlaveMemberData(handshake.getRid());
    memberData->setHostAndPort(_externalState->getClientHostAndPort(opCtx));

    return Status::OK();
}

bool ReplicationCoordinatorImpl::buildsIndexes() {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    if (_selfIndex == -1) {
        return true;
    }
    const MemberConfig& self = _rsConfig.getMemberAt(_selfIndex);
    return self.shouldBuildIndexes();
}

std::vector<HostAndPort> ReplicationCoordinatorImpl::getHostsWrittenTo(const OpTime& op,
                                                                       bool durablyWritten) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    /* skip self in master-slave mode because our own HostAndPort is unknown */
    const bool skipSelf = getReplicationMode() == modeMasterSlave;
    return _topCoord->getHostsWrittenTo(op, durablyWritten, skipSelf);
}

std::vector<HostAndPort> ReplicationCoordinatorImpl::getOtherNodesInReplSet() const {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    invariant(_settings.usingReplSets());

    std::vector<HostAndPort> nodes;
    if (_selfIndex == -1) {
        return nodes;
    }

    for (int i = 0; i < _rsConfig.getNumMembers(); ++i) {
        if (i == _selfIndex)
            continue;

        nodes.push_back(_rsConfig.getMemberAt(i).getHostAndPort());
    }
    return nodes;
}

Status ReplicationCoordinatorImpl::checkIfWriteConcernCanBeSatisfied(
    const WriteConcernOptions& writeConcern) const {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
}

Status ReplicationCoordinatorImpl::_checkIfWriteConcernCanBeSatisfied_inlock(
    const WriteConcernOptions& writeConcern) const {
    if (getReplicationMode() == modeNone) {
        return Status(ErrorCodes::NoReplicationEnabled,
                      "No replication enabled when checking if write concern can be satisfied");
    }

    if (getReplicationMode() == modeMasterSlave) {
        if (!writeConcern.wMode.empty()) {
            return Status(ErrorCodes::UnknownReplWriteConcern,
                          "Cannot use named write concern modes in master-slave");
        }
        // No way to know how many slaves there are, so assume any numeric mode is possible.
        return Status::OK();
    }

    invariant(getReplicationMode() == modeReplSet);
    return _rsConfig.checkIfWriteConcernCanBeSatisfied(writeConcern);
}

WriteConcernOptions ReplicationCoordinatorImpl::getGetLastErrorDefault() {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    if (_rsConfig.isInitialized()) {
        return _rsConfig.getDefaultWriteConcern();
    }
    return WriteConcernOptions();
}

Status ReplicationCoordinatorImpl::checkReplEnabledForCommand(BSONObjBuilder* result) {
    if (!_settings.usingReplSets()) {
        if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
            result->append("info", "configsvr");  // for shell prompt
        }
        return Status(ErrorCodes::NoReplicationEnabled, "not running with --replSet");
    }

    if (getMemberState().startup()) {
        result->append("info", "run rs.initiate(...) if not yet done for the set");
        return Status(ErrorCodes::NotYetInitialized, "no replset config has been received");
    }

    return Status::OK();
}

bool ReplicationCoordinatorImpl::isReplEnabled() const {
    return getReplicationMode() != modeNone;
}

HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOpTimeFetched) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress();
    // Always allow chaining while in catchup and drain mode.
    auto chainingPreference = _getMemberState_inlock().primary()
        ? TopologyCoordinator::ChainingPreference::kAllowChaining
        : TopologyCoordinator::ChainingPreference::kUseConfiguration;
    HostAndPort newSyncSource =
        _topCoord->chooseNewSyncSource(_replExecutor->now(), lastOpTimeFetched, chainingPreference);

    // If we lost our sync source, schedule new heartbeats immediately to update our knowledge
    // of other members' state, allowing us to make informed sync source decisions.
    if (newSyncSource.empty() && !oldSyncSource.empty() && _selfIndex >= 0 &&
        !_getMemberState_inlock().primary()) {
        _restartHeartbeats_inlock();
    }

    return newSyncSource;
}

void ReplicationCoordinatorImpl::_unblacklistSyncSource(
    const executor::TaskExecutor::CallbackArgs& cbData, const HostAndPort& host) {
    if (cbData.status == ErrorCodes::CallbackCanceled)
        return;

    stdx::lock_guard<stdx::mutex> lock(_mutex);
    _topCoord->unblacklistSyncSource(host, _replExecutor->now());
}

void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    _topCoord->blacklistSyncSource(host, until);
    _scheduleWorkAt(until,
                    stdx::bind(&ReplicationCoordinatorImpl::_unblacklistSyncSource,
                               this,
                               stdx::placeholders::_1,
                               host));
}

void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opCtx,
                                                           DataConsistency consistency) {
    StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(opCtx);
    OpTime lastOpTime;
    if (!lastOpTimeStatus.isOK()) {
        warning() << "Failed to load timestamp of most recently applied operation; "
                  << lastOpTimeStatus.getStatus();
    } else {
        lastOpTime = lastOpTimeStatus.getValue();
    }

    stdx::unique_lock<stdx::mutex> lock(_mutex);
    bool isRollbackAllowed = true;
    _setMyLastAppliedOpTime_inlock(lastOpTime, isRollbackAllowed, consistency);
    _setMyLastDurableOpTime_inlock(lastOpTime, isRollbackAllowed);
    _reportUpstream_inlock(std::move(lock));
    // Unlocked below.

    _externalState->setGlobalTimestamp(opCtx->getServiceContext(), lastOpTime.getTimestamp());
}

bool ReplicationCoordinatorImpl::shouldChangeSyncSource(
    const HostAndPort& currentSource,
    const rpc::ReplSetMetadata& replMetadata,
    boost::optional<rpc::OplogQueryMetadata> oqMetadata) {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    return _topCoord->shouldChangeSyncSource(
        currentSource, replMetadata, oqMetadata, _replExecutor->now());
}

void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() {
    if (_topCoord->updateLastCommittedOpTime()) {
        _setStableTimestampForStorage_inlock();
    }
    // Wake up any threads waiting for replication that now have their replication
    // check satisfied. We must do this regardless of whether we updated the lastCommittedOpTime,
    // as lastCommittedOpTime may be based on durable optimes whereas some waiters may be
    // waiting on applied (but not necessarily durable) optimes.
    _wakeReadyWaiters_inlock();
    _signalStepDownWaiterIfReady_inlock();
}

boost::optional<OpTime> ReplicationCoordinatorImpl::_calculateStableOpTime_inlock(
    const std::set<OpTime>& candidates, const OpTime& commitPoint) {

    // No optime candidates.
    if (candidates.empty()) {
        return boost::none;
    }

    auto maximumStableTimestamp = commitPoint.getTimestamp();
    if (_canAcceptNonLocalWrites.loadRelaxed() && _storage->supportsDocLocking(_service)) {
        // If the storage engine supports document level locking, then it is possible for oplog
        // writes to commit out of order. In that case, we don't want to set the stable timestamp
        // ahead of the all committed timestamp. This is not a problem for oplog application
        // because we only set lastApplied between batches when the all committed timestamp cannot
        // be behind. During oplog application the all committed timestamp can jump around since
        // we first write oplog entries to the oplog and then go back and apply them.
        //
        // If the all committed timestamp is less than the commit point, then we are guaranteed
        // that there are no stable timestamp candidates with a greater timestamp than the all
        // committed timestamp and a lower term than the commit point. Thus we can consider the
        // all committed timestamp to have the same term as the commit point. When a primary
        // enters a new term, it first storage-commits a 'new primary' oplog entry in the new term
        // before accepting any new writes. This will ensure that the all committed timestamp is
        // in the new term before any writes in the new term are replication committed.
        maximumStableTimestamp =
            std::min(_storage->getAllCommittedTimestamp(_service), commitPoint.getTimestamp());
    }
    const auto maximumStableOpTime = OpTime(maximumStableTimestamp, commitPoint.getTerm());

    // Find the greatest optime candidate that is less than or equal to 'maximumStableOpTime'. To
    // do this we first find the upper bound of 'maximumStableOpTime', which points to the
    // smallest element in 'candidates' that is greater than 'maximumStableOpTime'. We then step
    // back one element, which should give us the largest element in 'candidates' that is less
    // than or equal to 'maximumStableOpTime'.
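    // For example (hypothetical values, written as (term, timestamp)): with candidates
    // {(1, 10), (1, 20), (1, 30)} and a maximum stable optime of (1, 25), the upper bound points
    // at (1, 30), and stepping back one element yields (1, 20), the greatest candidate that does
    // not exceed (1, 25).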
    auto upperBoundIter = candidates.upper_bound(maximumStableOpTime);

    // All optime candidates are greater than the maximum stable optime.
    if (upperBoundIter == candidates.begin()) {
        return boost::none;
    }
    // There is a valid stable optime.
    else {
        return *std::prev(upperBoundIter);
    }
}

void ReplicationCoordinatorImpl::_cleanupStableOpTimeCandidates(std::set<OpTime>* candidates,
                                                                OpTime stableOpTime) {
    // Discard optime candidates earlier than the current stable optime, since we don't need
    // them anymore. To do this, we find the lower bound of the 'stableOpTime' which is the first
    // element that is greater than or equal to the 'stableOpTime'. Then we discard everything up
    // to but not including this lower bound i.e. 'deletePoint'.
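    // For example (hypothetical values, written as (term, timestamp)): with candidates
    // {(1, 10), (1, 20), (1, 30)} and a stable optime of (1, 20), the lower bound points at
    // (1, 20) itself, so only (1, 10) is discarded and the stable optime is retained.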
    auto deletePoint = candidates->lower_bound(stableOpTime);

    // Delete the entire range of unneeded optimes.
    candidates->erase(candidates->begin(), deletePoint);
}

boost::optional<OpTime> ReplicationCoordinatorImpl::calculateStableOpTime_forTest(
    const std::set<OpTime>& candidates, const OpTime& commitPoint) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    return _calculateStableOpTime_inlock(candidates, commitPoint);
}

void ReplicationCoordinatorImpl::cleanupStableOpTimeCandidates_forTest(std::set<OpTime>* candidates,
                                                                       OpTime stableOpTime) {
    _cleanupStableOpTimeCandidates(candidates, stableOpTime);
}

std::set<OpTime> ReplicationCoordinatorImpl::getStableOpTimeCandidates_forTest() {
    stdx::unique_lock<stdx::mutex> lk(_mutex);
    return _stableOpTimeCandidates;
}

boost::optional<OpTime> ReplicationCoordinatorImpl::getStableOpTime_forTest() {
    return _getStableOpTime_inlock();
}

boost::optional<OpTime> ReplicationCoordinatorImpl::_getStableOpTime_inlock() {
    auto commitPoint = _topCoord->getLastCommittedOpTime();
    if (_currentCommittedSnapshot) {
        auto snapshotOpTime = *_currentCommittedSnapshot;
        invariant(snapshotOpTime.getTimestamp() <= commitPoint.getTimestamp());
        invariant(snapshotOpTime <= commitPoint);
    }

    // Compute the current stable optime.
    auto stableOpTime = _calculateStableOpTime_inlock(_stableOpTimeCandidates, commitPoint);
    if (stableOpTime) {
        // By definition, the stable optime should never be greater than the commit point.
        invariant(stableOpTime->getTimestamp() <= commitPoint.getTimestamp());
        invariant(*stableOpTime <= commitPoint);
    }

    return stableOpTime;
}

void ReplicationCoordinatorImpl::_setStableTimestampForStorage_inlock() {

    // Get the current stable optime.
    auto stableOpTime = _getStableOpTime_inlock();

    // If there is a valid stable optime, set it for the storage engine, and then remove any
    // old, unneeded stable optime candidates.
    if (stableOpTime) {
        LOG(2) << "Setting replication's stable optime to " << stableOpTime.value();

        if (!testingSnapshotBehaviorInIsolation) {
            // Update committed snapshot and wake up any threads waiting on read concern or
            // write concern.
            _updateCommittedSnapshot_inlock(stableOpTime.get());
            // Update the stable timestamp for the storage engine.
            _storage->setStableTimestamp(getServiceContext(), stableOpTime->getTimestamp());
        }
        _cleanupStableOpTimeCandidates(&_stableOpTimeCandidates, stableOpTime.get());
    }
}

void ReplicationCoordinatorImpl::advanceCommitPoint(const OpTime& committedOpTime) {
    stdx::unique_lock<stdx::mutex> lk(_mutex);
    _advanceCommitPoint_inlock(committedOpTime);
}

void ReplicationCoordinatorImpl::_advanceCommitPoint_inlock(const OpTime& committedOpTime) {
    if (_topCoord->advanceLastCommittedOpTime(committedOpTime)) {
        if (_getMemberState_inlock().arbiter()) {
            // Arbiters do not store replicated data, so we consider their data trivially
            // consistent.
            _setMyLastAppliedOpTime_inlock(committedOpTime, false, DataConsistency::Consistent);
        }

        _setStableTimestampForStorage_inlock();
        // Even if we have no new snapshot, we need to notify waiters that the commit point moved.
        _externalState->notifyOplogMetadataWaiters(committedOpTime);
    }
}

OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const {
    stdx::unique_lock<stdx::mutex> lk(_mutex);
    return _topCoord->getLastCommittedOpTime();
}

Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
    OperationContext* opCtx,
    const ReplSetRequestVotesArgs& args,
    ReplSetRequestVotesResponse* response) {
    if (!isV1ElectionProtocol()) {
        return {ErrorCodes::BadValue, "not using election protocol v1"};
    }

    auto termStatus = updateTerm(opCtx, args.getTerm());
    if (!termStatus.isOK() && termStatus.code() != ErrorCodes::StaleTerm)
        return termStatus;

    {
        stdx::lock_guard<stdx::mutex> lk(_mutex);

        // We should only enter terminal shutdown from global terminal exit. In that case, rather
        // than voting in a term we don't plan to stay alive in, refuse to vote.
        if (_inTerminalShutdown) {
            return Status(ErrorCodes::ShutdownInProgress, "In the process of shutting down");
        }

        _topCoord->processReplSetRequestVotes(args, response);
    }

    if (!args.isADryRun() && response->getVoteGranted()) {
        LastVote lastVote{args.getTerm(), args.getCandidateIndex()};

        Status status = _externalState->storeLocalLastVoteDocument(opCtx, lastVote);
        if (!status.isOK()) {
            error() << "replSetRequestVotes failed to store LastVote document; " << status;
            return status;
        }
    }
    return Status::OK();
}

void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequestObj,
                                                     const OpTime& lastOpTimeFromClient,
                                                     BSONObjBuilder* builder) const {

    bool hasReplSetMetadata = metadataRequestObj.hasField(rpc::kReplSetMetadataFieldName);
    bool hasOplogQueryMetadata = metadataRequestObj.hasField(rpc::kOplogQueryMetadataFieldName);
    // Don't take any locks if we do not need to.
    if (!hasReplSetMetadata && !hasOplogQueryMetadata) {
        return;
    }

    // Avoid retrieving Rollback ID if we do not need it for _prepareOplogQueryMetadata_inlock().
    int rbid = -1;
    if (hasOplogQueryMetadata) {
        rbid = _replicationProcess->getRollbackID();
        invariant(-1 != rbid);
    }

    stdx::lock_guard<stdx::mutex> lk(_mutex);

    if (hasReplSetMetadata) {
        _prepareReplSetMetadata_inlock(lastOpTimeFromClient, builder);
    }

    if (hasOplogQueryMetadata) {
        _prepareOplogQueryMetadata_inlock(rbid, builder);
    }
}

void ReplicationCoordinatorImpl::_prepareReplSetMetadata_inlock(const OpTime& lastOpTimeFromClient,
                                                                BSONObjBuilder* builder) const {
    OpTime lastVisibleOpTime =
        std::max(lastOpTimeFromClient, _getCurrentCommittedSnapshotOpTime_inlock());
    auto metadata = _topCoord->prepareReplSetMetadata(lastVisibleOpTime);
    metadata.writeToMetadata(builder).transitional_ignore();
}

void ReplicationCoordinatorImpl::_prepareOplogQueryMetadata_inlock(int rbid,
                                                                   BSONObjBuilder* builder) const {
    _topCoord->prepareOplogQueryMetadata(rbid).writeToMetadata(builder).transitional_ignore();
}

bool ReplicationCoordinatorImpl::isV1ElectionProtocol() const {
    return _protVersion.load() == 1;
}

bool ReplicationCoordinatorImpl::getWriteConcernMajorityShouldJournal() {
    return getConfig().getWriteConcernMajorityShouldJournal();
}

bool ReplicationCoordinatorImpl::getWriteConcernMajorityShouldJournal_inlock() const {
    return _rsConfig.getWriteConcernMajorityShouldJournal();
}

Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
                                                      ReplSetHeartbeatResponse* response) {
    {
        stdx::lock_guard<stdx::mutex> lock(_mutex);
        if (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
            return Status(ErrorCodes::NotYetInitialized,
                          "Received heartbeat while still initializing replication system");
        }
    }

    Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse");
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    auto senderHost(args.getSenderHost());
    const Date_t now = _replExecutor->now();
    result = _topCoord->prepareHeartbeatResponseV1(now, args, _settings.ourSetName(), response);

    if ((result.isOK() || result == ErrorCodes::InvalidReplicaSetConfig) && _selfIndex < 0) {
        // If this node does not belong to the configuration it knows about, send heartbeats
        // back to any node that sends us a heartbeat, in case one of those remote nodes has
        // a configuration that contains us. Chances are excellent that it will, since that
        // is the only reason for a remote node to send this node a heartbeat request.
        if (!senderHost.empty() && _seedList.insert(senderHost).second) {
            _scheduleHeartbeatToTarget_inlock(senderHost, -1, now);
        }
    } else if (result.isOK() && response->getConfigVersion() < args.getConfigVersion()) {
        // Schedule a heartbeat to the sender to fetch the new config.
        // We cannot cancel the enqueued heartbeat, but either this one or the enqueued heartbeat
        // will trigger reconfig, which cancels and reschedules all heartbeats.
        if (args.hasSender()) {
            int senderIndex = _rsConfig.findMemberIndexByHostAndPort(senderHost);
            _scheduleHeartbeatToTarget_inlock(senderHost, senderIndex, now);
        }
    } else if (result.isOK()) {
        // Update liveness for sending node.
        auto* memberData = _topCoord->findMemberDataByMemberId(args.getSenderId());
        if (!memberData) {
            return result;
        }
        memberData->updateLiveness(_replExecutor->now());
    }
    return result;
}

void ReplicationCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) {
    stdx::lock_guard<stdx::mutex> lock(_mutex);

    // TODO(dannenberg) consider putting both optimes into the htmlsummary.
    output->setSelfOptime(_getMyLastAppliedOpTime_inlock());
    output->setSelfUptime(time(0) - serverGlobalParams.started);
    output->setNow(_replExecutor->now());

    _topCoord->summarizeAsHtml(output);
}

long long ReplicationCoordinatorImpl::getTerm() {
    // Note: no mutex acquisition here, as we are reading an Atomic variable.
    return _termShadow.load();
}

EventHandle ReplicationCoordinatorImpl::updateTerm_forTest(
    long long term, TopologyCoordinator::UpdateTermResult* updateResult) {
    stdx::lock_guard<stdx::mutex> lock(_mutex);

    EventHandle finishEvh;
    finishEvh = _updateTerm_inlock(term, updateResult);
    return finishEvh;
}

Status ReplicationCoordinatorImpl::updateTerm(OperationContext* opCtx, long long term) {
    // Term is only valid if we are replicating.
    if (getReplicationMode() != modeReplSet) {
        return {ErrorCodes::BadValue, "cannot supply 'term' without active replication"};
    }

    if (!isV1ElectionProtocol()) {
        // Do not update if not in V1 protocol.
        return Status::OK();
    }

    // Check that we haven't acquired any locks, because a potential stepdown needs the global
    // lock.
    dassert(!opCtx->lockState()->isLocked());
    TopologyCoordinator::UpdateTermResult updateTermResult;
    EventHandle finishEvh;

    {
        stdx::lock_guard<stdx::mutex> lock(_mutex);
        finishEvh = _updateTerm_inlock(term, &updateTermResult);
    }

    // Wait for potential stepdown to finish.
    if (finishEvh.isValid()) {
        _replExecutor->waitForEvent(finishEvh);
    }
    if (updateTermResult == TopologyCoordinator::UpdateTermResult::kUpdatedTerm ||
        updateTermResult == TopologyCoordinator::UpdateTermResult::kTriggerStepDown) {
        return {ErrorCodes::StaleTerm, "Replication term of this node was stale; retry query"};
    }

    return Status::OK();
}

EventHandle ReplicationCoordinatorImpl::_updateTerm_inlock(
    long long term, TopologyCoordinator::UpdateTermResult* updateTermResult) {
    if (!isV1ElectionProtocol()) {
        LOG(3) << "Cannot update term in election protocol version 0";
        return EventHandle();
    }

    auto now = _replExecutor->now();
    TopologyCoordinator::UpdateTermResult localUpdateTermResult = _topCoord->updateTerm(term, now);
    {
        if (localUpdateTermResult == TopologyCoordinator::UpdateTermResult::kUpdatedTerm) {
            _termShadow.store(term);
            _cancelPriorityTakeover_inlock();
            _cancelAndRescheduleElectionTimeout_inlock();
        }
    }

    if (updateTermResult) {
        *updateTermResult = localUpdateTermResult;
    }

    if (localUpdateTermResult == TopologyCoordinator::UpdateTermResult::kTriggerStepDown) {
        if (!_pendingTermUpdateDuringStepDown || *_pendingTermUpdateDuringStepDown < term) {
            _pendingTermUpdateDuringStepDown = term;
        }
        if (_topCoord->prepareForUnconditionalStepDown()) {
            log() << "stepping down from primary, because a new term has begun: " << term;
            return _stepDownStart();
        } else {
            LOG(2) << "Updated term but not triggering stepdown because we are already in the "
                      "process of stepping down";
        }
    }
    return EventHandle();
}

Timestamp ReplicationCoordinatorImpl::reserveSnapshotName(OperationContext* opCtx) {
    Timestamp reservedName;
    if (getReplicationMode() == Mode::modeReplSet) {
        invariant(opCtx->lockState()->isLocked());
        if (getMemberState().primary()) {
            // Use the current optime on the node, for primary nodes.
            reservedName = LogicalClock::get(getServiceContext())->getClusterTime().asTimestamp();
        } else {
            // Use lastApplied time, for secondary nodes.
            reservedName = getMyLastAppliedOpTime().getTimestamp();
        }
    } else {
        // All snapshots are the same for a standalone node.
        reservedName = Timestamp();
    }
    // Record the reserved name on the client, in case the snapshot name differs from the
    // client's lastOp.
    ReplClientInfo::forClient(opCtx->getClient()).setLastSnapshot(reservedName);
    return reservedName;
}

void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* opCtx,
                                                            const Timestamp& untilSnapshot) {
    stdx::unique_lock<stdx::mutex> lock(_mutex);

    uassert(ErrorCodes::NotYetInitialized,
            "Cannot use snapshots until replica set is finished initializing.",
            _rsConfigState != kConfigUninitialized && _rsConfigState != kConfigInitiating);
    while (!_currentCommittedSnapshot ||
           _currentCommittedSnapshot->getTimestamp() < untilSnapshot) {
        opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock);
    }
}

size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() {
    return _uncommittedSnapshotsSize.load();
}

MONGO_FP_DECLARE(disableSnapshotting);

void ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock(
    const OpTime& newCommittedSnapshot) {
    if (testingSnapshotBehaviorInIsolation) {
        return;
    }

    // If we are in ROLLBACK state, do not set any new _currentCommittedSnapshot, as it will be
    // cleared at the end of rollback anyway.
    if (_memberState.rollback()) {
        log() << "Not updating committed snapshot because we are in rollback";
        return;
    }

    invariant(!newCommittedSnapshot.isNull());
    invariant(newCommittedSnapshot.getTimestamp() <=
              _topCoord->getLastCommittedOpTime().getTimestamp());

    // The new committed snapshot should be >= the current snapshot.
    if (_currentCommittedSnapshot) {
        invariant(newCommittedSnapshot >= _currentCommittedSnapshot);
    }

    if (MONGO_FAIL_POINT(disableSnapshotting))
        return;
    _currentCommittedSnapshot = newCommittedSnapshot;
    _currentCommittedSnapshotCond.notify_all();

    _externalState->updateCommittedSnapshot(newCommittedSnapshot);

    // Wake up any threads waiting for read concern or write concern.
    _wakeReadyWaiters_inlock();
}

void ReplicationCoordinatorImpl::dropAllSnapshots() {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    _dropAllSnapshots_inlock();
}

void ReplicationCoordinatorImpl::_dropAllSnapshots_inlock() {
    _currentCommittedSnapshot = boost::none;
    _externalState->dropAllSnapshots();
}

void ReplicationCoordinatorImpl::waitForElectionFinish_forTest() {
    if (_electionFinishedEvent.isValid()) {
        _replExecutor->waitForEvent(_electionFinishedEvent);
    }
}

void ReplicationCoordinatorImpl::waitForElectionDryRunFinish_forTest() {
    if (_electionDryRunFinishedEvent.isValid()) {
        _replExecutor->waitForEvent(_electionDryRunFinishedEvent);
    }
}

void ReplicationCoordinatorImpl::_resetElectionInfoOnProtocolVersionUpgrade(
    OperationContext* opCtx, const ReplSetConfig& oldConfig, const ReplSetConfig& newConfig) {

    // On protocol version upgrade, reset last vote as if I just learned the term 0 from other
    // nodes.
    if (!oldConfig.isInitialized() ||
        oldConfig.getProtocolVersion() >= newConfig.getProtocolVersion()) {
        return;
    }
    invariant(newConfig.getProtocolVersion() == 1);

    const LastVote lastVote{OpTime::kInitialTerm, -1};
    fassert(40445, _externalState->storeLocalLastVoteDocument(opCtx, lastVote));
}

CallbackHandle ReplicationCoordinatorImpl::_scheduleWorkAt(Date_t when, const CallbackFn& work) {
    auto cbh = _replExecutor->scheduleWorkAt(when, [work](const CallbackArgs& args) {
        if (args.status == ErrorCodes::CallbackCanceled) {
            return;
        }
        work(args);
    });
    if (cbh == ErrorCodes::ShutdownInProgress) {
        return {};
    }
    return fassertStatusOK(28800, cbh);
}

EventHandle ReplicationCoordinatorImpl::_makeEvent() {
    auto eventResult = this->_replExecutor->makeEvent();
    if (eventResult.getStatus() == ErrorCodes::ShutdownInProgress) {
        return EventHandle();
    }
    fassert(28825, eventResult.getStatus());
    return eventResult.getValue();
}

WriteConcernOptions ReplicationCoordinatorImpl::populateUnsetWriteConcernOptionsSyncMode(
    WriteConcernOptions wc) {

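    // How the sync mode is resolved (for illustration, with hypothetical inputs): a
    // {w: "majority"} write concern with an UNSET sync mode becomes JOURNAL when the config's
    // writeConcernMajorityJournalDefault is true and NONE otherwise; any other write concern
    // with an UNSET sync mode becomes NONE.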
    WriteConcernOptions writeConcern(wc);
    if (writeConcern.syncMode == WriteConcernOptions::SyncMode::UNSET) {
        if (writeConcern.wMode == WriteConcernOptions::kMajority &&
            getWriteConcernMajorityShouldJournal()) {
            writeConcern.syncMode = WriteConcernOptions::SyncMode::JOURNAL;
        } else {
            writeConcern.syncMode = WriteConcernOptions::SyncMode::NONE;
        }
    }
    return writeConcern;
}

CallbackFn ReplicationCoordinatorImpl::_wrapAsCallbackFn(const stdx::function<void()>& work) {
    return [work](const CallbackArgs& cbData) {
        if (cbData.status == ErrorCodes::CallbackCanceled) {
            return;
        }

        work();
    };
}

Status ReplicationCoordinatorImpl::stepUpIfEligible(bool skipDryRun) {
    if (!isV1ElectionProtocol()) {
        return Status(ErrorCodes::CommandNotSupported,
                      "Step-up command is only supported by Protocol Version 1");
    }

    auto reason = skipDryRun ? TopologyCoordinator::StartElectionReason::kStepUpRequestSkipDryRun
                             : TopologyCoordinator::StartElectionReason::kStepUpRequest;
    _startElectSelfIfEligibleV1(reason);

    EventHandle finishEvent;
    {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        finishEvent = _electionFinishedEvent;
    }
    if (finishEvent.isValid()) {
        _replExecutor->waitForEvent(finishEvent);
    }
    auto state = getMemberState();
    if (state.primary()) {
        return Status::OK();
    }
    return Status(ErrorCodes::CommandFailed, "Election failed.");
}

ReplSettings::IndexPrefetchConfig ReplicationCoordinatorImpl::getIndexPrefetchConfig() const {
    stdx::lock_guard<stdx::mutex> lock(_indexPrefetchMutex);
    return _indexPrefetchConfig;
}

void ReplicationCoordinatorImpl::setIndexPrefetchConfig(
    const ReplSettings::IndexPrefetchConfig cfg) {
    stdx::lock_guard<stdx::mutex> lock(_indexPrefetchMutex);
    _indexPrefetchConfig = cfg;
}

executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::_cancelElectionIfNeeded_inlock() {
    if (_topCoord->getRole() != TopologyCoordinator::Role::kCandidate) {
        return {};
    }
    if (isV1ElectionProtocol()) {
        invariant(_voteRequester);
        _voteRequester->cancel();
    } else {
        invariant(_freshnessChecker);
        _freshnessChecker->cancel();
        if (_electCmdRunner) {
            _electCmdRunner->cancel();
        }
    }
    return _electionFinishedEvent;
}

int64_t ReplicationCoordinatorImpl::_nextRandomInt64_inlock(int64_t limit) {
    return _random.nextInt64(limit);
}

}  // namespace repl
}  // namespace mongo
3995