/**
 * Copyright (C) 2018-present MongoDB, Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the Server Side Public License, version 1,
 * as published by MongoDB, Inc.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * Server Side Public License for more details.
 *
 * You should have received a copy of the Server Side Public License
 * along with this program. If not, see
 * <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the Server Side Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication

#include "mongo/platform/basic.h"

#include "mongo/db/repl/topology_coordinator.h"

#include <limits>
#include <string>

#include "mongo/db/audit.h"
#include "mongo/db/client.h"
#include "mongo/db/mongod_options.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/heartbeat_response_action.h"
#include "mongo/db/repl/is_master_response.h"
#include "mongo/db/repl/isself.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
#include "mongo/db/repl/repl_set_heartbeat_response.h"
#include "mongo/db/repl/repl_set_html_summary.h"
#include "mongo/db/repl/repl_set_request_votes_args.h"
#include "mongo/db/repl/rslog.h"
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/server_parameters.h"
#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/hex.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/scopeguard.h"

namespace mongo {
namespace repl {

using std::vector;

MONGO_FP_DECLARE(forceSyncSourceCandidate);

const Seconds TopologyCoordinator::VoteLease::leaseTime = Seconds(30);

// Controls how caught up in replication a secondary with higher priority than the current primary
// must be before it will call for a priority takeover election.
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(priorityTakeoverFreshnessWindowSeconds, int, 2);

// If this fail point is enabled, TopologyCoordinator::shouldChangeSyncSource() will ignore
// the option TopologyCoordinator::Options::maxSyncSourceLagSecs. The sync source will not be
// re-evaluated if it lags behind another node by more than 'maxSyncSourceLagSecs' seconds.
MONGO_FP_DECLARE(disableMaxSyncSourceLagSecs);

constexpr Milliseconds TopologyCoordinator::PingStats::UninitializedPingTime;

std::string TopologyCoordinator::roleToString(TopologyCoordinator::Role role) {
    switch (role) {
        case TopologyCoordinator::Role::kLeader:
            return "leader";
        case TopologyCoordinator::Role::kFollower:
            return "follower";
        case TopologyCoordinator::Role::kCandidate:
            return "candidate";
    }
    invariant(false);
}

TopologyCoordinator::~TopologyCoordinator() {}

std::ostream& operator<<(std::ostream& os, TopologyCoordinator::Role role) {
    return os << TopologyCoordinator::roleToString(role);
}

std::ostream& operator<<(std::ostream& os,
                         TopologyCoordinator::PrepareFreezeResponseResult result) {
    switch (result) {
        case TopologyCoordinator::PrepareFreezeResponseResult::kNoAction:
            return os << "no action";
        case TopologyCoordinator::PrepareFreezeResponseResult::kSingleNodeSelfElect:
            return os << "single node self elect";
    }
    MONGO_UNREACHABLE;
}

namespace {
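// Returns the zero-based position of iterator 'it' within the vector 'vec'.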
template <typename T>
int indexOfIterator(const std::vector<T>& vec, typename std::vector<T>::const_iterator& it) {
    return static_cast<int>(it - vec.begin());
}

/**
 * Returns true if the only up heartbeats are auth errors.
 */
bool _hasOnlyAuthErrorUpHeartbeats(const std::vector<MemberData>& hbdata, const int selfIndex) {
    bool foundAuthError = false;
    for (std::vector<MemberData>::const_iterator it = hbdata.begin(); it != hbdata.end(); ++it) {
        if (indexOfIterator(hbdata, it) == selfIndex) {
            continue;
        }

        if (it->up()) {
            return false;
        }

        if (it->hasAuthIssue()) {
            foundAuthError = true;
        }
    }

    return foundAuthError;
}

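/**
 * Appends 'opTime' to 'bob' under field 'elemName', formatted for protocol version 'pv'. For
 * pv == 1 this appends the full optime document (roughly { elemName: { ts: <Timestamp>,
 * t: <NumberLong> } }); for older protocol versions only the bare timestamp is appended.
 */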
void appendOpTime(BSONObjBuilder* bob,
                  const char* elemName,
                  const OpTime& opTime,
                  const long long pv) {
    if (pv == 1) {
        opTime.append(bob, elemName);
    } else {
        bob->append(elemName, opTime.getTimestamp());
    }
}
}  // namespace

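// Records the start of a new heartbeat attempt to a target and resets the per-attempt failure
// count; the stats remain in the TRYING state until the attempt succeeds or exhausts its retries.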
void TopologyCoordinator::PingStats::start(Date_t now) {
    _lastHeartbeatStartDate = now;
    _numFailuresSinceLastStart = 0;
    _state = HeartbeatState::TRYING;
}

void TopologyCoordinator::PingStats::hit(Milliseconds millis) {
    _state = HeartbeatState::SUCCEEDED;
    ++hitCount;

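    // Update the rolling average ping time, weighting the previous average 4:1 against the
    // newest sample.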
    averagePingTimeMs = averagePingTimeMs == UninitializedPingTime
        ? millis
        : Milliseconds((averagePingTimeMs * 4 + millis) / 5);
}

void TopologyCoordinator::PingStats::miss() {
    ++_numFailuresSinceLastStart;
    // Transition to 'FAILED' state if this was our last retry.
    if (_numFailuresSinceLastStart > kMaxHeartbeatRetries) {
        _state = PingStats::HeartbeatState::FAILED;
    }
}

TopologyCoordinator::TopologyCoordinator(Options options)
    : _role(Role::kFollower),
      _term(OpTime::kUninitializedTerm),
      _currentPrimaryIndex(-1),
      _forceSyncSourceIndex(-1),
      _options(std::move(options)),
      _selfIndex(-1),
      _maintenanceModeCalls(0),
      _followerMode(MemberState::RS_STARTUP2) {
    invariant(getMemberState() == MemberState::RS_STARTUP);
    // Need an entry for self in the memberHeartbeatData.
    _memberData.emplace_back();
    _memberData.back().setIsSelf(true);
}

TopologyCoordinator::Role TopologyCoordinator::getRole() const {
    return _role;
}

void TopologyCoordinator::setForceSyncSourceIndex(int index) {
    invariant(index < _rsConfig.getNumMembers());
    _forceSyncSourceIndex = index;
}

HostAndPort TopologyCoordinator::getSyncSourceAddress() const {
    return _syncSource;
}

HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now,
                                                     const OpTime& lastOpTimeFetched,
                                                     ChainingPreference chainingPreference) {
    // If we are not a member of the current replica set configuration, no sync source is valid.
    if (_selfIndex == -1) {
        LOG(1) << "Cannot sync from any members because we are not in the replica set config";
        return HostAndPort();
    }

    MONGO_FAIL_POINT_BLOCK(forceSyncSourceCandidate, customArgs) {
        const auto& data = customArgs.getData();
        const auto hostAndPortElem = data["hostAndPort"];
        if (!hostAndPortElem) {
            severe() << "'forceSyncSourceCandidate' parameter set with invalid host and port: "
                     << data;
            fassertFailed(50835);
        }

        const auto hostAndPort = HostAndPort(hostAndPortElem.checkAndGetStringData());
        const int syncSourceIndex = _rsConfig.findMemberIndexByHostAndPort(hostAndPort);
        if (syncSourceIndex < 0) {
            log() << "'forceSyncSourceCandidate' failed due to host and port not in "
                     "replica set config: "
                  << hostAndPort.toString();
            fassertFailed(50836);
        }

        if (_memberIsBlacklisted(_rsConfig.getMemberAt(syncSourceIndex), now)) {
            log() << "Cannot select a sync source because forced candidate is blacklisted: "
                  << hostAndPort.toString();
            _syncSource = HostAndPort();
            return _syncSource;
        }

        _syncSource = _rsConfig.getMemberAt(syncSourceIndex).getHostAndPort();
        log() << "choosing sync source candidate due to 'forceSyncSourceCandidate' parameter: "
              << _syncSource;
        std::string msg(str::stream() << "syncing from: " << _syncSource.toString()
                                      << " by 'forceSyncSourceCandidate' parameter");
        setMyHeartbeatMessage(now, msg);
        return _syncSource;
    }

    // if we have a target we've requested to sync from, use it
    if (_forceSyncSourceIndex != -1) {
        invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers());
        _syncSource = _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort();
        _forceSyncSourceIndex = -1;
        log() << "choosing sync source candidate by request: " << _syncSource;
        std::string msg(str::stream() << "syncing from: " << _syncSource.toString()
                                      << " by request");
        setMyHeartbeatMessage(now, msg);
        return _syncSource;
    }

    // Wait for two pings from each member other than ourselves (2 * (N - 1) total)
    // before choosing a sync target.
    int needMorePings = (_memberData.size() - 1) * 2 - _getTotalPings();

    if (needMorePings > 0) {
        OCCASIONALLY log() << "waiting for " << needMorePings
                           << " pings from other members before syncing";
        _syncSource = HostAndPort();
        return _syncSource;
    }

    // If we are only allowed to sync from the primary, set that
    if (chainingPreference == ChainingPreference::kUseConfiguration &&
        !_rsConfig.isChainingAllowed()) {
        if (_currentPrimaryIndex == -1) {
            LOG(1) << "Cannot select a sync source because chaining is"
                      " not allowed and primary is unknown/down";
            _syncSource = HostAndPort();
            return _syncSource;
        } else if (_memberIsBlacklisted(*_currentPrimaryMember(), now)) {
            LOG(1) << "Cannot select a sync source because chaining is not allowed and primary "
                      "member is blacklisted: "
                   << _currentPrimaryMember()->getHostAndPort();
            _syncSource = HostAndPort();
            return _syncSource;
        } else if (_currentPrimaryIndex == _selfIndex) {
            LOG(1)
                << "Cannot select a sync source because chaining is not allowed and we are primary";
            _syncSource = HostAndPort();
            return _syncSource;
        } else {
            _syncSource = _currentPrimaryMember()->getHostAndPort();
            log() << "chaining not allowed, choosing primary as sync source candidate: "
                  << _syncSource;
            std::string msg(str::stream() << "syncing from primary: " << _syncSource.toString());
            setMyHeartbeatMessage(now, msg);
            return _syncSource;
        }
    }

    // find the member with the lowest ping time that is ahead of me

    // choose a time that will exclude no candidates by default, in case we don't see a primary
    OpTime oldestSyncOpTime;

    // Find primary's oplog time. Reject sync candidates that are more than
    // _options.maxSyncSourceLagSecs seconds behind.
    if (_currentPrimaryIndex != -1) {
        OpTime primaryOpTime = _memberData.at(_currentPrimaryIndex).getHeartbeatAppliedOpTime();

        // Check if primaryOpTime is still close to 0 because we haven't received
        // our first heartbeat from a new primary yet.
        unsigned int maxLag =
            static_cast<unsigned int>(durationCount<Seconds>(_options.maxSyncSourceLagSecs));
        if (primaryOpTime.getSecs() >= maxLag) {
            oldestSyncOpTime =
                OpTime(Timestamp(primaryOpTime.getSecs() - maxLag, 0), primaryOpTime.getTerm());
        }
    }

    int closestIndex = -1;

    // Make two attempts, with less restrictive rules the second time.
    //
    // During the first attempt, we ignore nodes with a larger slave delay, hidden or
    // non-voting nodes, and nodes that are excessively behind.
    //
    // For the second attempt we include those nodes, in case they are the only ones we can reach.
    //
    // This loop attempts to set 'closestIndex' to a viable candidate.
    for (int attempts = 0; attempts < 2; ++attempts) {
        for (std::vector<MemberData>::const_iterator it = _memberData.begin();
             it != _memberData.end();
             ++it) {
            const int itIndex = indexOfIterator(_memberData, it);
            // Don't consider ourselves.
            if (itIndex == _selfIndex) {
                continue;
            }

            const MemberConfig& itMemberConfig(_rsConfig.getMemberAt(itIndex));

            // Candidate must be up to be considered.
            if (!it->up()) {
                LOG(2) << "Cannot select sync source because it is not up: "
                       << itMemberConfig.getHostAndPort();
                continue;
            }
            // Candidate must be in PRIMARY or SECONDARY state to be considered.
            if (!it->getState().readable()) {
                LOG(2) << "Cannot select sync source because it is not readable: "
                       << itMemberConfig.getHostAndPort();
                continue;
            }

            // On the first attempt, we skip candidates that do not match these criteria.
            if (attempts == 0) {
                // Candidate must be a voter if we are a voter.
                if (_selfConfig().isVoter() && !itMemberConfig.isVoter()) {
                    LOG(2) << "Cannot select sync source because we are a voter and it is not: "
                           << itMemberConfig.getHostAndPort();
                    continue;
                }
                // Candidates must not be hidden.
                if (itMemberConfig.isHidden()) {
                    LOG(2) << "Cannot select sync source because it is hidden: "
                           << itMemberConfig.getHostAndPort();
                    continue;
                }
                // Candidates cannot be excessively behind.
                if (it->getHeartbeatAppliedOpTime() < oldestSyncOpTime) {
                    LOG(2) << "Cannot select sync source because it is too far behind. "
                           << "Latest optime of sync candidate " << itMemberConfig.getHostAndPort()
                           << ": " << it->getHeartbeatAppliedOpTime()
                           << ", oldest acceptable optime: " << oldestSyncOpTime;
                    continue;
                }
                // Candidate must not have a configured delay larger than ours.
                if (_selfConfig().getSlaveDelay() < itMemberConfig.getSlaveDelay()) {
                    LOG(2) << "Cannot select sync source with larger slaveDelay than ours: "
                           << itMemberConfig.getHostAndPort();
                    continue;
                }
            }
            // Candidate must build indexes if we build indexes, to be considered.
            if (_selfConfig().shouldBuildIndexes()) {
                if (!itMemberConfig.shouldBuildIndexes()) {
                    LOG(2) << "Cannot select sync source with shouldBuildIndex differences: "
                           << itMemberConfig.getHostAndPort();
                    continue;
                }
            }
            // Only consider candidates that are ahead of where we are.
            if (it->getHeartbeatAppliedOpTime() <= lastOpTimeFetched) {
                LOG(1) << "Cannot select sync source equal to or behind our last fetched optime. "
                       << "My last fetched oplog optime: " << lastOpTimeFetched.toBSON()
                       << ", latest oplog optime of sync candidate "
                       << itMemberConfig.getHostAndPort() << ": "
                       << it->getHeartbeatAppliedOpTime().toBSON();
                continue;
            }
            // Candidate cannot be more latent than anything we've already considered.
            if ((closestIndex != -1) &&
                (_getPing(itMemberConfig.getHostAndPort()) >
                 _getPing(_rsConfig.getMemberAt(closestIndex).getHostAndPort()))) {
                LOG(2) << "Cannot select sync source with higher latency than the best candidate: "
                       << itMemberConfig.getHostAndPort();

                continue;
            }
            // Candidate cannot be blacklisted.
            if (_memberIsBlacklisted(itMemberConfig, now)) {
                LOG(1) << "Cannot select sync source which is blacklisted: "
                       << itMemberConfig.getHostAndPort();

                continue;
            }
            // This candidate has passed all tests; set 'closestIndex'.
            closestIndex = itIndex;
        }
        if (closestIndex != -1)
            break;  // no need for second attempt
    }

    if (closestIndex == -1) {
        // Did not find any members to sync from
        std::string msg("could not find member to sync from");
        // Only log when we had a valid sync source before
        if (!_syncSource.empty()) {
            log() << msg << rsLog;
        }
        setMyHeartbeatMessage(now, msg);

        _syncSource = HostAndPort();
        return _syncSource;
    }
    _syncSource = _rsConfig.getMemberAt(closestIndex).getHostAndPort();
    log() << "sync source candidate: " << _syncSource;
    std::string msg(str::stream() << "syncing from: " << _syncSource.toString());
    setMyHeartbeatMessage(now, msg);
    return _syncSource;
}

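// Returns true if 'memberConfig' is blacklisted as a sync source and the blacklist entry has not
// yet expired at time 'now'.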
bool TopologyCoordinator::_memberIsBlacklisted(const MemberConfig& memberConfig, Date_t now) const {
    std::map<HostAndPort, Date_t>::const_iterator blacklisted =
        _syncSourceBlacklist.find(memberConfig.getHostAndPort());
    if (blacklisted != _syncSourceBlacklist.end()) {
        if (blacklisted->second > now) {
            return true;
        }
    }
    return false;
}

void TopologyCoordinator::blacklistSyncSource(const HostAndPort& host, Date_t until) {
    LOG(2) << "blacklisting " << host << " until " << until.toString();
    _syncSourceBlacklist[host] = until;
}

void TopologyCoordinator::unblacklistSyncSource(const HostAndPort& host, Date_t now) {
    std::map<HostAndPort, Date_t>::iterator hostItr = _syncSourceBlacklist.find(host);
    if (hostItr != _syncSourceBlacklist.end() && now >= hostItr->second) {
        LOG(2) << "unblacklisting " << host;
        _syncSourceBlacklist.erase(hostItr);
    }
}

void TopologyCoordinator::clearSyncSourceBlacklist() {
    _syncSourceBlacklist.clear();
}

void TopologyCoordinator::prepareSyncFromResponse(const HostAndPort& target,
                                                  BSONObjBuilder* response,
                                                  Status* result) {
    response->append("syncFromRequested", target.toString());

    if (_selfIndex == -1) {
        *result = Status(ErrorCodes::NotSecondary, "Removed and uninitialized nodes do not sync");
        return;
    }

    const MemberConfig& selfConfig = _selfConfig();
    if (selfConfig.isArbiter()) {
        *result = Status(ErrorCodes::NotSecondary, "arbiters don't sync");
        return;
    }
    if (_selfIndex == _currentPrimaryIndex) {
        *result = Status(ErrorCodes::NotSecondary, "primaries don't sync");
        return;
    }

    ReplSetConfig::MemberIterator targetConfig = _rsConfig.membersEnd();
    int targetIndex = 0;
    for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd();
         ++it) {
        if (it->getHostAndPort() == target) {
            targetConfig = it;
            break;
        }
        ++targetIndex;
    }
    if (targetConfig == _rsConfig.membersEnd()) {
        *result = Status(ErrorCodes::NodeNotFound,
                         str::stream() << "Could not find member \"" << target.toString()
                                       << "\" in replica set");
        return;
    }
    if (targetIndex == _selfIndex) {
        *result = Status(ErrorCodes::InvalidOptions, "I cannot sync from myself");
        return;
    }
    if (targetConfig->isArbiter()) {
        *result = Status(ErrorCodes::InvalidOptions,
                         str::stream() << "Cannot sync from \"" << target.toString()
                                       << "\" because it is an arbiter");
        return;
    }
    if (!targetConfig->shouldBuildIndexes() && selfConfig.shouldBuildIndexes()) {
        *result = Status(ErrorCodes::InvalidOptions,
                         str::stream() << "Cannot sync from \"" << target.toString()
                                       << "\" because it does not build indexes");
        return;
    }

    if (selfConfig.isVoter() && !targetConfig->isVoter()) {
        *result = Status(ErrorCodes::InvalidOptions,
                         str::stream() << "Cannot sync from \"" << target.toString()
                                       << "\" because it is not a voter");
        return;
    }

    const MemberData& hbdata = _memberData.at(targetIndex);
    if (hbdata.hasAuthIssue()) {
        *result =
            Status(ErrorCodes::Unauthorized,
                   str::stream() << "not authorized to communicate with " << target.toString());
        return;
    }
    if (hbdata.getHealth() == 0) {
        *result =
            Status(ErrorCodes::HostUnreachable,
                   str::stream() << "I cannot reach the requested member: " << target.toString());
        return;
    }
    const OpTime lastOpApplied = getMyLastAppliedOpTime();
    if (hbdata.getHeartbeatAppliedOpTime().getSecs() + 10 < lastOpApplied.getSecs()) {
        warning() << "attempting to sync from " << target << ", but its latest opTime is "
                  << hbdata.getHeartbeatAppliedOpTime().getSecs() << " and ours is "
                  << lastOpApplied.getSecs() << " so this may not work";
        response->append("warning",
                         str::stream() << "requested member \"" << target.toString()
                                       << "\" is more than 10 seconds behind us");
        // not returning bad Status, just warning
    }

    HostAndPort prevSyncSource = getSyncSourceAddress();
    if (!prevSyncSource.empty()) {
        response->append("prevSyncTarget", prevSyncSource.toString());
    }

    setForceSyncSourceIndex(targetIndex);
    *result = Status::OK();
}

void TopologyCoordinator::prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args,
                                               const Date_t now,
                                               BSONObjBuilder* response,
                                               Status* result) {
    if (_rsConfig.getProtocolVersion() != 0) {
        *result = Status(ErrorCodes::BadValue,
                         str::stream() << "replset: incompatible replset protocol version: "
                                       << _rsConfig.getProtocolVersion());
        return;
    }

    if (_selfIndex == -1) {
        *result = Status(ErrorCodes::ReplicaSetNotFound,
                         "Cannot participate in elections because not initialized");
        return;
    }

    if (args.setName != _rsConfig.getReplSetName()) {
        *result =
            Status(ErrorCodes::ReplicaSetNotFound,
                   str::stream() << "Wrong repl set name. Expected: " << _rsConfig.getReplSetName()
                                 << ", received: " << args.setName);
        return;
    }

    if (args.id == static_cast<unsigned>(_selfConfig().getId())) {
        *result = Status(ErrorCodes::BadValue,
                         str::stream() << "Received replSetFresh command from member with the "
                                          "same member ID as ourself: "
                                       << args.id);
        return;
    }

    bool weAreFresher = false;
    const OpTime lastOpApplied = getMyLastAppliedOpTime();
    if (_rsConfig.getConfigVersion() > args.cfgver) {
        log() << "replSet member " << args.who << " is not yet aware its cfg version "
              << args.cfgver << " is stale";
        response->append("info", "config version stale");
        weAreFresher = true;
    }
    // check not only our own optime, but any other member we can reach
    else if (OpTime(args.opTime, _term) < _latestKnownOpTime()) {
        weAreFresher = true;
    }
    response->appendDate("opTime",
                         Date_t::fromMillisSinceEpoch(lastOpApplied.getTimestamp().asLL()));
    response->append("fresher", weAreFresher);

    std::string errmsg;
    bool doVeto = _shouldVetoMember(args, now, &errmsg);
    response->append("veto", doVeto);
    if (doVeto) {
        response->append("errmsg", errmsg);
    }
    *result = Status::OK();
}

bool TopologyCoordinator::_shouldVetoMember(const ReplicationCoordinator::ReplSetFreshArgs& args,
                                            const Date_t& now,
                                            std::string* errmsg) const {
    if (_rsConfig.getConfigVersion() < args.cfgver) {
        // We are stale; do not veto.
        return false;
    }

    const unsigned int memberID = args.id;
    const int hopefulIndex = _getMemberIndex(memberID);
    invariant(hopefulIndex != _selfIndex);
    const int highestPriorityIndex = _getHighestPriorityElectableIndex(now);

    if (hopefulIndex == -1) {
        *errmsg = str::stream() << "replSet couldn't find member with id " << memberID;
        return true;
    }
    const OpTime lastOpApplied = getMyLastAppliedOpTime();
    if (_iAmPrimary() &&
        lastOpApplied >= _memberData.at(hopefulIndex).getHeartbeatAppliedOpTime()) {
        // hbinfo is not updated for ourself, so if we are primary we have to check the
        // primary's last optime separately
        *errmsg = str::stream() << "I am already primary, "
                                << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
                                << " can try again once I've stepped down";
        return true;
    }

    if (_currentPrimaryIndex != -1 && (hopefulIndex != _currentPrimaryIndex) &&
        (_memberData.at(_currentPrimaryIndex).getHeartbeatAppliedOpTime() >=
         _memberData.at(hopefulIndex).getHeartbeatAppliedOpTime())) {
        // other members might be aware of more up-to-date nodes
        *errmsg =
            str::stream() << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
                          << " is trying to elect itself but "
                          << _rsConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort().toString()
                          << " is already primary and more up-to-date";
        return true;
    }

    if (highestPriorityIndex != -1) {
        const MemberConfig& hopefulMember = _rsConfig.getMemberAt(hopefulIndex);
        const MemberConfig& priorityMember = _rsConfig.getMemberAt(highestPriorityIndex);

        if (priorityMember.getPriority() > hopefulMember.getPriority()) {
            *errmsg = str::stream() << hopefulMember.getHostAndPort().toString()
                                    << " has lower priority of " << hopefulMember.getPriority()
                                    << " than " << priorityMember.getHostAndPort().toString()
                                    << " which has a priority of " << priorityMember.getPriority();
            return true;
        }
    }

    UnelectableReasonMask reason = _getUnelectableReason(hopefulIndex);
    reason &= ~RefusesToStand;
    if (reason) {
        *errmsg = str::stream() << "I don't think "
                                << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
                                << " is electable because the "
                                << _getUnelectableReasonString(reason);
        return true;
    }

    return false;
}

// produce a reply to a received electCmd
void TopologyCoordinator::prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args,
                                               const Date_t now,
                                               BSONObjBuilder* response,
                                               Status* result) {
    if (_rsConfig.getProtocolVersion() != 0) {
        *result = Status(ErrorCodes::BadValue,
                         str::stream() << "replset: incompatible replset protocol version: "
                                       << _rsConfig.getProtocolVersion());
        return;
    }
    if (_selfIndex == -1) {
        *result = Status(ErrorCodes::ReplicaSetNotFound,
                         "Cannot participate in election because not initialized");
        return;
    }

    const long long myver = _rsConfig.getConfigVersion();
    const int highestPriorityIndex = _getHighestPriorityElectableIndex(now);

    const MemberConfig* primary = _currentPrimaryMember();
    const MemberConfig* hopeful = _rsConfig.findMemberByID(args.whoid);
    const MemberConfig* highestPriority =
        highestPriorityIndex == -1 ? NULL : &_rsConfig.getMemberAt(highestPriorityIndex);

    int vote = 0;
    if (args.set != _rsConfig.getReplSetName()) {
        log() << "replSet error received an elect request for '" << args.set
              << "' but our set name is '" << _rsConfig.getReplSetName() << "'";
    } else if (myver < args.cfgver) {
        // we are stale. don't vote
        log() << "replSetElect not voting because our config version is stale. Our version: "
              << myver << ", their version: " << args.cfgver;
    } else if (myver > args.cfgver) {
        // they are stale!
        log() << "replSetElect command received stale config version # during election. "
                 "Our version: "
              << myver << ", their version: " << args.cfgver;
        vote = -10000;
    } else if (!hopeful) {
        log() << "replSetElect couldn't find member with id " << args.whoid;
        vote = -10000;
    } else if (_iAmPrimary()) {
        log() << "I am already primary, " << hopeful->getHostAndPort().toString()
              << " can try again once I've stepped down";
        vote = -10000;
    } else if (primary) {
        log() << hopeful->getHostAndPort().toString() << " is trying to elect itself but "
              << primary->getHostAndPort().toString() << " is already primary";
        vote = -10000;
    } else if (highestPriority && highestPriority->getPriority() > hopeful->getPriority()) {
        // TODO(spencer): What if the lower-priority member is more up-to-date?
        log() << hopeful->getHostAndPort().toString() << " has lower priority than "
              << highestPriority->getHostAndPort().toString();
        vote = -10000;
    } else if (_voteLease.when + VoteLease::leaseTime >= now && _voteLease.whoId != args.whoid) {
        log() << "replSet voting no for " << hopeful->getHostAndPort().toString() << "; voted for "
              << _voteLease.whoHostAndPort.toString() << ' '
              << durationCount<Seconds>(now - _voteLease.when) << " secs ago";
    } else {
        _voteLease.when = now;
        _voteLease.whoId = args.whoid;
        _voteLease.whoHostAndPort = hopeful->getHostAndPort();
        vote = _selfConfig().getNumVotes();
        invariant(hopeful->getId() == args.whoid);
        if (vote > 0) {
            log() << "replSetElect voting yea for " << hopeful->getHostAndPort().toString() << " ("
                  << args.whoid << ')';
        }
    }

    response->append("vote", vote);
    response->append("round", args.round);
    *result = Status::OK();
}

// produce a reply to a heartbeat
Status TopologyCoordinator::prepareHeartbeatResponse(Date_t now,
                                                     const ReplSetHeartbeatArgs& args,
                                                     const std::string& ourSetName,
                                                     ReplSetHeartbeatResponse* response) {
    if (args.getProtocolVersion() != 1) {
        return Status(ErrorCodes::BadValue,
                      str::stream() << "replset: incompatible replset protocol version: "
                                    << args.getProtocolVersion());
    }

    // Verify that replica set names match
    const std::string rshb = args.getSetName();
    if (ourSetName != rshb) {
        log() << "replSet set names do not match, ours: " << ourSetName
              << "; remote node's: " << rshb;
        response->noteMismatched();
        return Status(ErrorCodes::InconsistentReplicaSetNames,
                      str::stream() << "Our set name of " << ourSetName << " does not match name "
                                    << rshb << " reported by remote node");
    }

    const MemberState myState = getMemberState();
    if (_selfIndex == -1) {
        if (myState.removed()) {
            return Status(ErrorCodes::InvalidReplicaSetConfig,
                          "Our replica set configuration is invalid or does not include us");
        }
    } else {
        invariant(_rsConfig.getReplSetName() == args.getSetName());
        if (args.getSenderId() == _selfConfig().getId()) {
            return Status(ErrorCodes::BadValue,
                          str::stream() << "Received heartbeat from member with the same "
                                           "member ID as ourself: "
                                        << args.getSenderId());
        }
    }

    // This is a replica set
    response->noteReplSet();

    response->setSetName(ourSetName);
    response->setState(myState.s);
    if (myState.primary()) {
        response->setElectionTime(_electionTime);
    }

    const OpTime lastOpApplied = getMyLastAppliedOpTime();
    const OpTime lastOpDurable = getMyLastDurableOpTime();

    // Are we electable
    response->setElectable(!_getMyUnelectableReason(now, StartElectionReason::kElectionTimeout));

    // Heartbeat status message
    response->setHbMsg(_getHbmsg(now));
    response->setTime(duration_cast<Seconds>(now - Date_t{}));
    response->setAppliedOpTime(lastOpApplied);
    response->setDurableOpTime(lastOpDurable);

    if (!_syncSource.empty()) {
        response->setSyncingTo(_syncSource);
    }

    if (!_rsConfig.isInitialized()) {
        response->setConfigVersion(-2);
        return Status::OK();
    }

    const long long v = _rsConfig.getConfigVersion();
    response->setConfigVersion(v);
    // Deliver new config if caller's version is older than ours
    if (v > args.getConfigVersion()) {
        response->setConfig(_rsConfig);
    }

    // Resolve the caller's id in our Member list
    int from = -1;
    if (v == args.getConfigVersion() && args.getSenderId() != -1) {
        from = _getMemberIndex(args.getSenderId());
    }
    if (from == -1) {
        // Can't find the member, so we leave out the stateDisagreement field
        return Status::OK();
    }
    invariant(from != _selfIndex);

    // if we thought that this node is down, let it know
    if (!_memberData.at(from).up()) {
        response->noteStateDisagreement();
    }

    // note that we got a heartbeat from this node
    _memberData.at(from).setLastHeartbeatRecv(now);
    return Status::OK();
}

Status TopologyCoordinator::prepareHeartbeatResponseV1(Date_t now,
                                                       const ReplSetHeartbeatArgsV1& args,
                                                       const std::string& ourSetName,
                                                       ReplSetHeartbeatResponse* response) {
    // Verify that replica set names match
    const std::string rshb = args.getSetName();
    if (ourSetName != rshb) {
        log() << "replSet set names do not match, ours: " << ourSetName
              << "; remote node's: " << rshb;
        return Status(ErrorCodes::InconsistentReplicaSetNames,
                      str::stream() << "Our set name of " << ourSetName << " does not match name "
                                    << rshb << " reported by remote node");
    }

    const MemberState myState = getMemberState();
    if (_selfIndex == -1) {
        if (myState.removed()) {
            return Status(ErrorCodes::InvalidReplicaSetConfig,
                          "Our replica set configuration is invalid or does not include us");
        }
    } else {
        if (args.getSenderId() == _selfConfig().getId()) {
            return Status(ErrorCodes::BadValue,
                          str::stream() << "Received heartbeat from member with the same "
                                           "member ID as ourself: "
                                        << args.getSenderId());
        }
    }

    response->setSetName(ourSetName);

    response->setState(myState.s);

    if (myState.primary()) {
        response->setElectionTime(_electionTime);
    }

    const OpTime lastOpApplied = getMyLastAppliedOpTime();
    const OpTime lastOpDurable = getMyLastDurableOpTime();
    response->setAppliedOpTime(lastOpApplied);
    response->setDurableOpTime(lastOpDurable);

    if (_currentPrimaryIndex != -1) {
        response->setPrimaryId(_rsConfig.getMemberAt(_currentPrimaryIndex).getId());
    }

    response->setTerm(_term);

    if (!_syncSource.empty()) {
        response->setSyncingTo(_syncSource);
    }

    if (!_rsConfig.isInitialized()) {
        response->setConfigVersion(-2);
        return Status::OK();
    }

    const long long v = _rsConfig.getConfigVersion();
    response->setConfigVersion(v);
    // Deliver new config if caller's version is older than ours
    if (v > args.getConfigVersion()) {
        response->setConfig(_rsConfig);
    }

    // Resolve the caller's id in our Member list
    int from = -1;
    if (v == args.getConfigVersion() && args.getSenderId() != -1) {
        from = _getMemberIndex(args.getSenderId());
    }
    if (from == -1) {
        return Status::OK();
    }
    invariant(from != _selfIndex);

    // note that we got a heartbeat from this node
    _memberData.at(from).setLastHeartbeatRecv(now);
    return Status::OK();
}

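// Returns the index of the member with the given id in the current replica set config, or -1 if
// no such member exists.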
int TopologyCoordinator::_getMemberIndex(int id) const {
    int index = 0;
    for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd();
         ++it, ++index) {
        if (it->getId() == id) {
            return index;
        }
    }
    return -1;
}

std::pair<ReplSetHeartbeatArgs, Milliseconds> TopologyCoordinator::prepareHeartbeatRequest(
    Date_t now, const std::string& ourSetName, const HostAndPort& target) {
    PingStats& hbStats = _pings[target];
    Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate();
    if (!_rsConfig.isInitialized() || !hbStats.trying() ||
        (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) {
        // This is either the first request ever for "target", or the heartbeat timeout has
        // passed, so we're starting a "new" heartbeat.
        hbStats.start(now);
        alreadyElapsed = Milliseconds(0);
    }
    ReplSetHeartbeatArgs hbArgs;
    hbArgs.setProtocolVersion(1);
    hbArgs.setCheckEmpty(false);
    if (_rsConfig.isInitialized()) {
        hbArgs.setSetName(_rsConfig.getReplSetName());
        hbArgs.setConfigVersion(_rsConfig.getConfigVersion());
        if (_selfIndex >= 0) {
            const MemberConfig& me = _selfConfig();
            hbArgs.setSenderHost(me.getHostAndPort());
            hbArgs.setSenderId(me.getId());
        }
    } else {
        hbArgs.setSetName(ourSetName);
        hbArgs.setConfigVersion(-2);
    }
    if (serverGlobalParams.featureCompatibility.getVersion() !=
        ServerGlobalParams::FeatureCompatibility::Version::kFullyDowngradedTo34) {
        hbArgs.setHeartbeatVersion(1);
    }

    const Milliseconds timeoutPeriod(
        _rsConfig.isInitialized() ? _rsConfig.getHeartbeatTimeoutPeriodMillis()
                                  : Milliseconds{ReplSetConfig::kDefaultHeartbeatTimeoutPeriod});
    const Milliseconds timeout = timeoutPeriod - alreadyElapsed;
    return std::make_pair(hbArgs, timeout);
}

std::pair<ReplSetHeartbeatArgsV1, Milliseconds> TopologyCoordinator::prepareHeartbeatRequestV1(
    Date_t now, const std::string& ourSetName, const HostAndPort& target) {
    PingStats& hbStats = _pings[target];
    Milliseconds alreadyElapsed(now.asInt64() - hbStats.getLastHeartbeatStartDate().asInt64());
    if ((!_rsConfig.isInitialized()) || !hbStats.trying() ||
        (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) {
        // This is either the first request ever for "target", or the heartbeat timeout has
        // passed, so we're starting a "new" heartbeat.
        hbStats.start(now);
        alreadyElapsed = Milliseconds(0);
    }
    ReplSetHeartbeatArgsV1 hbArgs;
    if (_rsConfig.isInitialized()) {
        hbArgs.setSetName(_rsConfig.getReplSetName());
        hbArgs.setConfigVersion(_rsConfig.getConfigVersion());
        if (_selfIndex >= 0) {
            const MemberConfig& me = _selfConfig();
            hbArgs.setSenderId(me.getId());
            hbArgs.setSenderHost(me.getHostAndPort());
        }
        hbArgs.setTerm(_term);
    } else {
        hbArgs.setSetName(ourSetName);
        // Config version -2 is for uninitialized config.
        hbArgs.setConfigVersion(-2);
        hbArgs.setTerm(OpTime::kInitialTerm);
    }
    if (serverGlobalParams.featureCompatibility.getVersion() !=
        ServerGlobalParams::FeatureCompatibility::Version::kFullyDowngradedTo34) {
        hbArgs.setHeartbeatVersion(1);
    }

    const Milliseconds timeoutPeriod(
        _rsConfig.isInitialized() ? _rsConfig.getHeartbeatTimeoutPeriodMillis()
                                  : Milliseconds{ReplSetConfig::kDefaultHeartbeatTimeoutPeriod});
    const Milliseconds timeout(timeoutPeriod - alreadyElapsed);
    return std::make_pair(hbArgs, timeout);
}

HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
    Date_t now,
    Milliseconds networkRoundTripTime,
    const HostAndPort& target,
    const StatusWith<ReplSetHeartbeatResponse>& hbResponse) {
    const MemberState originalState = getMemberState();
    PingStats& hbStats = _pings[target];
    invariant(hbStats.getLastHeartbeatStartDate() != Date_t());
    const bool isUnauthorized = (hbResponse.getStatus().code() == ErrorCodes::Unauthorized) ||
        (hbResponse.getStatus().code() == ErrorCodes::AuthenticationFailed);
    if (!hbResponse.isOK()) {
        if (isUnauthorized) {
            hbStats.hit(networkRoundTripTime);
        } else {
            hbStats.miss();
        }
    } else {
        hbStats.hit(networkRoundTripTime);
        // Log diagnostics.
        if (hbResponse.getValue().isStateDisagreement()) {
            LOG(1) << target << " thinks that we are down because they cannot send us heartbeats.";
        }
    }

    // If a node is not PRIMARY and has no sync source, we increase the heartbeat rate in order
    // to help it find a sync source more quickly, which helps ensure the PRIMARY will continue to
    // see the majority of the cluster.
    //
    // Arbiters also decrease their heartbeat interval to at most half the election timeout period.
    Milliseconds heartbeatInterval = _rsConfig.getHeartbeatInterval();
    if (_rsConfig.getProtocolVersion() == 1) {
        if (getMemberState().arbiter()) {
            heartbeatInterval = std::min(_rsConfig.getElectionTimeoutPeriod() / 2,
                                         _rsConfig.getHeartbeatInterval());
        } else if (getSyncSourceAddress().empty() && !_iAmPrimary()) {
            heartbeatInterval = std::min(_rsConfig.getElectionTimeoutPeriod() / 2,
                                         _rsConfig.getHeartbeatInterval() / 4);
        }
    }

    const Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate();
    Date_t nextHeartbeatStartDate;
    // Determine the next heartbeat start time. If a heartbeat has not succeeded or failed, and we
    // have not used up the timeout period, we should retry.
    if (hbStats.trying() && (alreadyElapsed < _rsConfig.getHeartbeatTimeoutPeriod())) {
        // There are still retries left, let's use one.
        nextHeartbeatStartDate = now;
    } else {
        nextHeartbeatStartDate = now + heartbeatInterval;
    }

    if (hbResponse.isOK() && hbResponse.getValue().hasConfig()) {
        const long long currentConfigVersion =
            _rsConfig.isInitialized() ? _rsConfig.getConfigVersion() : -2;
        const ReplSetConfig& newConfig = hbResponse.getValue().getConfig();
        if (newConfig.getConfigVersion() > currentConfigVersion) {
            HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeReconfigAction();
            nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
            return nextAction;
        } else {
            // Could be we got the newer version before we got the response, or the
            // target erroneously sent us one, even though it isn't newer.
            if (newConfig.getConfigVersion() < currentConfigVersion) {
                LOG(1) << "Config version from heartbeat was older than ours.";
            } else {
                LOG(2) << "Config from heartbeat response was same as ours.";
            }
            if (logger::globalLogDomain()->shouldLog(MongoLogDefaultComponent_component,
                                                     ::mongo::LogstreamBuilder::severityCast(2))) {
                LogstreamBuilder lsb = log();
                if (_rsConfig.isInitialized()) {
                    lsb << "Current config: " << _rsConfig.toBSON() << "; ";
                }
                lsb << "Config in heartbeat: " << newConfig.toBSON();
            }
        }
    }

    // Check if the heartbeat target is in our config. If it isn't, there's nothing left to do,
    // so return early.
    if (!_rsConfig.isInitialized()) {
        HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
        nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
        return nextAction;
    }
    // If we're not in the config, we don't need to respond to heartbeats.
    if (_selfIndex == -1) {
        LOG(1) << "Could not find ourself in current config so ignoring heartbeat from " << target
               << " -- current config: " << _rsConfig.toBSON();
        HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
        nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
        return nextAction;
    }
    const int memberIndex = _rsConfig.findMemberIndexByHostAndPort(target);
    if (memberIndex == -1) {
        LOG(1) << "Could not find " << target << " in current config so ignoring --"
                  " current config: "
               << _rsConfig.toBSON();
        HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
        nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
        return nextAction;
    }

    invariant(memberIndex != _selfIndex);

    MemberData& hbData = _memberData.at(memberIndex);
    const MemberConfig member = _rsConfig.getMemberAt(memberIndex);
    bool advancedOpTime = false;
    if (!hbResponse.isOK()) {
        if (isUnauthorized) {
            hbData.setAuthIssue(now);
        }
        // If the heartbeat has failed, i.e. used up all retries, then we mark the target node as
        // down.
        else if (hbStats.failed() || (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriod())) {
            hbData.setDownValues(now, hbResponse.getStatus().reason());
        } else {
            LOG(3) << "Bad heartbeat response from " << target
                   << "; trying again; Retries left: " << (hbStats.retriesLeft()) << "; "
                   << alreadyElapsed << " have already elapsed";
        }
    } else {
        ReplSetHeartbeatResponse hbr = std::move(hbResponse.getValue());
        LOG(3) << "setUpValues: heartbeat response good for member _id:" << member.getId()
               << ", msg: " << hbr.getHbMsg();
        advancedOpTime = hbData.setUpValues(now, std::move(hbr));
    }

    HeartbeatResponseAction nextAction;
    if (_rsConfig.getProtocolVersion() == 0) {
        nextAction = _updatePrimaryFromHBData(memberIndex, originalState, now);
    } else {
        nextAction = _updatePrimaryFromHBDataV1(memberIndex, originalState, now);
    }

    nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
    nextAction.setAdvancedOpTime(advancedOpTime);
    return nextAction;
}

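// Returns true if at least 'numNodes' non-arbiter members (counting ourselves) have replicated
// at least to 'targetOpTime', using durable optimes when 'durablyWritten' is set.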
bool TopologyCoordinator::haveNumNodesReachedOpTime(const OpTime& targetOpTime,
                                                    int numNodes,
                                                    bool durablyWritten) {
    // Replication progress that is for some reason ahead of us should not allow us to
    // satisfy a write concern if we aren't caught up ourselves.
    OpTime myOpTime = durablyWritten ? getMyLastDurableOpTime() : getMyLastAppliedOpTime();
    if (myOpTime < targetOpTime) {
        return false;
    }

    for (auto&& memberData : _memberData) {
        // The index in the config is -1 for master-slave nodes.
        const auto configIndex = memberData.getConfigIndex();
        if (configIndex >= 0) {
            // We do not count arbiters towards the write concern.
            if (_rsConfig.getMemberAt(configIndex).isArbiter()) {
                continue;
            }
        }

        const OpTime& memberOpTime =
            durablyWritten ? memberData.getLastDurableOpTime() : memberData.getLastAppliedOpTime();

        if (memberOpTime >= targetOpTime) {
            --numNodes;
        }

        if (numNodes <= 0) {
            return true;
        }
    }
    return false;
}

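// Returns true once the members that have replicated at least to 'opTime' (durably, if
// 'durablyWritten' is set) collectively satisfy 'tagPattern'.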
bool TopologyCoordinator::haveTaggedNodesReachedOpTime(const OpTime& opTime,
                                                       const ReplSetTagPattern& tagPattern,
                                                       bool durablyWritten) {
    ReplSetTagMatch matcher(tagPattern);
    for (auto&& memberData : _memberData) {
        const OpTime& memberOpTime =
            durablyWritten ? memberData.getLastDurableOpTime() : memberData.getLastAppliedOpTime();
        if (memberOpTime >= opTime) {
            // This node has reached the desired optime, now we need to check if it is a part
            // of the tagPattern.
            int memberIndex = memberData.getConfigIndex();
            invariant(memberIndex >= 0);
            const MemberConfig& memberConfig = _rsConfig.getMemberAt(memberIndex);
            for (MemberConfig::TagIterator it = memberConfig.tagsBegin();
                 it != memberConfig.tagsEnd();
                 ++it) {
                if (matcher.update(*it)) {
                    return true;
                }
            }
        }
    }
    return false;
}

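// Marks members we have not heard from within the election timeout as stale; if we are primary
// and marking them down costs us our majority, returns a step-down action.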
HeartbeatResponseAction TopologyCoordinator::checkMemberTimeouts(Date_t now) {
    bool stepdown = false;
    for (int memberIndex = 0; memberIndex < static_cast<int>(_memberData.size()); memberIndex++) {
        auto& memberData = _memberData[memberIndex];
        if (!memberData.isSelf() && !memberData.lastUpdateStale() &&
            now - memberData.getLastUpdate() >= _rsConfig.getElectionTimeoutPeriod()) {
            memberData.markLastUpdateStale();
            if (_iAmPrimary()) {
                stepdown = stepdown || setMemberAsDown(now, memberIndex);
            }
        }
    }
    if (stepdown) {
        log() << "can't see a majority of the set, relinquishing primary";
        return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
    }
    return HeartbeatResponseAction::makeNoAction();
}

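// Returns the hosts whose last applied (or, when 'durablyWritten' is set, last durable) optime
// is at least 'op', optionally excluding ourselves.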
std::vector<HostAndPort> TopologyCoordinator::getHostsWrittenTo(const OpTime& op,
                                                                bool durablyWritten,
                                                                bool skipSelf) {
    std::vector<HostAndPort> hosts;
    for (const auto& memberData : _memberData) {
        if (skipSelf && memberData.isSelf()) {
            continue;
        }

        if (durablyWritten) {
            if (memberData.getLastDurableOpTime() < op) {
                continue;
            }
        } else if (memberData.getLastAppliedOpTime() < op) {
            continue;
        }

        hosts.push_back(memberData.getHostAndPort());
    }
    return hosts;
}

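// Marks the member at 'memberIndex' as down; returns true if, as a result, we can no longer see
// a majority of the set and should therefore step down as primary.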
bool TopologyCoordinator::setMemberAsDown(Date_t now, const int memberIndex) {
    invariant(memberIndex != _selfIndex);
    invariant(memberIndex != -1);
    invariant(_currentPrimaryIndex == _selfIndex);
    MemberData& hbData = _memberData.at(memberIndex);
    hbData.setDownValues(now, "no response within election timeout period");

    if (CannotSeeMajority & _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)) {
        return true;
    }

    return false;
}

std::pair<int, Date_t> TopologyCoordinator::getStalestLiveMember() const {
    Date_t earliestDate = Date_t::max();
    int earliestMemberId = -1;
    for (const auto& memberData : _memberData) {
        if (memberData.isSelf()) {
            continue;
        }
        if (memberData.lastUpdateStale()) {
            // Already stale.
            continue;
        }
        LOG(3) << "memberData lastupdate is: " << memberData.getLastUpdate();
        if (earliestDate > memberData.getLastUpdate()) {
            earliestDate = memberData.getLastUpdate();
            earliestMemberId = memberData.getMemberId();
        }
    }
    LOG(3) << "stalest member " << earliestMemberId << " date: " << earliestDate;
    return std::make_pair(earliestMemberId, earliestDate);
}

void TopologyCoordinator::resetAllMemberTimeouts(Date_t now) {
    for (auto&& memberData : _memberData)
        memberData.updateLiveness(now);
}

void TopologyCoordinator::resetMemberTimeouts(Date_t now,
                                              const stdx::unordered_set<HostAndPort>& member_set) {
    for (auto&& memberData : _memberData) {
        if (member_set.count(memberData.getHostAndPort()))
            memberData.updateLiveness(now);
    }
}

OpTime TopologyCoordinator::getMyLastAppliedOpTime() const {
    return _selfMemberData().getLastAppliedOpTime();
}

OpTime TopologyCoordinator::getMyLastDurableOpTime() const {
    return _selfMemberData().getLastDurableOpTime();
}

MemberData* TopologyCoordinator::getMyMemberData() {
    return &_memberData[_selfMemberDataIndex()];
}

MemberData* TopologyCoordinator::findMemberDataByMemberId(const int memberId) {
    const int memberIndex = _getMemberIndex(memberId);
    if (memberIndex >= 0)
        return &_memberData[memberIndex];
    return nullptr;
}

MemberData* TopologyCoordinator::findMemberDataByRid(const OID rid) {
    for (auto& memberData : _memberData) {
        if (memberData.getRid() == rid)
            return &memberData;
    }
    return nullptr;
}

MemberData* TopologyCoordinator::addSlaveMemberData(const OID rid) {
    invariant(!_memberData.empty());        // Must always have our own entry first.
    invariant(!_rsConfig.isInitialized());  // Used only for master-slave.
    _memberData.emplace_back();
    auto* result = &_memberData.back();
    result->setRid(rid);
    return result;
}

HeartbeatResponseAction TopologyCoordinator::_updatePrimaryFromHBDataV1(
    int updatedConfigIndex, const MemberState& originalState, Date_t now) {
    //
    // Updates the local notion of which remote node, if any, is primary.
1357 // Start the priority takeover process if we are eligible.
1358 //
1359
1360 invariant(updatedConfigIndex != _selfIndex);
1361
1362 // If we are missing from the config, do not participate in primary maintenance or election.
1363 if (_selfIndex == -1) {
1364 return HeartbeatResponseAction::makeNoAction();
1365 }
1366 // If we are the primary, there must be no other primary, otherwise its higher term would
1367 // have already made us step down.
1368 if (_currentPrimaryIndex == _selfIndex) {
1369 return HeartbeatResponseAction::makeNoAction();
1370 }
1371
1372 // Scan the member list's heartbeat data for who is primary, and update _currentPrimaryIndex.
1373 int primaryIndex = -1;
1374 for (size_t i = 0; i < _memberData.size(); i++) {
1375 const MemberData& member = _memberData.at(i);
1376 if (member.getState().primary() && member.up()) {
1377 if (primaryIndex == -1 || _memberData.at(primaryIndex).getTerm() < member.getTerm()) {
1378 primaryIndex = i;
1379 }
1380 }
1381 }
1382 _currentPrimaryIndex = primaryIndex;
1383 if (_currentPrimaryIndex == -1) {
1384 return HeartbeatResponseAction::makeNoAction();
1385 }
1386
1387 // Clear last heartbeat message on ourselves.
1388 setMyHeartbeatMessage(now, "");
1389
1390 // Takeover when the replset is stable.
1391 //
1392 // Take over the primary only if the remote primary is in the latest term I know.
1393 // This is done only when we get a heartbeat response from the primary.
1394 // Otherwise, there must be an outstanding election, which may succeed or not, but
1395 // the remote primary will become aware of that election eventually and step down.
1396 if (_memberData.at(primaryIndex).getTerm() == _term && updatedConfigIndex == primaryIndex) {
1397
1398 // Don't schedule catchup takeover if catchup takeover or primary catchup is disabled.
1399 bool catchupTakeoverDisabled =
1400 ReplSetConfig::kCatchUpDisabled == _rsConfig.getCatchUpTimeoutPeriod() ||
1401 ReplSetConfig::kCatchUpTakeoverDisabled == _rsConfig.getCatchUpTakeoverDelay();
1402
1403 bool scheduleCatchupTakeover = false;
1404 bool schedulePriorityTakeover = false;
1405
1406 if (!catchupTakeoverDisabled && (_memberData.at(primaryIndex).getLastAppliedOpTime() <
1407 _memberData.at(_selfIndex).getLastAppliedOpTime())) {
1408 LOG(2) << "I can take over the primary due to fresher data."
1409 << " Current primary index: " << primaryIndex << " in term "
1410 << _memberData.at(primaryIndex).getTerm() << "."
1411 << " Current primary optime: "
1412 << _memberData.at(primaryIndex).getLastAppliedOpTime()
1413 << " My optime: " << _memberData.at(_selfIndex).getLastAppliedOpTime();
1414
1415 scheduleCatchupTakeover = true;
1416 }
1417
1418 if (_rsConfig.getMemberAt(primaryIndex).getPriority() <
1419 _rsConfig.getMemberAt(_selfIndex).getPriority()) {
1420 LOG(2) << "I can take over the primary due to higher priority."
1421 << " Current primary index: " << primaryIndex << " in term "
1422 << _memberData.at(primaryIndex).getTerm();
1423
1424 schedulePriorityTakeover = true;
1425 }
1426
1427 // Calculate rank of current node. A rank of 0 indicates that it has the highest priority.
1428 auto currentNodePriority = _rsConfig.getMemberAt(_selfIndex).getPriority();
1429
1430 // Schedule a priority takeover early only if we know that the current node has the highest
1431 // priority in the replica set, has a higher priority than the primary, and is the most
1432 // up to date node.
1433 // Otherwise, prefer to schedule a catchup takeover over a priority takeover
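        // Illustrative walk-through (not from the original source): if this node is ahead of
        // the primary's last applied optime and also holds the highest priority in the set
        // (rank 0), both flags are set and a priority takeover starts immediately; if some
        // other node held the highest priority, we would fall through and prefer the catchup
        // takeover instead.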
1434 if (scheduleCatchupTakeover && schedulePriorityTakeover &&
1435 _rsConfig.calculatePriorityRank(currentNodePriority) == 0) {
1436 LOG(2) << "I can take over the primary because I have a higher priority, the highest "
1437 << "priority in the replica set, and fresher data."
1438 << " Current primary index: " << primaryIndex << " in term "
1439 << _memberData.at(primaryIndex).getTerm();
1440 return HeartbeatResponseAction::makePriorityTakeoverAction();
1441 }
1442 if (scheduleCatchupTakeover) {
1443 return HeartbeatResponseAction::makeCatchupTakeoverAction();
1444 }
1445 if (schedulePriorityTakeover) {
1446 return HeartbeatResponseAction::makePriorityTakeoverAction();
1447 }
1448 }
1449 return HeartbeatResponseAction::makeNoAction();
1450 }
1451
1452 HeartbeatResponseAction TopologyCoordinator::_updatePrimaryFromHBData(
1453 int updatedConfigIndex, const MemberState& originalState, Date_t now) {
1454 // This method has two interrelated responsibilities, performed in two phases.
1455 //
1456     // First, it updates the local notion of which remote node, if any, is primary. In the
1457 // process, it may request a remote primary to step down because there is a higher priority
1458 // node waiting, or because the local node thinks it is primary and that it has a more
1459 // recent electionTime. It may instead decide that the local node should step down itself,
1460 // because a remote has a more recent election time.
1461 //
1462 // Second, if there is no remote primary, and the local node is not primary, it considers
1463 // whether or not to stand for election.
1464 invariant(updatedConfigIndex != _selfIndex);
1465
1466 // We are missing from the config, so do not participate in primary maintenance or election.
1467 if (_selfIndex == -1) {
1468 return HeartbeatResponseAction::makeNoAction();
1469 }
1470
1471 ////////////////////
1472 // Phase 1
1473 ////////////////////
1474
1475 // If we believe the node whose data was just updated is primary, confirm that
1476 // the updated data supports that notion. If not, erase our notion of who is primary.
1477 if (updatedConfigIndex == _currentPrimaryIndex) {
1478 const MemberData& updatedHBData = _memberData.at(updatedConfigIndex);
1479 if (!updatedHBData.up() || !updatedHBData.getState().primary()) {
1480 _currentPrimaryIndex = -1;
1481 }
1482 }
1483
1484 // If the current primary is not highest priority and up to date (within 10s),
1485 // have them/me stepdown.
1486 if (_currentPrimaryIndex != -1) {
1487 // check if we should ask the primary (possibly ourselves) to step down
1488 const int highestPriorityIndex = _getHighestPriorityElectableIndex(now);
1489 if (highestPriorityIndex != -1) {
1490 const MemberConfig& currentPrimaryMember = _rsConfig.getMemberAt(_currentPrimaryIndex);
1491 const MemberConfig& highestPriorityMember = _rsConfig.getMemberAt(highestPriorityIndex);
1492 const OpTime highestPriorityMemberOptime = highestPriorityIndex == _selfIndex
1493 ? getMyLastAppliedOpTime()
1494 : _memberData.at(highestPriorityIndex).getHeartbeatAppliedOpTime();
1495
1496 if ((highestPriorityMember.getPriority() > currentPrimaryMember.getPriority()) &&
1497 _isOpTimeCloseEnoughToLatestToElect(highestPriorityMemberOptime)) {
1498 const OpTime latestOpTime = _latestKnownOpTime();
1499
1500 if (_iAmPrimary()) {
1501 if (_leaderMode == LeaderMode::kSteppingDown) {
1502 return HeartbeatResponseAction::makeNoAction();
1503 }
1504 log() << "Stepping down self (priority " << currentPrimaryMember.getPriority()
1505 << ") because " << highestPriorityMember.getHostAndPort()
1506 << " has higher priority " << highestPriorityMember.getPriority()
1507 << " and is only "
1508 << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs())
1509 << " seconds behind me";
1510 const Date_t until =
1511 now + VoteLease::leaseTime + _rsConfig.getHeartbeatInterval();
1512 if (_electionSleepUntil < until) {
1513 _electionSleepUntil = until;
1514 }
1515 return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
1516 } else if ((highestPriorityIndex == _selfIndex) && (_electionSleepUntil <= now)) {
1517 // If this node is the highest priority node, and it is not in
1518 // an inter-election sleep period, ask the current primary to step down.
1519 // This is an optimization, because the remote primary will almost certainly
1520 // notice this node's electability promptly, via its own heartbeat process.
1521 log() << "Requesting that " << currentPrimaryMember.getHostAndPort()
1522 << " (priority " << currentPrimaryMember.getPriority()
1523 << ") step down because I have higher priority "
1524 << highestPriorityMember.getPriority() << " and am only "
1525 << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs())
1526 << " seconds behind it";
1527 int primaryIndex = _currentPrimaryIndex;
1528 _currentPrimaryIndex = -1;
1529 return HeartbeatResponseAction::makeStepDownRemoteAction(primaryIndex);
1530 }
1531 }
1532 }
1533 }
1534
1535 // Scan the member list's heartbeat data for who is primary, and update
1536 // _currentPrimaryIndex and _role, or request a remote to step down, as necessary.
1537 {
1538 int remotePrimaryIndex = -1;
1539 for (std::vector<MemberData>::const_iterator it = _memberData.begin();
1540 it != _memberData.end();
1541 ++it) {
1542 const int itIndex = indexOfIterator(_memberData, it);
1543 if (itIndex == _selfIndex) {
1544 continue;
1545 }
1546
1547 if (it->getState().primary() && it->up()) {
1548 if (remotePrimaryIndex != -1) {
1549 // two other nodes think they are primary (asynchronously polled)
1550 // -- wait for things to settle down.
1551 warning() << "two remote primaries (transiently)";
1552 return HeartbeatResponseAction::makeNoAction();
1553 }
1554 remotePrimaryIndex = itIndex;
1555 }
1556 }
1557
1558 if (remotePrimaryIndex != -1) {
1559 // If it's the same as last time, don't do anything further.
1560 if (_currentPrimaryIndex == remotePrimaryIndex) {
1561 return HeartbeatResponseAction::makeNoAction();
1562 }
1563 // Clear last heartbeat message on ourselves (why?)
1564 setMyHeartbeatMessage(now, "");
1565
1566 // If we are also primary, this is a problem. Determine who should step down.
1567 if (_iAmPrimary()) {
1568 Timestamp remoteElectionTime = _memberData.at(remotePrimaryIndex).getElectionTime();
1569 log() << "another primary seen with election time " << remoteElectionTime
1570 << " my election time is " << _electionTime;
1571
1572 // Step down whomever has the older election time.
1573 if (remoteElectionTime > _electionTime) {
1574 if (_leaderMode == LeaderMode::kSteppingDown) {
1575 return HeartbeatResponseAction::makeNoAction();
1576 }
1577 log() << "stepping down; another primary was elected more recently";
1578 return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
1579 } else {
1580 log() << "another PRIMARY detected and it should step down"
1581 " since it was elected earlier than me";
1582 return HeartbeatResponseAction::makeStepDownRemoteAction(remotePrimaryIndex);
1583 }
1584 }
1585
1586 _currentPrimaryIndex = remotePrimaryIndex;
1587 return HeartbeatResponseAction::makeNoAction();
1588 }
1589 }
1590
1591 ////////////////////
1592 // Phase 2
1593 ////////////////////
1594
1595 // We do not believe any remote to be primary.
1596
1597 // If we are primary, check if we can still see majority of the set;
1598 // stepdown if we can't.
1599 if (_iAmPrimary()) {
1600 if (CannotSeeMajority &
1601 _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)) {
1602 if (_leaderMode == LeaderMode::kSteppingDown) {
1603 return HeartbeatResponseAction::makeNoAction();
1604 }
1605 log() << "can't see a majority of the set, relinquishing primary";
1606 return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
1607 }
1608
1609 LOG(2) << "Choosing to remain primary";
1610 return HeartbeatResponseAction::makeNoAction();
1611 }
1612
1613 fassert(18505, _currentPrimaryIndex == -1);
1614
1615 const MemberState currentState = getMemberState();
1616 if (originalState.recovering() && currentState.secondary()) {
1617 // We just transitioned from RECOVERING to SECONDARY, this can only happen if we
1618 // received a heartbeat with an auth error when previously all the heartbeats we'd
1619 // received had auth errors. In this case, don't return makeElectAction() because
1620 // that could cause the election to start before the ReplicationCoordinator has updated
1621 // its notion of the member state to SECONDARY. Instead return noAction so that the
1622         // ReplicationCoordinator knows to update its tracking of the member state off of the
1623 // TopologyCoordinator, and leave starting the election until the next heartbeat comes
1624 // back.
1625 return HeartbeatResponseAction::makeNoAction();
1626 }
1627
1628 // At this point, there is no primary anywhere. Check to see if we should become a candidate.
1629 const auto status = checkShouldStandForElection(now);
1630 if (!status.isOK()) {
1631 // NOTE: This log line is checked in unit test(s).
1632 LOG(2) << "TopologyCoordinator::_updatePrimaryFromHBData - " << status.reason();
1633 return HeartbeatResponseAction::makeNoAction();
1634 }
1635 fassertStatusOK(28816, becomeCandidateIfElectable(now, StartElectionReason::kElectionTimeout));
1636 return HeartbeatResponseAction::makeElectAction();
1637 }
1638
1639 Status TopologyCoordinator::checkShouldStandForElection(Date_t now) const {
1640 if (_currentPrimaryIndex != -1) {
1641 return {ErrorCodes::NodeNotElectable, "Not standing for election since there is a Primary"};
1642 }
1643 invariant(_role != Role::kLeader);
1644
1645 if (_role == Role::kCandidate) {
1646 return {ErrorCodes::NodeNotElectable, "Not standing for election again; already candidate"};
1647 }
1648
1649 const UnelectableReasonMask unelectableReason =
1650 _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout);
1651 if (NotCloseEnoughToLatestOptime & unelectableReason) {
1652 return {ErrorCodes::NodeNotElectable,
1653 str::stream() << "Not standing for election because "
1654 << _getUnelectableReasonString(unelectableReason)
1655 << "; my last optime is "
1656 << getMyLastAppliedOpTime().toString()
1657 << " and the newest is "
1658 << _latestKnownOpTime().toString()};
1659 }
1660 if (unelectableReason) {
1661 return {ErrorCodes::NodeNotElectable,
1662 str::stream() << "Not standing for election because "
1663 << _getUnelectableReasonString(unelectableReason)};
1664 }
1665 if (_electionSleepUntil > now) {
1666 if (_rsConfig.getProtocolVersion() == 1) {
1667 return {
1668 ErrorCodes::NodeNotElectable,
1669 str::stream() << "Not standing for election before "
1670 << dateToISOStringLocal(_electionSleepUntil)
1671 << " because I stood up or learned about a new term too recently"};
1672 } else {
1673 return {ErrorCodes::NodeNotElectable,
1674 str::stream() << "Not standing for election before "
1675 << dateToISOStringLocal(_electionSleepUntil)
1676 << " because I stood too recently"};
1677 }
1678 }
1679 // All checks passed. Start election proceedings.
1680 return Status::OK();
1681 }
1682
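// Counts votes, not nodes: ourselves plus every member we believe to be up contributes its
// configured number of votes. Illustrative example (not from the original source): in a
// five-member set where every member has one vote and we can see two other members, vUp == 3
// and 3 * 2 > 5, so a majority seems to be up.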
1683 bool TopologyCoordinator::_aMajoritySeemsToBeUp() const {
1684 int vUp = 0;
1685 for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
1686 ++it) {
1687 const int itIndex = indexOfIterator(_memberData, it);
1688 if (itIndex == _selfIndex || it->up()) {
1689 vUp += _rsConfig.getMemberAt(itIndex).getNumVotes();
1690 }
1691 }
1692
1693 return vUp * 2 > _rsConfig.getTotalVotingMembers();
1694 }
1695
1696 int TopologyCoordinator::_findHealthyPrimaryOfEqualOrGreaterPriority(
1697 const int candidateIndex) const {
1698 const double candidatePriority = _rsConfig.getMemberAt(candidateIndex).getPriority();
1699 for (auto it = _memberData.begin(); it != _memberData.end(); ++it) {
1700 if (!it->up() || it->getState() != MemberState::RS_PRIMARY) {
1701 continue;
1702 }
1703 const int itIndex = indexOfIterator(_memberData, it);
1704 const double priority = _rsConfig.getMemberAt(itIndex).getPriority();
1705 if (itIndex != candidateIndex && priority >= candidatePriority) {
1706 return itIndex;
1707 }
1708 }
1709
1710 return -1;
1711 }
1712
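// "Close enough" means within 10 seconds of the latest known optime. Illustrative example
// (not from the original source): if the latest known optime is at t = 100s, a member whose
// last optime is at t >= 90s passes this check.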
1713 bool TopologyCoordinator::_isOpTimeCloseEnoughToLatestToElect(const OpTime& otherOpTime) const {
1714 const OpTime latestKnownOpTime = _latestKnownOpTime();
1715 // Use addition instead of subtraction to avoid overflow.
1716 return otherOpTime.getSecs() + 10 >= (latestKnownOpTime.getSecs());
1717 }
1718
1719 bool TopologyCoordinator::_amIFreshEnoughForPriorityTakeover() const {
1720 const OpTime latestKnownOpTime = _latestKnownOpTime();
1721
1722 // Rules are:
1723 // - If the terms don't match, we don't call for priority takeover.
1724     // - If our optime and the latest optime happen in different seconds, our optime must be
1725     // within priorityTakeoverFreshnessWindowSeconds seconds of the latest optime.
1726     // - If our optime and the latest optime happen in the same second, our optime must be
1727     // within 1000 oplog entries of the latest optime (i.e. the increment portion of the timestamp
1728 // must be within 1000). This is to handle the case where a primary had its clock set far into
1729 // the future, took some writes, then had its clock set back. In that case the timestamp
1730 // component of all future oplog entries generated will be the same, until real world time
1731 // passes the timestamp component of the last oplog entry.
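    // Illustrative example (not from the original source): with
    // priorityTakeoverFreshnessWindowSeconds == 2, a node whose last applied optime is at
    // t = 99s may call for a priority takeover when the latest known optime is at t = 100s
    // (99 + 2 >= 100), but not when it is at t = 102s.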
1732
1733 const OpTime ourLastOpApplied = getMyLastAppliedOpTime();
1734 if (ourLastOpApplied.getTerm() != latestKnownOpTime.getTerm()) {
1735 return false;
1736 }
1737
1738 if (ourLastOpApplied.getTimestamp().getSecs() != latestKnownOpTime.getTimestamp().getSecs()) {
1739 return ourLastOpApplied.getTimestamp().getSecs() + priorityTakeoverFreshnessWindowSeconds >=
1740 latestKnownOpTime.getTimestamp().getSecs();
1741 } else {
1742 return ourLastOpApplied.getTimestamp().getInc() + 1000 >=
1743 latestKnownOpTime.getTimestamp().getInc();
1744 }
1745 }
1746
1747 bool TopologyCoordinator::_amIFreshEnoughForCatchupTakeover() const {
1748
1749 const OpTime latestKnownOpTime = _latestKnownOpTime();
1750
1751 // Rules are:
1752 // - We must have the freshest optime of all the up nodes.
1753 // - We must specifically have a fresher optime than the primary (can't be equal).
1754 // - The term of our last applied op must be less than the current term. This ensures that no
1755 // writes have happened since the most recent election and that the primary is still in
1756 // catchup mode.
1757
1758 // There is no point to a catchup takeover if we aren't the freshest node because
1759 // another node would immediately perform another catchup takeover when we become primary.
1760 const OpTime ourLastOpApplied = getMyLastAppliedOpTime();
1761 if (ourLastOpApplied < latestKnownOpTime) {
1762 return false;
1763 }
1764
1765 if (_currentPrimaryIndex == -1) {
1766 return false;
1767 }
1768
1769 // If we aren't ahead of the primary, there is no point to having a catchup takeover.
1770 const OpTime primaryLastOpApplied = _memberData[_currentPrimaryIndex].getLastAppliedOpTime();
1771
1772 if (ourLastOpApplied <= primaryLastOpApplied) {
1773 return false;
1774 }
1775
1776 // If the term of our last applied op is less than the current term, the primary didn't write
1777 // anything and it is still in catchup mode.
1778 return ourLastOpApplied.getTerm() < _term;
1779 }
1780
1781 bool TopologyCoordinator::_iAmPrimary() const {
1782 if (_role == Role::kLeader) {
1783 invariant(_currentPrimaryIndex == _selfIndex);
1784 invariant(_leaderMode != LeaderMode::kNotLeader);
1785 return true;
1786 }
1787 return false;
1788 }
1789
1790 OpTime TopologyCoordinator::_latestKnownOpTime() const {
1791 OpTime latest = getMyLastAppliedOpTime();
1792 for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
1793 ++it) {
1794 // Ignore self
1795 // TODO(russotto): Simplify when heartbeat and spanning tree times are combined.
1796 if (it->isSelf()) {
1797 continue;
1798 }
1799 // Ignore down members
1800 if (!it->up()) {
1801 continue;
1802 }
1803 // Ignore removed nodes (not in config, so not valid).
1804 if (it->getState().removed()) {
1805 continue;
1806 }
1807
1808 OpTime optime = it->getHeartbeatAppliedOpTime();
1809
1810 if (optime > latest) {
1811 latest = optime;
1812 }
1813 }
1814
1815 return latest;
1816 }
1817
1818 bool TopologyCoordinator::_isMemberHigherPriority(int memberOneIndex, int memberTwoIndex) const {
1819 if (memberOneIndex == -1)
1820 return false;
1821
1822 if (memberTwoIndex == -1)
1823 return true;
1824
1825 return _rsConfig.getMemberAt(memberOneIndex).getPriority() >
1826 _rsConfig.getMemberAt(memberTwoIndex).getPriority();
1827 }
1828
1829 int TopologyCoordinator::_getHighestPriorityElectableIndex(Date_t now) const {
1830 int maxIndex = -1;
1831 for (int currentIndex = 0; currentIndex < _rsConfig.getNumMembers(); currentIndex++) {
1832 UnelectableReasonMask reason = currentIndex == _selfIndex
1833 ? _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)
1834 : _getUnelectableReason(currentIndex);
1835 if (None == reason && _isMemberHigherPriority(currentIndex, maxIndex)) {
1836 maxIndex = currentIndex;
1837 }
1838 }
1839
1840 return maxIndex;
1841 }
1842
1843 bool TopologyCoordinator::prepareForUnconditionalStepDown() {
1844 if (_leaderMode == LeaderMode::kSteppingDown) {
1845 // Can only be processing one required stepdown at a time.
1846 return false;
1847 }
1848 // Heartbeat-initiated stepdowns take precedence over stepdown command initiated stepdowns, so
1849 // it's safe to transition from kAttemptingStepDown to kSteppingDown.
1850 _setLeaderMode(LeaderMode::kSteppingDown);
1851 return true;
1852 }
1853
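// Minimal usage sketch (illustrative, not from the original source; 'topoCoord' is a
// hypothetical instance): the returned callback restores the previous leader mode, but only
// if nothing else has changed the mode since.
//
//     auto swAbortFn = topoCoord.prepareForStepDownAttempt();
//     if (swAbortFn.isOK()) {
//         // ... the stepdown attempt fails or is abandoned ...
//         swAbortFn.getValue()();  // revert to kMaster or kLeaderElect
//     }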
1854 StatusWith<TopologyCoordinator::StepDownAttemptAbortFn>
1855 TopologyCoordinator::prepareForStepDownAttempt() {
1856 if (_leaderMode == LeaderMode::kSteppingDown ||
1857 _leaderMode == LeaderMode::kAttemptingStepDown) {
1858 return Status{ErrorCodes::ConflictingOperationInProgress,
1859 "This node is already in the process of stepping down"};
1860 }
1861
1862 if (_leaderMode == LeaderMode::kNotLeader) {
1863 return Status{ErrorCodes::NotMaster, "This node is not a primary."};
1864 }
1865
1866 invariant(_leaderMode == LeaderMode::kMaster || _leaderMode == LeaderMode::kLeaderElect);
1867 const auto previousLeaderMode = _leaderMode;
1868 _setLeaderMode(LeaderMode::kAttemptingStepDown);
1869
1870 return {[this, previousLeaderMode] {
1871 if (_leaderMode == TopologyCoordinator::LeaderMode::kAttemptingStepDown) {
1872 _setLeaderMode(previousLeaderMode);
1873 }
1874 }};
1875 }
1876
1877 void TopologyCoordinator::changeMemberState_forTest(const MemberState& newMemberState,
1878 const Timestamp& electionTime) {
1879 invariant(_selfIndex != -1);
1880 if (newMemberState == getMemberState())
1881 return;
1882 switch (newMemberState.s) {
1883 case MemberState::RS_PRIMARY:
1884 _role = Role::kCandidate;
1885 processWinElection(OID(), electionTime);
1886 invariant(_role == Role::kLeader);
1887 break;
1888 case MemberState::RS_SECONDARY:
1889 case MemberState::RS_ROLLBACK:
1890 case MemberState::RS_RECOVERING:
1891 case MemberState::RS_STARTUP2:
1892 _role = Role::kFollower;
1893 _followerMode = newMemberState.s;
1894 if (_currentPrimaryIndex == _selfIndex) {
1895 _currentPrimaryIndex = -1;
1896 _setLeaderMode(LeaderMode::kNotLeader);
1897 }
1898 break;
1899 case MemberState::RS_STARTUP:
1900 updateConfig(ReplSetConfig(), -1, Date_t());
1901 break;
1902 default:
1903 severe() << "Cannot switch to state " << newMemberState;
1904 invariant(false);
1905 }
1906 if (getMemberState() != newMemberState.s) {
1907 severe() << "Expected to enter state " << newMemberState << " but am now in "
1908 << getMemberState();
1909 invariant(false);
1910 }
1911 log() << newMemberState;
1912 }
1913
1914 void TopologyCoordinator::_setCurrentPrimaryForTest(int primaryIndex) {
1915 if (primaryIndex == _selfIndex) {
1916 changeMemberState_forTest(MemberState::RS_PRIMARY);
1917 } else {
1918 if (_iAmPrimary()) {
1919 changeMemberState_forTest(MemberState::RS_SECONDARY);
1920 }
1921 if (primaryIndex != -1) {
1922 ReplSetHeartbeatResponse hbResponse;
1923 hbResponse.setState(MemberState::RS_PRIMARY);
1924 hbResponse.setElectionTime(Timestamp());
1925 hbResponse.setAppliedOpTime(_memberData.at(primaryIndex).getHeartbeatAppliedOpTime());
1926 hbResponse.setSyncingTo(HostAndPort());
1927 hbResponse.setHbMsg("");
1928 _memberData.at(primaryIndex)
1929 .setUpValues(_memberData.at(primaryIndex).getLastHeartbeat(),
1930 std::move(hbResponse));
1931 }
1932 _currentPrimaryIndex = primaryIndex;
1933 }
1934 }
1935
1936 const MemberConfig* TopologyCoordinator::_currentPrimaryMember() const {
1937 if (_currentPrimaryIndex == -1)
1938 return NULL;
1939
1940 return &(_rsConfig.getMemberAt(_currentPrimaryIndex));
1941 }
1942
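// Builds the replSetGetStatus response. Abridged illustrative shape (field values here are
// hypothetical): { set: "rs0", date: ..., myState: 1, term: 5, members: [ { _id: 0,
// name: "host:port", state: 1, stateStr: "PRIMARY", optime: {...}, ... }, ... ],
// optimes: {...} }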
1943 void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
1944 BSONObjBuilder* response,
1945 Status* result) {
1946 // output for each member
1947 vector<BSONObj> membersOut;
1948 const MemberState myState = getMemberState();
1949 const Date_t now = rsStatusArgs.now;
1950 const OpTime lastOpApplied = getMyLastAppliedOpTime();
1951 const OpTime lastOpDurable = getMyLastDurableOpTime();
1952 const BSONObj& initialSyncStatus = rsStatusArgs.initialSyncStatus;
1953
1954 if (_selfIndex == -1) {
1955 // We're REMOVED or have an invalid config
1956 response->append("state", static_cast<int>(myState.s));
1957 response->append("stateStr", myState.toString());
1958 response->append("uptime", rsStatusArgs.selfUptime);
1959
1960 appendOpTime(response, "optime", lastOpApplied, _rsConfig.getProtocolVersion());
1961
1962 response->appendDate("optimeDate",
1963 Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs())));
1964 if (_maintenanceModeCalls) {
1965 response->append("maintenanceMode", _maintenanceModeCalls);
1966 }
1967 response->append("lastHeartbeatMessage", "");
1968 response->append("syncingTo", "");
1969 response->append("syncSourceHost", "");
1970 response->append("syncSourceId", -1);
1971
1972 response->append("infoMessage", _getHbmsg(now));
1973 *result = Status(ErrorCodes::InvalidReplicaSetConfig,
1974 "Our replica set config is invalid or we are not a member of it");
1975 return;
1976 }
1977
1978 for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
1979 ++it) {
1980 const int itIndex = indexOfIterator(_memberData, it);
1981 if (itIndex == _selfIndex) {
1982 // add self
1983 BSONObjBuilder bb;
1984 bb.append("_id", _selfConfig().getId());
1985 bb.append("name", _selfConfig().getHostAndPort().toString());
1986 bb.append("health", 1.0);
1987 bb.append("state", static_cast<int>(myState.s));
1988 bb.append("stateStr", myState.toString());
1989 bb.append("uptime", rsStatusArgs.selfUptime);
1990 if (!_selfConfig().isArbiter()) {
1991 appendOpTime(&bb, "optime", lastOpApplied, _rsConfig.getProtocolVersion());
1992 bb.appendDate("optimeDate",
1993 Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs())));
1994 }
1995
1996 if (!_syncSource.empty() && !_iAmPrimary()) {
1997 bb.append("syncingTo", _syncSource.toString());
1998 bb.append("syncSourceHost", _syncSource.toString());
1999 const MemberConfig* member = _rsConfig.findMemberByHostAndPort(_syncSource);
2000 bb.append("syncSourceId", member ? member->getId() : -1);
2001 } else {
2002 bb.append("syncingTo", "");
2003 bb.append("syncSourceHost", "");
2004 bb.append("syncSourceId", -1);
2005 }
2006
2007 if (_maintenanceModeCalls) {
2008 bb.append("maintenanceMode", _maintenanceModeCalls);
2009 }
2010
2011 bb.append("infoMessage", _getHbmsg(now));
2012
2013 if (myState.primary()) {
2014 bb.append("electionTime", _electionTime);
2015 bb.appendDate("electionDate",
2016 Date_t::fromDurationSinceEpoch(Seconds(_electionTime.getSecs())));
2017 }
2018 bb.appendIntOrLL("configVersion", _rsConfig.getConfigVersion());
2019 bb.append("self", true);
2020 bb.append("lastHeartbeatMessage", "");
2021 membersOut.push_back(bb.obj());
2022 } else {
2023 // add non-self member
2024 const MemberConfig& itConfig = _rsConfig.getMemberAt(itIndex);
2025 BSONObjBuilder bb;
2026 bb.append("_id", itConfig.getId());
2027 bb.append("name", itConfig.getHostAndPort().toString());
2028 double h = it->getHealth();
2029 bb.append("health", h);
2030 const MemberState state = it->getState();
2031 bb.append("state", static_cast<int>(state.s));
2032 if (h == 0) {
2033 // if we can't connect the state info is from the past
2034 // and could be confusing to show
2035 bb.append("stateStr", "(not reachable/healthy)");
2036 } else {
2037 bb.append("stateStr", it->getState().toString());
2038 }
2039
2040 const unsigned int uptime = static_cast<unsigned int>((
2041 it->getUpSince() != Date_t() ? durationCount<Seconds>(now - it->getUpSince()) : 0));
2042 bb.append("uptime", uptime);
2043 if (!itConfig.isArbiter()) {
2044 appendOpTime(
2045 &bb, "optime", it->getHeartbeatAppliedOpTime(), _rsConfig.getProtocolVersion());
2046 appendOpTime(&bb,
2047 "optimeDurable",
2048 it->getHeartbeatDurableOpTime(),
2049 _rsConfig.getProtocolVersion());
2050
2051 bb.appendDate("optimeDate",
2052 Date_t::fromDurationSinceEpoch(
2053 Seconds(it->getHeartbeatAppliedOpTime().getSecs())));
2054 bb.appendDate("optimeDurableDate",
2055 Date_t::fromDurationSinceEpoch(
2056 Seconds(it->getHeartbeatDurableOpTime().getSecs())));
2057 }
2058 bb.appendDate("lastHeartbeat", it->getLastHeartbeat());
2059 bb.appendDate("lastHeartbeatRecv", it->getLastHeartbeatRecv());
2060 Milliseconds ping = _getPing(itConfig.getHostAndPort());
2061 bb.append("pingMs", durationCount<Milliseconds>(ping));
2062 bb.append("lastHeartbeatMessage", it->getLastHeartbeatMsg());
2063 if (it->hasAuthIssue()) {
2064 bb.append("authenticated", false);
2065 }
2066 const HostAndPort& syncSource = it->getSyncSource();
2067 if (!syncSource.empty() && !state.primary()) {
2068 bb.append("syncingTo", syncSource.toString());
2069 bb.append("syncSourceHost", syncSource.toString());
2070 const MemberConfig* member = _rsConfig.findMemberByHostAndPort(syncSource);
2071 bb.append("syncSourceId", member ? member->getId() : -1);
2072 } else {
2073 bb.append("syncingTo", "");
2074 bb.append("syncSourceHost", "");
2075 bb.append("syncSourceId", -1);
2076 }
2077
2078 bb.append("infoMessage", "");
2079
2080 if (state == MemberState::RS_PRIMARY) {
2081 bb.append("electionTime", it->getElectionTime());
2082 bb.appendDate(
2083 "electionDate",
2084 Date_t::fromDurationSinceEpoch(Seconds(it->getElectionTime().getSecs())));
2085 }
2086 bb.appendIntOrLL("configVersion", it->getConfigVersion());
2087 membersOut.push_back(bb.obj());
2088 }
2089 }
2090
2091 // sort members bson
2092 sort(membersOut.begin(), membersOut.end(), SimpleBSONObjComparator::kInstance.makeLessThan());
2093
2094 response->append("set", _rsConfig.isInitialized() ? _rsConfig.getReplSetName() : "");
2095 response->append("date", now);
2096 response->append("myState", myState.s);
2097 response->append("term", _term);
2098
2099 // Add sync source info
2100 if (!_syncSource.empty() && !myState.primary() && !myState.removed()) {
2101 response->append("syncingTo", _syncSource.toString());
2102 response->append("syncSourceHost", _syncSource.toString());
2103 const MemberConfig* member = _rsConfig.findMemberByHostAndPort(_syncSource);
2104 response->append("syncSourceId", member ? member->getId() : -1);
2105 } else {
2106 response->append("syncingTo", "");
2107 response->append("syncSourceHost", "");
2108 response->append("syncSourceId", -1);
2109 }
2110
2111 if (_rsConfig.isConfigServer()) {
2112 response->append("configsvr", true);
2113 }
2114
2115 response->append("heartbeatIntervalMillis",
2116 durationCount<Milliseconds>(_rsConfig.getHeartbeatInterval()));
2117
2118 // New optimes, to hold them all.
2119 BSONObjBuilder optimes;
2120 _lastCommittedOpTime.append(&optimes, "lastCommittedOpTime");
2121 if (!rsStatusArgs.readConcernMajorityOpTime.isNull()) {
2122 rsStatusArgs.readConcernMajorityOpTime.append(&optimes, "readConcernMajorityOpTime");
2123 }
2124
2125 appendOpTime(&optimes, "appliedOpTime", lastOpApplied, _rsConfig.getProtocolVersion());
2126 appendOpTime(&optimes, "durableOpTime", lastOpDurable, _rsConfig.getProtocolVersion());
2127 response->append("optimes", optimes.obj());
2128
2129 if (!initialSyncStatus.isEmpty()) {
2130 response->append("initialSyncStatus", initialSyncStatus);
2131 }
2132
2133 response->append("members", membersOut);
2134 *result = Status::OK();
2135 }
2136
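// Abridged illustrative shape of the command built below, using the UpdatePositionArgs
// field-name constants (concrete values are hypothetical):
// { <kCommandFieldName>: 1,
//   optimes: [ { <kDurableOpTimeFieldName>: {...}, <kAppliedOpTimeFieldName>: {...},
//               <kMemberIdFieldName>: 0, <kConfigVersionFieldName>: 1 }, ... ] }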
2137 StatusWith<BSONObj> TopologyCoordinator::prepareReplSetUpdatePositionCommand(
2138 OpTime currentCommittedSnapshotOpTime) const {
2139 BSONObjBuilder cmdBuilder;
2140 invariant(_rsConfig.isInitialized());
2141 // Do not send updates if we have been removed from the config.
2142 if (_selfIndex == -1) {
2143 return Status(ErrorCodes::NodeNotFound,
2144 "This node is not in the current replset configuration.");
2145 }
2146 cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1);
2147     // Create an array containing objects for each live member connected to us and for ourself.
2148 BSONArrayBuilder arrayBuilder(cmdBuilder.subarrayStart("optimes"));
2149 for (const auto& memberData : _memberData) {
2150 if (memberData.getLastAppliedOpTime().isNull()) {
2151 // Don't include info on members we haven't heard from yet.
2152 continue;
2153 }
2154 // Don't include members we think are down.
2155 if (!memberData.isSelf() && memberData.lastUpdateStale()) {
2156 continue;
2157 }
2158
2159 BSONObjBuilder entry(arrayBuilder.subobjStart());
2160 memberData.getLastDurableOpTime().append(&entry,
2161 UpdatePositionArgs::kDurableOpTimeFieldName);
2162 memberData.getLastAppliedOpTime().append(&entry,
2163 UpdatePositionArgs::kAppliedOpTimeFieldName);
2164 entry.append(UpdatePositionArgs::kMemberIdFieldName, memberData.getMemberId());
2165 entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion());
2166 }
2167 arrayBuilder.done();
2168
2169 // Add metadata to command
2170 prepareReplSetMetadata(currentCommittedSnapshotOpTime)
2171 .writeToMetadata(&cmdBuilder)
2172 .transitional_ignore();
2173 return cmdBuilder.obj();
2174 }
2175
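// Each replicationProgress entry built below has one of these shapes (values hypothetical):
//   PV1: { rid: ..., optime: { ts: <Timestamp>, term: <long> }, host: "host:port", memberId: 0 }
//   PV0: { rid: ..., optime: <Timestamp>, host: "host:port", memberId: 0 }
// memberId is appended only when this node is in the current config.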
2176 void TopologyCoordinator::fillMemberData(BSONObjBuilder* result) {
2177 BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress"));
2178 {
2179 for (const auto& memberData : _memberData) {
2180 BSONObjBuilder entry(replicationProgress.subobjStart());
2181 entry.append("rid", memberData.getRid());
2182 const auto lastDurableOpTime = memberData.getLastDurableOpTime();
2183 if (_rsConfig.getProtocolVersion() == 1) {
2184 BSONObjBuilder opTime(entry.subobjStart("optime"));
2185 opTime.append("ts", lastDurableOpTime.getTimestamp());
2186 opTime.append("term", lastDurableOpTime.getTerm());
2187 opTime.done();
2188 } else {
2189 entry.append("optime", lastDurableOpTime.getTimestamp());
2190 }
2191 entry.append("host", memberData.getHostAndPort().toString());
2192 if (_selfIndex >= 0) {
2193 const int memberId = memberData.getMemberId();
2194 invariant(memberId >= 0);
2195 entry.append("memberId", memberId);
2196 }
2197 }
2198 }
2199 }
2200
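// Illustrative example (not from the original source): for a config with members
// { A: priority 1, B: priority 0, C: arbiter, D: hidden }, the response lists
// hosts: [A], passives: [B], arbiters: [C]; D is omitted, as is any slave-delayed member.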
2201 void TopologyCoordinator::fillIsMasterForReplSet(IsMasterResponse* const response,
2202 const SplitHorizon::Parameters& horizonParams) {
2203 const MemberState myState = getMemberState();
2204 if (!_rsConfig.isInitialized()) {
2205 response->markAsNoConfig();
2206 return;
2207 }
2208
2209 response->setReplSetName(_rsConfig.getReplSetName());
2210 if (myState.removed()) {
2211 response->markAsNoConfig();
2212 return;
2213 }
2214
2215 invariant(!_rsConfig.members().empty());
2216
2217 const auto& self = _rsConfig.members()[_selfIndex];
2218
2219 const auto horizon = self.determineHorizon(horizonParams).toString();
2220
2221 for (const auto& member : _rsConfig.members()) {
2222 if (member.isHidden() || member.getSlaveDelay() > Seconds{0}) {
2223 continue;
2224 }
2225 auto hostView = member.getHostAndPort(horizon);
2226
2227 if (member.isElectable()) {
2228 response->addHost(std::move(hostView));
2229 } else if (member.isArbiter()) {
2230 response->addArbiter(std::move(hostView));
2231 } else {
2232 response->addPassive(std::move(hostView));
2233 }
2234 }
2235
2236 response->setReplSetVersion(_rsConfig.getConfigVersion());
2237 // "ismaster" is false if we are not primary. If we're stepping down, we're waiting for the
2238 // Replication State Transition Lock before we can change to secondary, but we should report
2239 // "ismaster" false to indicate that we can't accept new writes.
2240 response->setIsMaster(myState.primary() && _leaderMode != LeaderMode::kSteppingDown);
2241 response->setIsSecondary(myState.secondary());
2242
2243 const MemberConfig* curPrimary = _currentPrimaryMember();
2244 if (curPrimary) {
2245 response->setPrimary(curPrimary->getHostAndPort(horizon));
2246 }
2247
2248 const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex);
2249 if (selfConfig.isArbiter()) {
2250 response->setIsArbiterOnly(true);
2251 } else if (selfConfig.getPriority() == 0) {
2252 response->setIsPassive(true);
2253 }
2254 if (selfConfig.getSlaveDelay() > Seconds(0)) {
2255 response->setSlaveDelay(selfConfig.getSlaveDelay());
2256 }
2257 if (selfConfig.isHidden()) {
2258 response->setIsHidden(true);
2259 }
2260 if (!selfConfig.shouldBuildIndexes()) {
2261 response->setShouldBuildIndexes(false);
2262 }
2263 const ReplSetTagConfig tagConfig = _rsConfig.getTagConfig();
2264 if (selfConfig.hasTags(tagConfig)) {
2265 for (MemberConfig::TagIterator tag = selfConfig.tagsBegin(); tag != selfConfig.tagsEnd();
2266 ++tag) {
2267 std::string tagKey = tagConfig.getTagKey(*tag);
2268 if (tagKey[0] == '$') {
2269 // Filter out internal tags
2270 continue;
2271 }
2272 response->addTag(tagKey, tagConfig.getTagValue(*tag));
2273 }
2274 }
2275 response->setMe(selfConfig.getHostAndPort(horizon));
2276 if (_iAmPrimary()) {
2277 response->setElectionId(_electionId);
2278 }
2279 }
2280
2281 StatusWith<TopologyCoordinator::PrepareFreezeResponseResult>
2282 TopologyCoordinator::prepareFreezeResponse(Date_t now, int secs, BSONObjBuilder* response) {
2283 if (_role != TopologyCoordinator::Role::kFollower) {
2284 std::string msg = str::stream()
2285 << "cannot freeze node when primary or running for election. state: "
2286 << (_role == TopologyCoordinator::Role::kLeader ? "Primary" : "Running-Election");
2287 log() << msg;
2288 return Status(ErrorCodes::NotSecondary, msg);
2289 }
2290
2291 if (secs == 0) {
2292 _stepDownUntil = now;
2293 log() << "'unfreezing'";
2294 response->append("info", "unfreezing");
2295
2298         if (_isElectableNodeInSingleNodeReplicaSet()) {
2299 // If we are a one-node replica set, we're the one member,
2300 // we're electable, we're not in maintenance mode, and we are currently in followerMode
2301 // SECONDARY, we must transition to candidate now that our stepdown period
2302             // is no longer active, in lieu of heartbeats.
2303 _role = Role::kCandidate;
2304 return PrepareFreezeResponseResult::kSingleNodeSelfElect;
2305 }
2306 } else {
2307 if (secs == 1)
2308 response->append("warning", "you really want to freeze for only 1 second?");
2309
2310 _stepDownUntil = std::max(_stepDownUntil, now + Seconds(secs));
2311 log() << "'freezing' for " << secs << " seconds";
2312 }
2313
2314 return PrepareFreezeResponseResult::kNoAction;
2315 }
2316
2317 bool TopologyCoordinator::becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now) {
2318 if (_stepDownUntil > now) {
2319 return false;
2320 }
2321
2322 if (_isElectableNodeInSingleNodeReplicaSet()) {
2323 // If the new config describes a one-node replica set, we're the one member,
2324 // we're electable, we're not in maintenance mode, and we are currently in followerMode
2325         // SECONDARY, we must transition to candidate, in lieu of heartbeats.
2326 _role = Role::kCandidate;
2327 return true;
2328 }
2329 return false;
2330 }
2331
2332 void TopologyCoordinator::setElectionSleepUntil(Date_t newTime) {
2333 if (_electionSleepUntil < newTime) {
2334 _electionSleepUntil = newTime;
2335 }
2336 }
2337
2338 Timestamp TopologyCoordinator::getElectionTime() const {
2339 return _electionTime;
2340 }
2341
2342 OID TopologyCoordinator::getElectionId() const {
2343 return _electionId;
2344 }
2345
2346 int TopologyCoordinator::getCurrentPrimaryIndex() const {
2347 return _currentPrimaryIndex;
2348 }
2349
2350 Date_t TopologyCoordinator::getStepDownTime() const {
2351 return _stepDownUntil;
2352 }
2353
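// Illustrative example (not from the original source): if a reconfig changes member 2's host
// name, its old heartbeat data is discarded (member id and HostAndPort must both match for a
// non-self member), while unchanged members carry their heartbeat data over.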
2354 void TopologyCoordinator::_updateHeartbeatDataForReconfig(const ReplSetConfig& newConfig,
2355 int selfIndex,
2356 Date_t now) {
2357 std::vector<MemberData> oldHeartbeats;
2358 _memberData.swap(oldHeartbeats);
2359
2360 int index = 0;
2361 for (ReplSetConfig::MemberIterator it = newConfig.membersBegin(); it != newConfig.membersEnd();
2362 ++it, ++index) {
2363 const MemberConfig& newMemberConfig = *it;
2364 MemberData newHeartbeatData;
2365 for (auto&& oldMemberData : oldHeartbeats) {
2366 if ((oldMemberData.getMemberId() == newMemberConfig.getId() &&
2367 oldMemberData.getHostAndPort() == newMemberConfig.getHostAndPort()) ||
2368 (index == selfIndex && oldMemberData.isSelf())) {
2369 // This member existed in the old config with the same member ID and
2370 // HostAndPort, so copy its heartbeat data over.
2371 newHeartbeatData = oldMemberData;
2372 break;
2373 }
2374 }
2375 newHeartbeatData.setConfigIndex(index);
2376 newHeartbeatData.setIsSelf(index == selfIndex);
2377 newHeartbeatData.setHostAndPort(newMemberConfig.getHostAndPort());
2378 newHeartbeatData.setMemberId(newMemberConfig.getId());
2379 _memberData.push_back(newHeartbeatData);
2380 }
2381 if (selfIndex < 0) {
2382 // It's necessary to have self member data even if self isn't in the configuration.
2383 // We don't need data for the other nodes (which no longer know about us, or soon won't)
2384 _memberData.clear();
2385 // We're not in the config, we can't sync any more.
2386 _syncSource = HostAndPort();
2387 MemberData newHeartbeatData;
2388 for (auto&& oldMemberData : oldHeartbeats) {
2389 if (oldMemberData.isSelf()) {
2390 newHeartbeatData = oldMemberData;
2391 break;
2392 }
2393 }
2394 newHeartbeatData.setConfigIndex(-1);
2395 newHeartbeatData.setIsSelf(true);
2396 _memberData.push_back(newHeartbeatData);
2397 }
2398 }
2399
2400 // This function installs a new config object and recreates MemberData objects
2401 // that reflect the new config.
2402 void TopologyCoordinator::updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now) {
2403 invariant(_role != Role::kCandidate);
2404 invariant(selfIndex < newConfig.getNumMembers());
2405
2406 // Reset term on startup and upgrade/downgrade of protocol version.
2407 if (!_rsConfig.isInitialized() ||
2408 _rsConfig.getProtocolVersion() != newConfig.getProtocolVersion()) {
2409 if (newConfig.getProtocolVersion() == 1) {
2410 _term = OpTime::kInitialTerm;
2411 } else {
2412 invariant(newConfig.getProtocolVersion() == 0);
2413 _term = OpTime::kUninitializedTerm;
2414 }
2415 LOG(1) << "Updated term in topology coordinator to " << _term << " due to new config";
2416 }
2417
2418 _updateHeartbeatDataForReconfig(newConfig, selfIndex, now);
2419 _rsConfig = newConfig;
2420 _selfIndex = selfIndex;
2421 _forceSyncSourceIndex = -1;
2422
2423 if (_role == Role::kLeader) {
2424 if (_selfIndex == -1) {
2425 log() << "Could not remain primary because no longer a member of the replica set";
2426 } else if (!_selfConfig().isElectable()) {
2427 log() << " Could not remain primary because no longer electable";
2428 } else {
2429 // Don't stepdown if you don't have to.
2430 _currentPrimaryIndex = _selfIndex;
2431 return;
2432 }
2433 _role = Role::kFollower;
2434 _setLeaderMode(LeaderMode::kNotLeader);
2435 }
2436
2437 // By this point we know we are in Role::kFollower
2438 _currentPrimaryIndex = -1; // force secondaries to re-detect who the primary is
2439
2440 if (_isElectableNodeInSingleNodeReplicaSet()) {
2441 // If the new config describes a one-node replica set, we're the one member,
2442 // we're electable, we're not in maintenance mode and we are currently in followerMode
2443         // SECONDARY, we must transition to candidate, in lieu of heartbeats.
2444 _role = Role::kCandidate;
2445 }
2446 }

2447 std::string TopologyCoordinator::_getHbmsg(Date_t now) const {
2448 // ignore messages over 2 minutes old
2449 if ((now - _hbmsgTime) > Seconds{120}) {
2450 return "";
2451 }
2452 return _hbmsg;
2453 }
2454
2455 void TopologyCoordinator::setMyHeartbeatMessage(const Date_t now, const std::string& message) {
2456 _hbmsgTime = now;
2457 _hbmsg = message;
2458 }
2459
2460 const MemberConfig& TopologyCoordinator::_selfConfig() const {
2461 return _rsConfig.getMemberAt(_selfIndex);
2462 }
2463
2464 const MemberData& TopologyCoordinator::_selfMemberData() const {
2465 return _memberData[_selfMemberDataIndex()];
2466 }
2467
2468 const int TopologyCoordinator::_selfMemberDataIndex() const {
2469 invariant(!_memberData.empty());
2470 if (_selfIndex >= 0)
2471 return _selfIndex;
2472 // In master-slave mode, the first entry is for self. If there is no config
2473 // or we're not in the config, the first-and-only entry should be for self.
2474 return 0;
2475 }
2476
2477 TopologyCoordinator::UnelectableReasonMask TopologyCoordinator::_getUnelectableReason(
2478 int index) const {
2479 invariant(index != _selfIndex);
2480 const MemberConfig& memberConfig = _rsConfig.getMemberAt(index);
2481 const MemberData& hbData = _memberData.at(index);
2482 UnelectableReasonMask result = None;
2483 if (memberConfig.isArbiter()) {
2484 result |= ArbiterIAm;
2485 }
2486 if (memberConfig.getPriority() <= 0) {
2487 result |= NoPriority;
2488 }
2489 if (hbData.getState() != MemberState::RS_SECONDARY) {
2490 result |= NotSecondary;
2491 }
2492 if (_rsConfig.getProtocolVersion() == 0 &&
2493 !_isOpTimeCloseEnoughToLatestToElect(hbData.getHeartbeatAppliedOpTime())) {
2494 result |= NotCloseEnoughToLatestOptime;
2495 }
2496 if (hbData.up() && hbData.isUnelectable()) {
2497 result |= RefusesToStand;
2498 }
2499 invariant(result || memberConfig.isElectable());
2500 return result;
2501 }
2502
2503 TopologyCoordinator::UnelectableReasonMask TopologyCoordinator::_getMyUnelectableReason(
2504 const Date_t now, StartElectionReason reason) const {
2505 UnelectableReasonMask result = None;
2506 const OpTime lastApplied = getMyLastAppliedOpTime();
2507 if (lastApplied.isNull()) {
2508 result |= NoData;
2509 }
2510 if (!_aMajoritySeemsToBeUp()) {
2511 result |= CannotSeeMajority;
2512 }
2513 if (_selfIndex == -1) {
2514 result |= NotInitialized;
2515 return result;
2516 }
2517 if (_selfConfig().isArbiter()) {
2518 result |= ArbiterIAm;
2519 }
2520 if (_selfConfig().getPriority() <= 0) {
2521 result |= NoPriority;
2522 }
2523 if (_stepDownUntil > now) {
2524 result |= StepDownPeriodActive;
2525 }
2526
2527 // Cannot be electable unless secondary or already primary
2528 if (!getMemberState().secondary() && !_iAmPrimary()) {
2529 result |= NotSecondary;
2530 }
2531
2532 if (_rsConfig.getProtocolVersion() == 0) {
2533 // Election rules only for protocol version 0.
2534 if (_voteLease.whoId != -1 &&
2535 _voteLease.whoId != _rsConfig.getMemberAt(_selfIndex).getId() &&
2536 _voteLease.when + VoteLease::leaseTime >= now) {
2537 result |= VotedTooRecently;
2538 }
2539 if (!_isOpTimeCloseEnoughToLatestToElect(lastApplied)) {
2540 result |= NotCloseEnoughToLatestOptime;
2541 }
2542 } else {
2543 // Election rules only for protocol version 1.
2544 invariant(_rsConfig.getProtocolVersion() == 1);
2545 if (reason == StartElectionReason::kPriorityTakeover &&
2546 !_amIFreshEnoughForPriorityTakeover()) {
2547 result |= NotCloseEnoughToLatestForPriorityTakeover;
2548 }
2549
2550 if (reason == StartElectionReason::kCatchupTakeover &&
2551 !_amIFreshEnoughForCatchupTakeover()) {
2552 result |= NotFreshEnoughForCatchupTakeover;
2553 }
2554 }
2555 return result;
2556 }
2557
2558 std::string TopologyCoordinator::_getUnelectableReasonString(const UnelectableReasonMask ur) const {
2559 invariant(ur);
2560 str::stream ss;
2561 bool hasWrittenToStream = false;
2562 if (ur & NoData) {
2563 ss << "node has no applied oplog entries";
2564 hasWrittenToStream = true;
2565 }
2566 if (ur & VotedTooRecently) {
2567 if (hasWrittenToStream) {
2568 ss << "; ";
2569 }
2570 hasWrittenToStream = true;
2571 ss << "I recently voted for " << _voteLease.whoHostAndPort.toString();
2572 }
2573 if (ur & CannotSeeMajority) {
2574 if (hasWrittenToStream) {
2575 ss << "; ";
2576 }
2577 hasWrittenToStream = true;
2578 ss << "I cannot see a majority";
2579 }
2580 if (ur & ArbiterIAm) {
2581 if (hasWrittenToStream) {
2582 ss << "; ";
2583 }
2584 hasWrittenToStream = true;
2585 ss << "member is an arbiter";
2586 }
2587 if (ur & NoPriority) {
2588 if (hasWrittenToStream) {
2589 ss << "; ";
2590 }
2591 hasWrittenToStream = true;
2592 ss << "member has zero priority";
2593 }
2594 if (ur & StepDownPeriodActive) {
2595 if (hasWrittenToStream) {
2596 ss << "; ";
2597 }
2598 hasWrittenToStream = true;
2599 ss << "I am still waiting for stepdown period to end at "
2600 << dateToISOStringLocal(_stepDownUntil);
2601 }
2602 if (ur & NotSecondary) {
2603 if (hasWrittenToStream) {
2604 ss << "; ";
2605 }
2606 hasWrittenToStream = true;
2607 ss << "member is not currently a secondary";
2608 }
2609 if (ur & NotCloseEnoughToLatestOptime) {
2610 if (hasWrittenToStream) {
2611 ss << "; ";
2612 }
2613 hasWrittenToStream = true;
2614 ss << "member is more than 10 seconds behind the most up-to-date member";
2615 }
2616 if (ur & NotCloseEnoughToLatestForPriorityTakeover) {
2617 if (hasWrittenToStream) {
2618 ss << "; ";
2619 }
2620 hasWrittenToStream = true;
2621 ss << "member is not caught up enough to the most up-to-date member to call for priority "
2622 "takeover - must be within "
2623 << priorityTakeoverFreshnessWindowSeconds << " seconds";
2624 }
2625 if (ur & NotFreshEnoughForCatchupTakeover) {
2626 if (hasWrittenToStream) {
2627 ss << "; ";
2628 }
2629 hasWrittenToStream = true;
2630 ss << "member is either not the most up-to-date member or not ahead of the primary, and "
2631 "therefore cannot call for catchup takeover";
2632 }
2633 if (ur & NotInitialized) {
2634 if (hasWrittenToStream) {
2635 ss << "; ";
2636 }
2637 hasWrittenToStream = true;
2638 ss << "node is not a member of a valid replica set configuration";
2639 }
2640 if (ur & RefusesToStand) {
2641 if (hasWrittenToStream) {
2642 ss << "; ";
2643 }
2644 hasWrittenToStream = true;
2645 ss << "most recent heartbeat indicates node will not stand for election";
2646 }
2647 if (!hasWrittenToStream) {
2648 severe() << "Invalid UnelectableReasonMask value 0x" << integerToHex(ur);
2649 fassertFailed(26011);
2650 }
2651 ss << " (mask 0x" << integerToHex(ur) << ")";
2652 return ss;
2653 }
2654
2655 Milliseconds TopologyCoordinator::_getPing(const HostAndPort& host) {
2656 return _pings[host].getMillis();
2657 }
2658
2659 void TopologyCoordinator::_setElectionTime(const Timestamp& newElectionTime) {
2660 _electionTime = newElectionTime;
2661 }
2662
2663 int TopologyCoordinator::_getTotalPings() {
2664 PingMap::iterator it = _pings.begin();
2665 PingMap::iterator end = _pings.end();
2666 int totalPings = 0;
2667 while (it != end) {
2668 totalPings += it->second.getCount();
2669 it++;
2670 }
2671 return totalPings;
2672 }
2673
2674 std::vector<HostAndPort> TopologyCoordinator::getMaybeUpHostAndPorts() const {
2675 std::vector<HostAndPort> upHosts;
2676 for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
2677 ++it) {
2678 const int itIndex = indexOfIterator(_memberData, it);
2679 if (itIndex == _selfIndex) {
2680 continue; // skip ourselves
2681 }
2682 if (!it->maybeUp()) {
2683 continue; // skip DOWN nodes
2684 }
2685
2686 upHosts.push_back(_rsConfig.getMemberAt(itIndex).getHostAndPort());
2687 }
2688 return upHosts;
2689 }
2690
2691 bool TopologyCoordinator::voteForMyself(Date_t now) {
2692 if (_role != Role::kCandidate) {
2693 return false;
2694 }
2695 int selfId = _selfConfig().getId();
2696 if ((_voteLease.when + VoteLease::leaseTime >= now) && (_voteLease.whoId != selfId)) {
2697 log() << "not voting yea for " << selfId << " voted for "
2698 << _voteLease.whoHostAndPort.toString() << ' '
2699 << durationCount<Seconds>(now - _voteLease.when) << " secs ago";
2700 return false;
2701 }
2702 _voteLease.when = now;
2703 _voteLease.whoId = selfId;
2704 _voteLease.whoHostAndPort = _selfConfig().getHostAndPort();
2705 return true;
2706 }
2707
2708 bool TopologyCoordinator::isSteppingDown() const {
2709 return _leaderMode == LeaderMode::kAttemptingStepDown ||
2710 _leaderMode == LeaderMode::kSteppingDown;
2711 }
2712
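// Valid leader mode transitions, as enforced by the invariants in the switch below
// (including the TODO(SERVER-30852) exceptions):
//   kNotLeader          -> kLeaderElect
//   kLeaderElect        -> kNotLeader | kMaster | kAttemptingStepDown | kSteppingDown
//   kMaster             -> kNotLeader | kAttemptingStepDown | kSteppingDown
//   kAttemptingStepDown -> kNotLeader | kMaster | kLeaderElect | kSteppingDown
//   kSteppingDown       -> kNotLeader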
2713 void TopologyCoordinator::_setLeaderMode(TopologyCoordinator::LeaderMode newMode) {
2714 // Invariants for valid state transitions.
2715 switch (_leaderMode) {
2716 case LeaderMode::kNotLeader:
2717 invariant(newMode == LeaderMode::kLeaderElect);
2718 break;
2719 case LeaderMode::kLeaderElect:
2720 invariant(newMode == LeaderMode::kNotLeader || // TODO(SERVER-30852): remove this case
2721 newMode == LeaderMode::kMaster ||
2722 newMode == LeaderMode::kAttemptingStepDown ||
2723 newMode == LeaderMode::kSteppingDown);
2724 break;
2725 case LeaderMode::kMaster:
2726 invariant(newMode == LeaderMode::kNotLeader || // TODO(SERVER-30852): remove this case
2727 newMode == LeaderMode::kAttemptingStepDown ||
2728 newMode == LeaderMode::kSteppingDown);
2729 break;
2730 case LeaderMode::kAttemptingStepDown:
2731 invariant(newMode == LeaderMode::kNotLeader || newMode == LeaderMode::kMaster ||
2732 newMode == LeaderMode::kSteppingDown || newMode == LeaderMode::kLeaderElect);
2733 break;
2734 case LeaderMode::kSteppingDown:
2735 invariant(newMode == LeaderMode::kNotLeader);
2736 break;
2737 }
2738 _leaderMode = std::move(newMode);
2739 }
2740
2741 MemberState TopologyCoordinator::getMemberState() const {
2742 if (_selfIndex == -1) {
2743 if (_rsConfig.isInitialized()) {
2744 return MemberState::RS_REMOVED;
2745 }
2746 return MemberState::RS_STARTUP;
2747 }
2748
2749 if (_rsConfig.isConfigServer()) {
2750 if (_options.clusterRole != ClusterRole::ConfigServer && !skipShardingConfigurationChecks) {
2751 return MemberState::RS_REMOVED;
2752 } else {
2753 invariant(_storageEngineSupportsReadCommitted != ReadCommittedSupport::kUnknown);
2754 if (_storageEngineSupportsReadCommitted == ReadCommittedSupport::kNo) {
2755 return MemberState::RS_REMOVED;
2756 }
2757 }
2758 } else {
2759 if (_options.clusterRole == ClusterRole::ConfigServer && !skipShardingConfigurationChecks) {
2760 return MemberState::RS_REMOVED;
2761 }
2762 }
2763
2764 if (_role == Role::kLeader) {
2765 invariant(_currentPrimaryIndex == _selfIndex);
2766 invariant(_leaderMode != LeaderMode::kNotLeader);
2767 return MemberState::RS_PRIMARY;
2768 }
2769 const MemberConfig& myConfig = _selfConfig();
2770 if (myConfig.isArbiter()) {
2771 return MemberState::RS_ARBITER;
2772 }
2773 if (((_maintenanceModeCalls > 0) || (_hasOnlyAuthErrorUpHeartbeats(_memberData, _selfIndex))) &&
2774 (_followerMode == MemberState::RS_SECONDARY)) {
2775 return MemberState::RS_RECOVERING;
2776 }
2777 return _followerMode;
2778 }
2779
2780 bool TopologyCoordinator::canAcceptWrites() const {
2781 return _leaderMode == LeaderMode::kMaster;
2782 }
2783
2784 void TopologyCoordinator::setElectionInfo(OID electionId, Timestamp electionOpTime) {
2785 invariant(_role == Role::kLeader);
2786 _electionTime = electionOpTime;
2787 _electionId = electionId;
2788 }
2789
2790 void TopologyCoordinator::processWinElection(OID electionId, Timestamp electionOpTime) {
2791 invariant(_role == Role::kCandidate);
2792 invariant(_leaderMode == LeaderMode::kNotLeader);
2793 _role = Role::kLeader;
2794 _setLeaderMode(LeaderMode::kLeaderElect);
2795 setElectionInfo(electionId, electionOpTime);
2796 _currentPrimaryIndex = _selfIndex;
2797 _syncSource = HostAndPort();
2798 _forceSyncSourceIndex = -1;
2799 // Prevent last committed optime from updating until we finish draining.
2800 _firstOpTimeOfMyTerm =
2801 OpTime(Timestamp(std::numeric_limits<int>::max(), 0), std::numeric_limits<int>::max());
2802 }
2803
2804 void TopologyCoordinator::processLoseElection() {
2805 invariant(_role == Role::kCandidate);
2806 invariant(_leaderMode == LeaderMode::kNotLeader);
2807 const HostAndPort syncSourceAddress = getSyncSourceAddress();
2808 _electionTime = Timestamp(0, 0);
2809 _electionId = OID();
2810 _role = Role::kFollower;
2811
2812 // Clear voteLease time, if we voted for ourselves in this election.
2813 // This will allow us to vote for others.
2814 if (_voteLease.whoId == _selfConfig().getId()) {
2815 _voteLease.when = Date_t();
2816 }
2817 }
2818
2819 bool TopologyCoordinator::attemptStepDown(
2820 long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force) {
2821
2822 if (_role != Role::kLeader || _leaderMode == LeaderMode::kSteppingDown ||
2823 _term != termAtStart) {
2824 uasserted(ErrorCodes::PrimarySteppedDown,
2825 "While waiting for secondaries to catch up before stepping down, "
2826 "this node decided to step down for other reasons");
2827 }
2828 invariant(_leaderMode == LeaderMode::kAttemptingStepDown);
2829
2830 if (now >= stepDownUntil) {
2831 uasserted(ErrorCodes::ExceededTimeLimit,
2832 "By the time we were ready to step down, we were already past the "
2833 "time we were supposed to step down until");
2834 }
2835
2836 if (!_canCompleteStepDownAttempt(now, waitUntil, force)) {
2837 // Stepdown attempt failed.
2838
2839 // Check waitUntil after at least one stepdown attempt, so that stepdown could succeed even
2840 // if secondaryCatchUpPeriodSecs == 0.
2841 if (now >= waitUntil) {
2842 uasserted(ErrorCodes::ExceededTimeLimit,
2843 str::stream() << "No electable secondaries caught up as of "
2844 << dateToISOStringLocal(now)
2845 << "Please use the replSetStepDown command with the argument "
2846                                     << ". Please use the replSetStepDown command with the argument "
2847 }
2848
2849 // Stepdown attempt failed, but in a way that can be retried
2850 return false;
2851 }
2852
2853 // Stepdown attempt success!
2854 _stepDownUntil = stepDownUntil;
2855 _stepDownSelfAndReplaceWith(-1);
2856 return true;
2857 }
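
// Note: a false return above signals a transient failure; the caller (the replication
// coordinator's stepdown logic) is expected to retry attemptStepDown() as new replication
// progress is reported, until it returns true or one of the deadlines expires and it throws.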

bool TopologyCoordinator::_canCompleteStepDownAttempt(Date_t now, Date_t waitUntil, bool force) {
    const bool forceNow = force && (now >= waitUntil);
    if (forceNow) {
        return true;
    }

    return isSafeToStepDown();
}

bool TopologyCoordinator::_isCaughtUpAndElectable(int memberIndex, OpTime lastApplied) {
    if (_getUnelectableReason(memberIndex)) {
        return false;
    }

    return (_memberData.at(memberIndex).getLastAppliedOpTime() >= lastApplied);
}

bool TopologyCoordinator::isSafeToStepDown() {
    if (!_rsConfig.isInitialized() || _selfIndex < 0) {
        return false;
    }

    OpTime lastApplied = getMyLastAppliedOpTime();

    auto tagStatus = _rsConfig.findCustomWriteMode(ReplSetConfig::kMajorityWriteConcernModeName);
    invariant(tagStatus.isOK());

    // Check if a majority of nodes have reached the last applied optime.
    if (!haveTaggedNodesReachedOpTime(lastApplied, tagStatus.getValue(), false)) {
        return false;
    }

    // Now check that we also have at least one caught up node that is electable.
    for (int memberIndex = 0; memberIndex < _rsConfig.getNumMembers(); memberIndex++) {
        // Ignore ourselves.
        if (memberIndex == _selfIndex) {
            continue;
        }
        if (_isCaughtUpAndElectable(memberIndex, lastApplied)) {
            return true;
        }
    }

    return false;
}

int TopologyCoordinator::chooseElectionHandoffCandidate() {

    OpTime lastApplied = getMyLastAppliedOpTime();

    int bestCandidateIndex = -1;
    // Priorities are doubles (fractional values are permitted), so accumulate as a double to
    // avoid truncating comparisons.
    double highestPriority = -1;

    for (int memberIndex = 0; memberIndex < _rsConfig.getNumMembers(); memberIndex++) {

        // Skip our own member index.
        if (memberIndex == _selfIndex) {
            continue;
        }

        // Skip this node if it is not eligible to become primary. This includes nodes with
        // priority 0.
        if (!_isCaughtUpAndElectable(memberIndex, lastApplied)) {
            continue;
        }

        // Only update the best candidate if its priority is strictly greater. This guarantees
        // that we will pick the member with the lowest index in case of a tie. Note that
        // member priority is always a non-negative number.
        auto memberPriority = _rsConfig.getMemberAt(memberIndex).getPriority();
        if (memberPriority > highestPriority) {
            bestCandidateIndex = memberIndex;
            highestPriority = memberPriority;
        }
    }

    // This is the most suitable node.
    return bestCandidateIndex;
}
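
// For example, given caught-up, electable members with priorities {1, 1, 2}, the priority-2
// member receives the handoff; if the priorities are all equal, the qualifying member with the
// lowest index is chosen. If no member qualifies, -1 is returned.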

void TopologyCoordinator::setFollowerMode(MemberState::MS newMode) {
    invariant(_role == Role::kFollower);
    switch (newMode) {
        case MemberState::RS_RECOVERING:
        case MemberState::RS_ROLLBACK:
        case MemberState::RS_SECONDARY:
        case MemberState::RS_STARTUP2:
            _followerMode = newMode;
            break;
        default:
            invariant(false);
    }

    if (_followerMode != MemberState::RS_SECONDARY) {
        return;
    }

    // When a single-node replica set transitions to SECONDARY, we must check if we should
    // become a candidate here. This is necessary because a single-node replica set has no
    // heartbeats that would normally change the role to candidate.

    if (_isElectableNodeInSingleNodeReplicaSet()) {
        _role = Role::kCandidate;
    }
}

bool TopologyCoordinator::_isElectableNodeInSingleNodeReplicaSet() const {
    return _followerMode == MemberState::RS_SECONDARY && _rsConfig.getNumMembers() == 1 &&
        _selfIndex == 0 && _rsConfig.getMemberAt(_selfIndex).isElectable() &&
        _maintenanceModeCalls == 0;
}

void TopologyCoordinator::finishUnconditionalStepDown() {
    invariant(_leaderMode == LeaderMode::kSteppingDown);

    int remotePrimaryIndex = -1;
    for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
         ++it) {
        const int itIndex = indexOfIterator(_memberData, it);
        if (itIndex == _selfIndex) {
            continue;
        }

        if (it->getState().primary() && it->up()) {
            if (remotePrimaryIndex != -1) {
                // Two other nodes think they are primary (asynchronously polled);
                // wait for things to settle down.
                remotePrimaryIndex = -1;
                warning() << "two remote primaries (transiently)";
                break;
            }
            remotePrimaryIndex = itIndex;
        }
    }
    _stepDownSelfAndReplaceWith(remotePrimaryIndex);
}

void TopologyCoordinator::_stepDownSelfAndReplaceWith(int newPrimary) {
    invariant(_role == Role::kLeader);
    invariant(_selfIndex != -1);
    invariant(_selfIndex != newPrimary);
    invariant(_selfIndex == _currentPrimaryIndex);
    _currentPrimaryIndex = newPrimary;
    _role = Role::kFollower;
    _setLeaderMode(LeaderMode::kNotLeader);
}

bool TopologyCoordinator::updateLastCommittedOpTime() {
    // If we're not primary, or we're stepping down due to learning of a new term, then we must
    // not advance the commit point. If we are stepping down due to a user request, however, then
    // it is safe to advance the commit point, and in fact we must, since the stepdown request may
    // be waiting for the commit point to advance enough to be able to safely complete the step
    // down.
    if (!_iAmPrimary() || _leaderMode == LeaderMode::kSteppingDown) {
        return false;
    }

    // Whether we use the applied or durable OpTime for the commit point is decided here.
    const bool useDurableOpTime = _rsConfig.getWriteConcernMajorityShouldJournal();

    std::vector<OpTime> votingNodesOpTimes;
    for (const auto& memberData : _memberData) {
        int memberIndex = memberData.getConfigIndex();
        invariant(memberIndex >= 0);
        const auto& memberConfig = _rsConfig.getMemberAt(memberIndex);
        if (memberConfig.isVoter()) {
            const auto opTime = useDurableOpTime ? memberData.getLastDurableOpTime()
                                                 : memberData.getLastAppliedOpTime();
            votingNodesOpTimes.push_back(opTime);
        }
    }

    invariant(votingNodesOpTimes.size() > 0);
    if (votingNodesOpTimes.size() < static_cast<unsigned long>(_rsConfig.getWriteMajority())) {
        return false;
    }
    std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end());
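
    // For example, with five voting nodes and a write majority of three, the sorted optimes
    // [t0 <= t1 <= t2 <= t3 <= t4] give votingNodesOpTimes[5 - 3] == t2: the newest optime that
    // at least three voters (those holding t2, t3, and t4) are known to have reached.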

    // A majority of the voting nodes must have replicated this OpTime for it to become the
    // commit point.
    OpTime committedOpTime =
        votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()];
    return advanceLastCommittedOpTime(committedOpTime);
}

bool TopologyCoordinator::advanceLastCommittedOpTime(OpTime committedOpTime) {
    if (_selfIndex == -1) {
        // The config hasn't been installed or we are not in the config. This could happen
        // on heartbeats before installing a config.
        return false;
    }

    // This check is performed to ensure primaries do not commit an OpTime from a previous term.
    if (_iAmPrimary() && committedOpTime < _firstOpTimeOfMyTerm) {
        LOG(1) << "Ignoring older committed snapshot from before I became primary, optime: "
               << committedOpTime << ", firstOpTimeOfMyTerm: " << _firstOpTimeOfMyTerm;
        return false;
    }

    // Arbiters don't have data, so they always advance their commit point via heartbeats.
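    // Similarly, if the proposed commit point is from a different term than our own last applied
    // optime, we cannot be certain our oplog reflects that point in history, so we conservatively
    // cap the commit point at our last applied optime.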
    if (!_selfConfig().isArbiter() &&
        getMyLastAppliedOpTime().getTerm() != committedOpTime.getTerm()) {
        committedOpTime = std::min(committedOpTime, getMyLastAppliedOpTime());
    }

    if (committedOpTime == _lastCommittedOpTime) {
        return false;  // Hasn't changed, so ignore it.
    }

    if (committedOpTime < _lastCommittedOpTime) {
        LOG(1) << "Ignoring older committed snapshot optime: " << committedOpTime
               << ", currentCommittedOpTime: " << _lastCommittedOpTime;
        return false;
    }

    LOG(2) << "Updating _lastCommittedOpTime to " << committedOpTime;
    _lastCommittedOpTime = committedOpTime;
    return true;
}

void TopologyCoordinator::resetLastCommittedOpTime() {
    _lastCommittedOpTime = OpTime();
}

OpTime TopologyCoordinator::getLastCommittedOpTime() const {
    return _lastCommittedOpTime;
}

bool TopologyCoordinator::canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const {

    if (termWhenDrainCompleted != _term) {
        return false;
    }
    // Allow completing the transition to primary even when in the middle of a stepdown attempt,
    // in case the stepdown attempt fails.
    // Allow calling this function even if the node is already primary on PV upgrade.
    if (_leaderMode != LeaderMode::kLeaderElect && _leaderMode != LeaderMode::kAttemptingStepDown &&
        _leaderMode != LeaderMode::kMaster) {
        return false;
    }

    return true;
}

Status TopologyCoordinator::completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) {
    if (!canCompleteTransitionToPrimary(firstOpTimeOfTerm.getTerm())) {
        return Status(ErrorCodes::PrimarySteppedDown,
                      "By the time this node was ready to complete its transition to PRIMARY it "
                      "was no longer eligible to do so");
    }
    if (_leaderMode == LeaderMode::kLeaderElect) {
        _setLeaderMode(LeaderMode::kMaster);
    }
    _firstOpTimeOfMyTerm = firstOpTimeOfTerm;
    return Status::OK();
}

void TopologyCoordinator::adjustMaintenanceCountBy(int inc) {
    invariant(_role == Role::kFollower);
    _maintenanceModeCalls += inc;
    invariant(_maintenanceModeCalls >= 0);
}

int TopologyCoordinator::getMaintenanceCount() const {
    return _maintenanceModeCalls;
}

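// Processes a term seen in a heartbeat, vote request, or client message. Returns
// kAlreadyUpToDate when 'term' is not newer than ours, kTriggerStepDown when a newer term is
// seen while we are primary (the term is left unchanged until the stepdown completes), and
// kUpdatedTerm after adopting the newer term.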
TopologyCoordinator::UpdateTermResult TopologyCoordinator::updateTerm(long long term, Date_t now) {
    if (term <= _term) {
        return TopologyCoordinator::UpdateTermResult::kAlreadyUpToDate;
    }
    // Don't run for election if we just stood up or learned about a new term.
    _electionSleepUntil = now + _rsConfig.getElectionTimeoutPeriod();

    // Don't update the term just yet if we are going to step down, as we don't want to report
    // that we are primary in the new term.
    if (_iAmPrimary()) {
        return TopologyCoordinator::UpdateTermResult::kTriggerStepDown;
    }
    LOG(1) << "Updating term from " << _term << " to " << term;
    _term = term;
    return TopologyCoordinator::UpdateTermResult::kUpdatedTerm;
}


long long TopologyCoordinator::getTerm() const {
    return _term;
}

// TODO(siyuan): Merge _hddata into _slaveInfo, so that we have a single view of the
// replset. Passing metadata is unnecessary.
bool TopologyCoordinator::shouldChangeSyncSource(
    const HostAndPort& currentSource,
    const rpc::ReplSetMetadata& replMetadata,
    boost::optional<rpc::OplogQueryMetadata> oqMetadata,
    Date_t now) const {
    // Methodology:
    // If there exists a viable sync source member other than currentSource, whose oplog has
    // reached an optime greater than _options.maxSyncSourceLagSecs later than currentSource's,
    // return true.
    // If the currentSource has the same replication progress as we do and has no source for
    // further progress, return true.

    if (_selfIndex == -1) {
        log() << "Not choosing new sync source because we are not in the config.";
        return false;
    }

    // If the user requested a sync source change, return true.
    if (_forceSyncSourceIndex != -1) {
        log() << "Choosing new sync source because the user has requested to use "
              << _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort()
              << " as a sync source";
        return true;
    }

    if (_rsConfig.getProtocolVersion() == 1 &&
        replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) {
        log() << "Choosing new sync source because the config version supplied by " << currentSource
              << ", " << replMetadata.getConfigVersion() << ", does not match ours, "
              << _rsConfig.getConfigVersion();
        return true;
    }

    const int currentSourceIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource);
    // PV0 doesn't use metadata, so we have to consult _rsConfig.
    if (currentSourceIndex == -1) {
        log() << "Choosing new sync source because " << currentSource.toString()
              << " is not in our config";
        return true;
    }

    invariant(currentSourceIndex != _selfIndex);

    // If OplogQueryMetadata was provided, use its values; otherwise use the ones in
    // ReplSetMetadata.
    OpTime currentSourceOpTime;
    int syncSourceIndex = -1;
    int primaryIndex = -1;
    if (oqMetadata) {
        currentSourceOpTime =
            std::max(oqMetadata->getLastOpApplied(),
                     _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
        syncSourceIndex = oqMetadata->getSyncSourceIndex();
        primaryIndex = oqMetadata->getPrimaryIndex();
    } else {
        currentSourceOpTime =
            std::max(replMetadata.getLastOpVisible(),
                     _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
        syncSourceIndex = replMetadata.getSyncSourceIndex();
        primaryIndex = replMetadata.getPrimaryIndex();
    }

    if (currentSourceOpTime.isNull()) {
        // We haven't received a heartbeat from the sync source yet, so we can't tell if we
        // should change.
        return false;
    }

    // Change sync source if the current source is not ahead of us, does not itself have a sync
    // source, and is not the primary.
    const OpTime myLastOpTime = getMyLastAppliedOpTime();
    if (_rsConfig.getProtocolVersion() == 1 && syncSourceIndex == -1 &&
        currentSourceOpTime <= myLastOpTime && primaryIndex != currentSourceIndex) {
        std::stringstream logMessage;
        logMessage << "Choosing new sync source because our current sync source, "
                   << currentSource.toString() << ", has an OpTime (" << currentSourceOpTime
                   << ") which is not ahead of ours (" << myLastOpTime
                   << "), it does not have a sync source, and it's not the primary";
        if (primaryIndex >= 0) {
            logMessage << " (" << _rsConfig.getMemberAt(primaryIndex).getHostAndPort() << " is)";
        } else {
            logMessage << " (sync source does not know the primary)";
        }
        log() << logMessage.str();
        return true;
    }

    if (MONGO_FAIL_POINT(disableMaxSyncSourceLagSecs)) {
        log() << "disableMaxSyncSourceLagSecs fail point enabled - not checking the most recent "
                 "OpTime, "
              << currentSourceOpTime.toString() << ", of our current sync source, " << currentSource
              << ", against the OpTimes of the other nodes in this replica set.";
    } else {
        unsigned int currentSecs = currentSourceOpTime.getSecs();
        unsigned int goalSecs = currentSecs + durationCount<Seconds>(_options.maxSyncSourceLagSecs);
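
        // For example, with a maxSyncSourceLagSecs of 30 seconds (the default): if our sync
        // source's most recent optime is at 100 seconds, goalSecs is 130, and any up, readable,
        // non-blacklisted member whose heartbeat optime is at 131 seconds or later triggers a
        // sync source change.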

        for (std::vector<MemberData>::const_iterator it = _memberData.begin();
             it != _memberData.end();
             ++it) {
            const int itIndex = indexOfIterator(_memberData, it);
            const MemberConfig& candidateConfig = _rsConfig.getMemberAt(itIndex);
            if (it->up() && (candidateConfig.isVoter() || !_selfConfig().isVoter()) &&
                (candidateConfig.shouldBuildIndexes() || !_selfConfig().shouldBuildIndexes()) &&
                it->getState().readable() && !_memberIsBlacklisted(candidateConfig, now) &&
                goalSecs < it->getHeartbeatAppliedOpTime().getSecs()) {
                log() << "Choosing new sync source because the most recent OpTime of our sync "
                         "source, "
                      << currentSource << ", is " << currentSourceOpTime.toString()
                      << " which is more than " << _options.maxSyncSourceLagSecs
                      << " behind member " << candidateConfig.getHostAndPort().toString()
                      << " whose most recent OpTime is "
                      << it->getHeartbeatAppliedOpTime().toString();
                invariant(itIndex != _selfIndex);
                return true;
            }
        }
    }

    return false;
}

rpc::ReplSetMetadata TopologyCoordinator::prepareReplSetMetadata(
    const OpTime& lastVisibleOpTime) const {
    return rpc::ReplSetMetadata(_term,
                                _lastCommittedOpTime,
                                lastVisibleOpTime,
                                _rsConfig.getConfigVersion(),
                                _rsConfig.getReplicaSetId(),
                                _currentPrimaryIndex,
                                _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
}

rpc::OplogQueryMetadata TopologyCoordinator::prepareOplogQueryMetadata(int rbid) const {
    return rpc::OplogQueryMetadata(_lastCommittedOpTime,
                                   getMyLastAppliedOpTime(),
                                   rbid,
                                   _currentPrimaryIndex,
                                   _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
}

void TopologyCoordinator::summarizeAsHtml(ReplSetHtmlSummary* output) {
    output->setConfig(_rsConfig);
    output->setHBData(_memberData);
    output->setSelfIndex(_selfIndex);
    output->setPrimaryIndex(_currentPrimaryIndex);
    output->setSelfState(getMemberState());
    output->setSelfHeartbeatMessage(_hbmsg);
}

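// Evaluates a replSetRequestVotes command. A vote is denied, with the specific reason recorded
// in the response, when the candidate's term is behind ours, when its config version or set
// name differs from ours, when its oplog is behind ours, when we have already voted in this
// term, or when (as an arbiter) we can see a healthy primary of equal or greater priority to
// the candidate.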
void TopologyCoordinator::processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
                                                     ReplSetRequestVotesResponse* response) {
    response->setTerm(_term);

    if (args.getTerm() < _term) {
        response->setVoteGranted(false);
        response->setReason(str::stream() << "candidate's term (" << args.getTerm()
                                          << ") is lower than mine (" << _term << ")");
    } else if (args.getConfigVersion() != _rsConfig.getConfigVersion()) {
        response->setVoteGranted(false);
        response->setReason(str::stream()
                            << "candidate's config version (" << args.getConfigVersion()
                            << ") differs from mine (" << _rsConfig.getConfigVersion() << ")");
    } else if (args.getSetName() != _rsConfig.getReplSetName()) {
        response->setVoteGranted(false);
        response->setReason(str::stream()
                            << "candidate's set name (" << args.getSetName()
                            << ") differs from mine (" << _rsConfig.getReplSetName() << ")");
    } else if (args.getLastDurableOpTime() < getMyLastAppliedOpTime()) {
        response->setVoteGranted(false);
        response->setReason(str::stream()
                            << "candidate's data is staler than mine. candidate's last applied "
                               "OpTime: "
                            << args.getLastDurableOpTime().toString()
                            << ", my last applied OpTime: "
                            << getMyLastAppliedOpTime().toString());
    } else if (!args.isADryRun() && _lastVote.getTerm() == args.getTerm()) {
        response->setVoteGranted(false);
        response->setReason(str::stream()
                            << "already voted for another candidate ("
                            << _rsConfig.getMemberAt(_lastVote.getCandidateIndex()).getHostAndPort()
                            << ") this term (" << _lastVote.getTerm() << ")");
    } else {
        int betterPrimary = _findHealthyPrimaryOfEqualOrGreaterPriority(args.getCandidateIndex());
        if (_selfConfig().isArbiter() && betterPrimary >= 0) {
            response->setVoteGranted(false);
            response->setReason(str::stream()
                                << "can see a healthy primary ("
                                << _rsConfig.getMemberAt(betterPrimary).getHostAndPort()
                                << ") of equal or greater priority");
        } else {
            if (!args.isADryRun()) {
                _lastVote.setTerm(args.getTerm());
                _lastVote.setCandidateIndex(args.getCandidateIndex());
            }
            response->setVoteGranted(true);
        }
    }
}

void TopologyCoordinator::loadLastVote(const LastVote& lastVote) {
    _lastVote = lastVote;
}

void TopologyCoordinator::voteForMyselfV1() {
    _lastVote.setTerm(_term);
    _lastVote.setCandidateIndex(_selfIndex);
}

void TopologyCoordinator::setPrimaryIndex(long long primaryIndex) {
    _currentPrimaryIndex = primaryIndex;
}

Status TopologyCoordinator::becomeCandidateIfElectable(const Date_t now,
                                                       StartElectionReason reason) {
    if (_role == Role::kLeader) {
        return {ErrorCodes::NodeNotElectable, "Not standing for election again; already primary"};
    }

    if (_role == Role::kCandidate) {
        return {ErrorCodes::NodeNotElectable, "Not standing for election again; already candidate"};
    }

    const UnelectableReasonMask unelectableReason = _getMyUnelectableReason(now, reason);
    if (unelectableReason) {
        return {ErrorCodes::NodeNotElectable,
                str::stream() << "Not standing for election because "
                              << _getUnelectableReasonString(unelectableReason)};
    }

    // All checks passed, become a candidate and start election proceedings.
    _role = Role::kCandidate;

    return Status::OK();
}

void TopologyCoordinator::setStorageEngineSupportsReadCommitted(bool supported) {
    _storageEngineSupportsReadCommitted =
        supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo;
}

void TopologyCoordinator::restartHeartbeats() {
    for (auto& hb : _memberData) {
        hb.restart();
    }
}

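// Returns boost::none until a heartbeat (successful or not) has been processed for every other
// member since the last restartHeartbeats() call; once all heartbeats are fresh, returns the
// newest applied optime reported by any member that is up. Optimes of down members are ignored.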
boost::optional<OpTime> TopologyCoordinator::latestKnownOpTimeSinceHeartbeatRestart() const {
    // The smallest OpTime in PV1.
    OpTime latest(Timestamp(0, 0), 0);
    for (size_t i = 0; i < _memberData.size(); i++) {
        auto& peer = _memberData[i];

        if (static_cast<int>(i) == _selfIndex) {
            continue;
        }
        // If any heartbeat is not fresh enough, return none.
        if (!peer.isUpdatedSinceRestart()) {
            return boost::none;
        }
        // Ignore down members.
        if (!peer.up()) {
            continue;
        }
        if (peer.getHeartbeatAppliedOpTime() > latest) {
            latest = peer.getHeartbeatAppliedOpTime();
        }
    }
    return latest;
}

}  // namespace repl
}  // namespace mongo