1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/db/repl/topology_coordinator.h"
36 
#include <iterator>
#include <limits>
#include <string>
39 
40 #include "mongo/db/audit.h"
41 #include "mongo/db/client.h"
42 #include "mongo/db/mongod_options.h"
43 #include "mongo/db/operation_context.h"
44 #include "mongo/db/repl/heartbeat_response_action.h"
45 #include "mongo/db/repl/is_master_response.h"
46 #include "mongo/db/repl/isself.h"
47 #include "mongo/db/repl/repl_set_heartbeat_args.h"
48 #include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
49 #include "mongo/db/repl/repl_set_heartbeat_response.h"
50 #include "mongo/db/repl/repl_set_html_summary.h"
51 #include "mongo/db/repl/repl_set_request_votes_args.h"
52 #include "mongo/db/repl/rslog.h"
53 #include "mongo/db/repl/update_position_args.h"
54 #include "mongo/db/server_parameters.h"
55 #include "mongo/rpc/metadata/oplog_query_metadata.h"
56 #include "mongo/rpc/metadata/repl_set_metadata.h"
57 #include "mongo/util/assert_util.h"
58 #include "mongo/util/fail_point_service.h"
59 #include "mongo/util/hex.h"
60 #include "mongo/util/log.h"
61 #include "mongo/util/mongoutils/str.h"
62 #include "mongo/util/scopeguard.h"
63 
64 namespace mongo {
65 namespace repl {
66 
67 using std::vector;
68 
69 MONGO_FP_DECLARE(forceSyncSourceCandidate);
70 
71 const Seconds TopologyCoordinator::VoteLease::leaseTime = Seconds(30);
72 
73 // Controls how caught up in replication a secondary with higher priority than the current primary
74 // must be before it will call for a priority takeover election.
75 MONGO_EXPORT_STARTUP_SERVER_PARAMETER(priorityTakeoverFreshnessWindowSeconds, int, 2);
76 
77 // If this fail point is enabled, TopologyCoordinator::shouldChangeSyncSource() will ignore
78 // the option TopologyCoordinator::Options::maxSyncSourceLagSecs. The sync source will not be
79 // re-evaluated if it lags behind another node by more than 'maxSyncSourceLagSecs' seconds.
80 MONGO_FP_DECLARE(disableMaxSyncSourceLagSecs);
81 
82 constexpr Milliseconds TopologyCoordinator::PingStats::UninitializedPingTime;
83 
roleToString(TopologyCoordinator::Role role)84 std::string TopologyCoordinator::roleToString(TopologyCoordinator::Role role) {
85     switch (role) {
86         case TopologyCoordinator::Role::kLeader:
87             return "leader";
88         case TopologyCoordinator::Role::kFollower:
89             return "follower";
90         case TopologyCoordinator::Role::kCandidate:
91             return "candidate";
92     }
93     invariant(false);
94 }
95 
~TopologyCoordinator()96 TopologyCoordinator::~TopologyCoordinator() {}
97 
operator <<(std::ostream & os,TopologyCoordinator::Role role)98 std::ostream& operator<<(std::ostream& os, TopologyCoordinator::Role role) {
99     return os << TopologyCoordinator::roleToString(role);
100 }
101 
operator <<(std::ostream & os,TopologyCoordinator::PrepareFreezeResponseResult result)102 std::ostream& operator<<(std::ostream& os,
103                          TopologyCoordinator::PrepareFreezeResponseResult result) {
104     switch (result) {
105         case TopologyCoordinator::PrepareFreezeResponseResult::kNoAction:
106             return os << "no action";
107         case TopologyCoordinator::PrepareFreezeResponseResult::kSingleNodeSelfElect:
108             return os << "single node self elect";
109     }
110     MONGO_UNREACHABLE;
111 }
112 
113 namespace {
114 template <typename T>
indexOfIterator(const std::vector<T> & vec,typename std::vector<T>::const_iterator & it)115 int indexOfIterator(const std::vector<T>& vec, typename std::vector<T>::const_iterator& it) {
116     return static_cast<int>(it - vec.begin());
117 }
118 
119 /**
120  * Returns true if the only up heartbeats are auth errors.
121  */
_hasOnlyAuthErrorUpHeartbeats(const std::vector<MemberData> & hbdata,const int selfIndex)122 bool _hasOnlyAuthErrorUpHeartbeats(const std::vector<MemberData>& hbdata, const int selfIndex) {
123     bool foundAuthError = false;
124     for (std::vector<MemberData>::const_iterator it = hbdata.begin(); it != hbdata.end(); ++it) {
125         if (indexOfIterator(hbdata, it) == selfIndex) {
126             continue;
127         }
128 
129         if (it->up()) {
130             return false;
131         }
132 
133         if (it->hasAuthIssue()) {
134             foundAuthError = true;
135         }
136     }
137 
138     return foundAuthError;
139 }
140 
appendOpTime(BSONObjBuilder * bob,const char * elemName,const OpTime & opTime,const long long pv)141 void appendOpTime(BSONObjBuilder* bob,
142                   const char* elemName,
143                   const OpTime& opTime,
144                   const long long pv) {
145     if (pv == 1) {
146         opTime.append(bob, elemName);
147     } else {
148         bob->append(elemName, opTime.getTimestamp());
149     }
150 }
151 }  // namespace
152 
start(Date_t now)153 void TopologyCoordinator::PingStats::start(Date_t now) {
154     _lastHeartbeatStartDate = now;
155     _numFailuresSinceLastStart = 0;
156     _state = HeartbeatState::TRYING;
157 }
158 
hit(Milliseconds millis)159 void TopologyCoordinator::PingStats::hit(Milliseconds millis) {
160     _state = HeartbeatState::SUCCEEDED;
161     ++hitCount;
162 
163     averagePingTimeMs = averagePingTimeMs == UninitializedPingTime
164         ? millis
165         : Milliseconds((averagePingTimeMs * 4 + millis) / 5);
166 }
167 
miss()168 void TopologyCoordinator::PingStats::miss() {
169     ++_numFailuresSinceLastStart;
170     // Transition to 'FAILED' state if this was our last retry.
171     if (_numFailuresSinceLastStart > kMaxHeartbeatRetries) {
172         _state = PingStats::HeartbeatState::FAILED;
173     }
174 }
175 
TopologyCoordinator(Options options)176 TopologyCoordinator::TopologyCoordinator(Options options)
177     : _role(Role::kFollower),
178       _term(OpTime::kUninitializedTerm),
179       _currentPrimaryIndex(-1),
180       _forceSyncSourceIndex(-1),
181       _options(std::move(options)),
182       _selfIndex(-1),
183       _maintenanceModeCalls(0),
184       _followerMode(MemberState::RS_STARTUP2) {
185     invariant(getMemberState() == MemberState::RS_STARTUP);
186     // Need an entry for self in the memberHearbeatData.
187     _memberData.emplace_back();
188     _memberData.back().setIsSelf(true);
189 }
190 
getRole() const191 TopologyCoordinator::Role TopologyCoordinator::getRole() const {
192     return _role;
193 }
194 
setForceSyncSourceIndex(int index)195 void TopologyCoordinator::setForceSyncSourceIndex(int index) {
196     invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers());
197     _forceSyncSourceIndex = index;
198 }
199 
getSyncSourceAddress() const200 HostAndPort TopologyCoordinator::getSyncSourceAddress() const {
201     return _syncSource;
202 }
203 
chooseNewSyncSource(Date_t now,const OpTime & lastOpTimeFetched,ChainingPreference chainingPreference)204 HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now,
205                                                      const OpTime& lastOpTimeFetched,
206                                                      ChainingPreference chainingPreference) {
207     // If we are not a member of the current replica set configuration, no sync source is valid.
208     if (_selfIndex == -1) {
209         LOG(1) << "Cannot sync from any members because we are not in the replica set config";
210         return HostAndPort();
211     }
212 
213     MONGO_FAIL_POINT_BLOCK(forceSyncSourceCandidate, customArgs) {
214         const auto& data = customArgs.getData();
215         const auto hostAndPortElem = data["hostAndPort"];
216         if (!hostAndPortElem) {
217             severe() << "'forceSyncSoureCandidate' parameter set with invalid host and port: "
218                      << data;
219             fassertFailed(50835);
220         }
221 
222         const auto hostAndPort = HostAndPort(hostAndPortElem.checkAndGetStringData());
223         const int syncSourceIndex = _rsConfig.findMemberIndexByHostAndPort(hostAndPort);
224         if (syncSourceIndex < 0) {
225             log() << "'forceSyncSourceCandidate' failed due to host and port not in "
226                      "replica set config: "
227                   << hostAndPort.toString();
228             fassertFailed(50836);
229         }
230 
231 
232         if (_memberIsBlacklisted(_rsConfig.getMemberAt(syncSourceIndex), now)) {
233             log() << "Cannot select a sync source because forced candidate is blacklisted: "
234                   << hostAndPort.toString();
235             _syncSource = HostAndPort();
236             return _syncSource;
237         }
238 
239         _syncSource = _rsConfig.getMemberAt(syncSourceIndex).getHostAndPort();
240         log() << "choosing sync source candidate due to 'forceSyncSourceCandidate' parameter: "
241               << _syncSource;
242         std::string msg(str::stream() << "syncing from: " << _syncSource.toString()
243                                       << " by 'forceSyncSourceCandidate' parameter");
244         setMyHeartbeatMessage(now, msg);
245         return _syncSource;
246     }
247 
248     // if we have a target we've requested to sync from, use it
249     if (_forceSyncSourceIndex != -1) {
250         invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers());
251         _syncSource = _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort();
252         _forceSyncSourceIndex = -1;
253         log() << "choosing sync source candidate by request: " << _syncSource;
254         std::string msg(str::stream() << "syncing from: " << _syncSource.toString()
255                                       << " by request");
256         setMyHeartbeatMessage(now, msg);
257         return _syncSource;
258     }
259 
260     // wait for 2N pings (not counting ourselves) before choosing a sync target
261     int needMorePings = (_memberData.size() - 1) * 2 - _getTotalPings();
262 
263     if (needMorePings > 0) {
264         OCCASIONALLY log() << "waiting for " << needMorePings
265                            << " pings from other members before syncing";
266         _syncSource = HostAndPort();
267         return _syncSource;
268     }
269 
270     // If we are only allowed to sync from the primary, set that
271     if (chainingPreference == ChainingPreference::kUseConfiguration &&
272         !_rsConfig.isChainingAllowed()) {
273         if (_currentPrimaryIndex == -1) {
274             LOG(1) << "Cannot select a sync source because chaining is"
275                       " not allowed and primary is unknown/down";
276             _syncSource = HostAndPort();
277             return _syncSource;
278         } else if (_memberIsBlacklisted(*_currentPrimaryMember(), now)) {
279             LOG(1) << "Cannot select a sync source because chaining is not allowed and primary "
280                       "member is blacklisted: "
281                    << _currentPrimaryMember()->getHostAndPort();
282             _syncSource = HostAndPort();
283             return _syncSource;
284         } else if (_currentPrimaryIndex == _selfIndex) {
285             LOG(1)
286                 << "Cannot select a sync source because chaining is not allowed and we are primary";
287             _syncSource = HostAndPort();
288             return _syncSource;
289         } else {
290             _syncSource = _currentPrimaryMember()->getHostAndPort();
291             log() << "chaining not allowed, choosing primary as sync source candidate: "
292                   << _syncSource;
293             std::string msg(str::stream() << "syncing from primary: " << _syncSource.toString());
294             setMyHeartbeatMessage(now, msg);
295             return _syncSource;
296         }
297     }
298 
299     // find the member with the lowest ping time that is ahead of me
300 
301     // choose a time that will exclude no candidates by default, in case we don't see a primary
302     OpTime oldestSyncOpTime;
303 
304     // Find primary's oplog time. Reject sync candidates that are more than
305     // _options.maxSyncSourceLagSecs seconds behind.
306     if (_currentPrimaryIndex != -1) {
307         OpTime primaryOpTime = _memberData.at(_currentPrimaryIndex).getHeartbeatAppliedOpTime();
308 
309         // Check if primaryOpTime is still close to 0 because we haven't received
310         // our first heartbeat from a new primary yet.
311         unsigned int maxLag =
312             static_cast<unsigned int>(durationCount<Seconds>(_options.maxSyncSourceLagSecs));
313         if (primaryOpTime.getSecs() >= maxLag) {
314             oldestSyncOpTime =
315                 OpTime(Timestamp(primaryOpTime.getSecs() - maxLag, 0), primaryOpTime.getTerm());
316         }
317     }
318 
319     int closestIndex = -1;
320 
321     // Make two attempts, with less restrictive rules the second time.
322     //
323     // During the first attempt, we ignore those nodes that have a larger slave
324     // delay, hidden nodes or non-voting, and nodes that are excessively behind.
325     //
326     // For the second attempt include those nodes, in case those are the only ones we can reach.
327     //
328     // This loop attempts to set 'closestIndex', to select a viable candidate.
329     for (int attempts = 0; attempts < 2; ++attempts) {
330         for (std::vector<MemberData>::const_iterator it = _memberData.begin();
331              it != _memberData.end();
332              ++it) {
333             const int itIndex = indexOfIterator(_memberData, it);
334             // Don't consider ourselves.
335             if (itIndex == _selfIndex) {
336                 continue;
337             }
338 
339             const MemberConfig& itMemberConfig(_rsConfig.getMemberAt(itIndex));
340 
341             // Candidate must be up to be considered.
342             if (!it->up()) {
343                 LOG(2) << "Cannot select sync source because it is not up: "
344                        << itMemberConfig.getHostAndPort();
345                 continue;
346             }
347             // Candidate must be PRIMARY or SECONDARY state to be considered.
348             if (!it->getState().readable()) {
349                 LOG(2) << "Cannot select sync source because it is not readable: "
350                        << itMemberConfig.getHostAndPort();
351                 continue;
352             }
353 
354             // On the first attempt, we skip candidates that do not match these criteria.
355             if (attempts == 0) {
356                 // Candidate must be a voter if we are a voter.
357                 if (_selfConfig().isVoter() && !itMemberConfig.isVoter()) {
358                     LOG(2) << "Cannot select sync source because we are a voter and it is not: "
359                            << itMemberConfig.getHostAndPort();
360                     continue;
361                 }
362                 // Candidates must not be hidden.
363                 if (itMemberConfig.isHidden()) {
364                     LOG(2) << "Cannot select sync source because it is hidden: "
365                            << itMemberConfig.getHostAndPort();
366                     continue;
367                 }
368                 // Candidates cannot be excessively behind.
369                 if (it->getHeartbeatAppliedOpTime() < oldestSyncOpTime) {
370                     LOG(2) << "Cannot select sync source because it is too far behind."
371                            << "Latest optime of sync candidate " << itMemberConfig.getHostAndPort()
372                            << ": " << it->getHeartbeatAppliedOpTime()
373                            << ", oldest acceptable optime: " << oldestSyncOpTime;
374                     continue;
375                 }
376                 // Candidate must not have a configured delay larger than ours.
377                 if (_selfConfig().getSlaveDelay() < itMemberConfig.getSlaveDelay()) {
378                     LOG(2) << "Cannot select sync source with larger slaveDelay than ours: "
379                            << itMemberConfig.getHostAndPort();
380                     continue;
381                 }
382             }
383             // Candidate must build indexes if we build indexes, to be considered.
384             if (_selfConfig().shouldBuildIndexes()) {
385                 if (!itMemberConfig.shouldBuildIndexes()) {
386                     LOG(2) << "Cannot select sync source with shouldBuildIndex differences: "
387                            << itMemberConfig.getHostAndPort();
388                     continue;
389                 }
390             }
391             // only consider candidates that are ahead of where we are
392             if (it->getHeartbeatAppliedOpTime() <= lastOpTimeFetched) {
393                 LOG(1) << "Cannot select sync source equal to or behind our last fetched optime. "
394                        << "My last fetched oplog optime: " << lastOpTimeFetched.toBSON()
395                        << ", latest oplog optime of sync candidate "
396                        << itMemberConfig.getHostAndPort() << ": "
397                        << it->getHeartbeatAppliedOpTime().toBSON();
398                 continue;
399             }
400             // Candidate cannot be more latent than anything we've already considered.
401             if ((closestIndex != -1) &&
402                 (_getPing(itMemberConfig.getHostAndPort()) >
403                  _getPing(_rsConfig.getMemberAt(closestIndex).getHostAndPort()))) {
404                 LOG(2) << "Cannot select sync source with higher latency than the best candidate: "
405                        << itMemberConfig.getHostAndPort();
406 
407                 continue;
408             }
409             // Candidate cannot be blacklisted.
410             if (_memberIsBlacklisted(itMemberConfig, now)) {
411                 LOG(1) << "Cannot select sync source which is blacklisted: "
412                        << itMemberConfig.getHostAndPort();
413 
414                 continue;
415             }
416             // This candidate has passed all tests; set 'closestIndex'
417             closestIndex = itIndex;
418         }
419         if (closestIndex != -1)
420             break;  // no need for second attempt
421     }
422 
423     if (closestIndex == -1) {
424         // Did not find any members to sync from
425         std::string msg("could not find member to sync from");
426         // Only log when we had a valid sync source before
427         if (!_syncSource.empty()) {
428             log() << msg << rsLog;
429         }
430         setMyHeartbeatMessage(now, msg);
431 
432         _syncSource = HostAndPort();
433         return _syncSource;
434     }
435     _syncSource = _rsConfig.getMemberAt(closestIndex).getHostAndPort();
436     log() << "sync source candidate: " << _syncSource;
437     std::string msg(str::stream() << "syncing from: " << _syncSource.toString(), 0);
438     setMyHeartbeatMessage(now, msg);
439     return _syncSource;
440 }
441 
_memberIsBlacklisted(const MemberConfig & memberConfig,Date_t now) const442 bool TopologyCoordinator::_memberIsBlacklisted(const MemberConfig& memberConfig, Date_t now) const {
443     std::map<HostAndPort, Date_t>::const_iterator blacklisted =
444         _syncSourceBlacklist.find(memberConfig.getHostAndPort());
445     if (blacklisted != _syncSourceBlacklist.end()) {
446         if (blacklisted->second > now) {
447             return true;
448         }
449     }
450     return false;
451 }
452 
blacklistSyncSource(const HostAndPort & host,Date_t until)453 void TopologyCoordinator::blacklistSyncSource(const HostAndPort& host, Date_t until) {
454     LOG(2) << "blacklisting " << host << " until " << until.toString();
455     _syncSourceBlacklist[host] = until;
456 }
457 
unblacklistSyncSource(const HostAndPort & host,Date_t now)458 void TopologyCoordinator::unblacklistSyncSource(const HostAndPort& host, Date_t now) {
459     std::map<HostAndPort, Date_t>::iterator hostItr = _syncSourceBlacklist.find(host);
460     if (hostItr != _syncSourceBlacklist.end() && now >= hostItr->second) {
461         LOG(2) << "unblacklisting " << host;
462         _syncSourceBlacklist.erase(hostItr);
463     }
464 }
465 
clearSyncSourceBlacklist()466 void TopologyCoordinator::clearSyncSourceBlacklist() {
467     _syncSourceBlacklist.clear();
468 }
469 
prepareSyncFromResponse(const HostAndPort & target,BSONObjBuilder * response,Status * result)470 void TopologyCoordinator::prepareSyncFromResponse(const HostAndPort& target,
471                                                   BSONObjBuilder* response,
472                                                   Status* result) {
473     response->append("syncFromRequested", target.toString());
474 
475     if (_selfIndex == -1) {
476         *result = Status(ErrorCodes::NotSecondary, "Removed and uninitialized nodes do not sync");
477         return;
478     }
479 
480     const MemberConfig& selfConfig = _selfConfig();
481     if (selfConfig.isArbiter()) {
482         *result = Status(ErrorCodes::NotSecondary, "arbiters don't sync");
483         return;
484     }
485     if (_selfIndex == _currentPrimaryIndex) {
486         *result = Status(ErrorCodes::NotSecondary, "primaries don't sync");
487         return;
488     }
489 
490     ReplSetConfig::MemberIterator targetConfig = _rsConfig.membersEnd();
491     int targetIndex = 0;
492     for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd();
493          ++it) {
494         if (it->getHostAndPort() == target) {
495             targetConfig = it;
496             break;
497         }
498         ++targetIndex;
499     }
500     if (targetConfig == _rsConfig.membersEnd()) {
501         *result = Status(ErrorCodes::NodeNotFound,
502                          str::stream() << "Could not find member \"" << target.toString()
503                                        << "\" in replica set");
504         return;
505     }
506     if (targetIndex == _selfIndex) {
507         *result = Status(ErrorCodes::InvalidOptions, "I cannot sync from myself");
508         return;
509     }
510     if (targetConfig->isArbiter()) {
511         *result = Status(ErrorCodes::InvalidOptions,
512                          str::stream() << "Cannot sync from \"" << target.toString()
513                                        << "\" because it is an arbiter");
514         return;
515     }
516     if (!targetConfig->shouldBuildIndexes() && selfConfig.shouldBuildIndexes()) {
517         *result = Status(ErrorCodes::InvalidOptions,
518                          str::stream() << "Cannot sync from \"" << target.toString()
519                                        << "\" because it does not build indexes");
520         return;
521     }
522 
523     if (selfConfig.isVoter() && !targetConfig->isVoter()) {
524         *result = Status(ErrorCodes::InvalidOptions,
525                          str::stream() << "Cannot sync from \"" << target.toString()
526                                        << "\" because it is not a voter");
527         return;
528     }
529 
530     const MemberData& hbdata = _memberData.at(targetIndex);
531     if (hbdata.hasAuthIssue()) {
532         *result =
533             Status(ErrorCodes::Unauthorized,
534                    str::stream() << "not authorized to communicate with " << target.toString());
535         return;
536     }
537     if (hbdata.getHealth() == 0) {
538         *result =
539             Status(ErrorCodes::HostUnreachable,
540                    str::stream() << "I cannot reach the requested member: " << target.toString());
541         return;
542     }
543     const OpTime lastOpApplied = getMyLastAppliedOpTime();
544     if (hbdata.getHeartbeatAppliedOpTime().getSecs() + 10 < lastOpApplied.getSecs()) {
545         warning() << "attempting to sync from " << target << ", but its latest opTime is "
546                   << hbdata.getHeartbeatAppliedOpTime().getSecs() << " and ours is "
547                   << lastOpApplied.getSecs() << " so this may not work";
548         response->append("warning",
549                          str::stream() << "requested member \"" << target.toString()
550                                        << "\" is more than 10 seconds behind us");
551         // not returning bad Status, just warning
552     }
553 
554     HostAndPort prevSyncSource = getSyncSourceAddress();
555     if (!prevSyncSource.empty()) {
556         response->append("prevSyncTarget", prevSyncSource.toString());
557     }
558 
559     setForceSyncSourceIndex(targetIndex);
560     *result = Status::OK();
561 }
562 
prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs & args,const Date_t now,BSONObjBuilder * response,Status * result)563 void TopologyCoordinator::prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args,
564                                                const Date_t now,
565                                                BSONObjBuilder* response,
566                                                Status* result) {
567     if (_rsConfig.getProtocolVersion() != 0) {
568         *result = Status(ErrorCodes::BadValue,
569                          str::stream() << "replset: incompatible replset protocol version: "
570                                        << _rsConfig.getProtocolVersion());
571         return;
572     }
573 
574     if (_selfIndex == -1) {
575         *result = Status(ErrorCodes::ReplicaSetNotFound,
576                          "Cannot participate in elections because not initialized");
577         return;
578     }
579 
580     if (args.setName != _rsConfig.getReplSetName()) {
581         *result =
582             Status(ErrorCodes::ReplicaSetNotFound,
583                    str::stream() << "Wrong repl set name. Expected: " << _rsConfig.getReplSetName()
584                                  << ", received: "
585                                  << args.setName);
586         return;
587     }
588 
589     if (args.id == static_cast<unsigned>(_selfConfig().getId())) {
590         *result = Status(ErrorCodes::BadValue,
591                          str::stream() << "Received replSetFresh command from member with the "
592                                           "same member ID as ourself: "
593                                        << args.id);
594         return;
595     }
596 
597     bool weAreFresher = false;
598     const OpTime lastOpApplied = getMyLastAppliedOpTime();
599     if (_rsConfig.getConfigVersion() > args.cfgver) {
600         log() << "replSet member " << args.who << " is not yet aware its cfg version "
601               << args.cfgver << " is stale";
602         response->append("info", "config version stale");
603         weAreFresher = true;
604     }
605     // check not only our own optime, but any other member we can reach
606     else if (OpTime(args.opTime, _term) < _latestKnownOpTime()) {
607         weAreFresher = true;
608     }
609     response->appendDate("opTime",
610                          Date_t::fromMillisSinceEpoch(lastOpApplied.getTimestamp().asLL()));
611     response->append("fresher", weAreFresher);
612 
613     std::string errmsg;
614     bool doVeto = _shouldVetoMember(args, now, &errmsg);
615     response->append("veto", doVeto);
616     if (doVeto) {
617         response->append("errmsg", errmsg);
618     }
619     *result = Status::OK();
620 }
621 
_shouldVetoMember(const ReplicationCoordinator::ReplSetFreshArgs & args,const Date_t & now,std::string * errmsg) const622 bool TopologyCoordinator::_shouldVetoMember(const ReplicationCoordinator::ReplSetFreshArgs& args,
623                                             const Date_t& now,
624                                             std::string* errmsg) const {
625     if (_rsConfig.getConfigVersion() < args.cfgver) {
626         // We are stale; do not veto.
627         return false;
628     }
629 
630     const unsigned int memberID = args.id;
631     const int hopefulIndex = _getMemberIndex(memberID);
632     invariant(hopefulIndex != _selfIndex);
633     const int highestPriorityIndex = _getHighestPriorityElectableIndex(now);
634 
635     if (hopefulIndex == -1) {
636         *errmsg = str::stream() << "replSet couldn't find member with id " << memberID;
637         return true;
638     }
639     const OpTime lastOpApplied = getMyLastAppliedOpTime();
640     if (_iAmPrimary() &&
641         lastOpApplied >= _memberData.at(hopefulIndex).getHeartbeatAppliedOpTime()) {
642         // hbinfo is not updated for ourself, so if we are primary we have to check the
643         // primary's last optime separately
644         *errmsg = str::stream() << "I am already primary, "
645                                 << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
646                                 << " can try again once I've stepped down";
647         return true;
648     }
649 
650     if (_currentPrimaryIndex != -1 && (hopefulIndex != _currentPrimaryIndex) &&
651         (_memberData.at(_currentPrimaryIndex).getHeartbeatAppliedOpTime() >=
652          _memberData.at(hopefulIndex).getHeartbeatAppliedOpTime())) {
653         // other members might be aware of more up-to-date nodes
654         *errmsg =
655             str::stream() << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
656                           << " is trying to elect itself but "
657                           << _rsConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort().toString()
658                           << " is already primary and more up-to-date";
659         return true;
660     }
661 
662     if ((highestPriorityIndex != -1)) {
663         const MemberConfig& hopefulMember = _rsConfig.getMemberAt(hopefulIndex);
664         const MemberConfig& priorityMember = _rsConfig.getMemberAt(highestPriorityIndex);
665 
666         if (priorityMember.getPriority() > hopefulMember.getPriority()) {
667             *errmsg = str::stream() << hopefulMember.getHostAndPort().toString()
668                                     << " has lower priority of " << hopefulMember.getPriority()
669                                     << " than " << priorityMember.getHostAndPort().toString()
670                                     << " which has a priority of " << priorityMember.getPriority();
671             return true;
672         }
673     }
674 
675     UnelectableReasonMask reason = _getUnelectableReason(hopefulIndex);
676     reason &= ~RefusesToStand;
677     if (reason) {
678         *errmsg = str::stream() << "I don't think "
679                                 << _rsConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
680                                 << " is electable because the "
681                                 << _getUnelectableReasonString(reason);
682         return true;
683     }
684 
685     return false;
686 }
687 
// Produce a reply to a received electCmd (replSetElect).  This command belongs to
// replication protocol version 0; PV1 elections use requestVote instead.
void TopologyCoordinator::prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args,
                                               const Date_t now,
                                               BSONObjBuilder* response,
                                               Status* result) {
    // replSetElect is only meaningful under protocol version 0.
    if (_rsConfig.getProtocolVersion() != 0) {
        *result = Status(ErrorCodes::BadValue,
                         str::stream() << "replset: incompatible replset protocol version: "
                                       << _rsConfig.getProtocolVersion());
        return;
    }
    // We cannot participate if we are not in the replica set configuration.
    if (_selfIndex == -1) {
        *result = Status(ErrorCodes::ReplicaSetNotFound,
                         "Cannot participate in election because not initialized");
        return;
    }

    const long long myver = _rsConfig.getConfigVersion();
    const int highestPriorityIndex = _getHighestPriorityElectableIndex(now);

    const MemberConfig* primary = _currentPrimaryMember();
    const MemberConfig* hopeful = _rsConfig.findMemberByID(args.whoid);
    const MemberConfig* highestPriority =
        highestPriorityIndex == -1 ? NULL : &_rsConfig.getMemberAt(highestPriorityIndex);

    // vote stays 0 to abstain; a positive value is our configured vote count ("yea").
    // NOTE(review): -10000 appears to act as an active veto in the PV0 protocol —
    // confirm against the election counting code before relying on that reading.
    int vote = 0;
    if (args.set != _rsConfig.getReplSetName()) {
        log() << "replSet error received an elect request for '" << args.set
              << "' but our set name is '" << _rsConfig.getReplSetName() << "'";
    } else if (myver < args.cfgver) {
        // we are stale.  don't vote
        log() << "replSetElect not voting because our config version is stale. Our version: "
              << myver << ", their version: " << args.cfgver;
    } else if (myver > args.cfgver) {
        // they are stale!
        log() << "replSetElect command received stale config version # during election. "
                 "Our version: "
              << myver << ", their version: " << args.cfgver;
        vote = -10000;
    } else if (!hopeful) {
        log() << "replSetElect couldn't find member with id " << args.whoid;
        vote = -10000;
    } else if (_iAmPrimary()) {
        log() << "I am already primary, " << hopeful->getHostAndPort().toString()
              << " can try again once I've stepped down";
        vote = -10000;
    } else if (primary) {
        log() << hopeful->getHostAndPort().toString() << " is trying to elect itself but "
              << primary->getHostAndPort().toString() << " is already primary";
        vote = -10000;
    } else if (highestPriority && highestPriority->getPriority() > hopeful->getPriority()) {
        // TODO(spencer): What if the lower-priority member is more up-to-date?
        log() << hopeful->getHostAndPort().toString() << " has lower priority than "
              << highestPriority->getHostAndPort().toString();
        vote = -10000;
    } else if (_voteLease.when + VoteLease::leaseTime >= now && _voteLease.whoId != args.whoid) {
        // We voted for a different candidate recently and the vote lease has not yet
        // expired, so we abstain (vote stays 0).
        log() << "replSet voting no for " << hopeful->getHostAndPort().toString() << "; voted for "
              << _voteLease.whoHostAndPort.toString() << ' '
              << durationCount<Seconds>(now - _voteLease.when) << " secs ago";
    } else {
        // Grant our vote and record the lease so we cannot immediately vote for another node.
        _voteLease.when = now;
        _voteLease.whoId = args.whoid;
        _voteLease.whoHostAndPort = hopeful->getHostAndPort();
        vote = _selfConfig().getNumVotes();
        invariant(hopeful->getId() == args.whoid);
        if (vote > 0) {
            log() << "replSetElect voting yea for " << hopeful->getHostAndPort().toString() << " ("
                  << args.whoid << ')';
        }
    }

    response->append("vote", vote);
    response->append("round", args.round);
    *result = Status::OK();
}
763 
// Produce a reply to an incoming (pre-V1 style) heartbeat.  Fills in "response" and
// returns a non-OK Status when the heartbeat must be rejected outright (protocol or
// set-name mismatch, invalid config, or a heartbeat from our own member ID).
Status TopologyCoordinator::prepareHeartbeatResponse(Date_t now,
                                                     const ReplSetHeartbeatArgs& args,
                                                     const std::string& ourSetName,
                                                     ReplSetHeartbeatResponse* response) {
    if (args.getProtocolVersion() != 1) {
        return Status(ErrorCodes::BadValue,
                      str::stream() << "replset: incompatible replset protocol version: "
                                    << args.getProtocolVersion());
    }

    // Verify that replica set names match
    const std::string rshb = args.getSetName();
    if (ourSetName != rshb) {
        log() << "replSet set names do not match, ours: " << ourSetName
              << "; remote node's: " << rshb;
        response->noteMismatched();
        return Status(ErrorCodes::InconsistentReplicaSetNames,
                      str::stream() << "Our set name of " << ourSetName << " does not match name "
                                    << rshb
                                    << " reported by remote node");
    }

    const MemberState myState = getMemberState();
    if (_selfIndex == -1) {
        // We're not in the config; only refuse the heartbeat if we've been removed.
        if (myState.removed()) {
            return Status(ErrorCodes::InvalidReplicaSetConfig,
                          "Our replica set configuration is invalid or does not include us");
        }
    } else {
        invariant(_rsConfig.getReplSetName() == args.getSetName());
        // A node must never receive a heartbeat carrying its own member ID.
        if (args.getSenderId() == _selfConfig().getId()) {
            return Status(ErrorCodes::BadValue,
                          str::stream() << "Received heartbeat from member with the same "
                                           "member ID as ourself: "
                                        << args.getSenderId());
        }
    }

    // This is a replica set
    response->noteReplSet();

    response->setSetName(ourSetName);
    response->setState(myState.s);
    // Only a primary has a meaningful election time to report.
    if (myState.primary()) {
        response->setElectionTime(_electionTime);
    }

    const OpTime lastOpApplied = getMyLastAppliedOpTime();
    const OpTime lastOpDurable = getMyLastDurableOpTime();

    // Are we electable
    response->setElectable(!_getMyUnelectableReason(now, StartElectionReason::kElectionTimeout));

    // Heartbeat status message
    response->setHbMsg(_getHbmsg(now));
    response->setTime(duration_cast<Seconds>(now - Date_t{}));
    response->setAppliedOpTime(lastOpApplied);
    response->setDurableOpTime(lastOpDurable);

    if (!_syncSource.empty()) {
        response->setSyncingTo(_syncSource);
    }

    if (!_rsConfig.isInitialized()) {
        // -2 is the sentinel config version for an uninitialized configuration.
        response->setConfigVersion(-2);
        return Status::OK();
    }

    const long long v = _rsConfig.getConfigVersion();
    response->setConfigVersion(v);
    // Deliver new config if caller's version is older than ours
    if (v > args.getConfigVersion()) {
        response->setConfig(_rsConfig);
    }

    // Resolve the caller's id in our Member list
    int from = -1;
    if (v == args.getConfigVersion() && args.getSenderId() != -1) {
        from = _getMemberIndex(args.getSenderId());
    }
    if (from == -1) {
        // Can't find the member, so we leave out the stateDisagreement field
        return Status::OK();
    }
    invariant(from != _selfIndex);

    // if we thought that this node is down, let it know
    if (!_memberData.at(from).up()) {
        response->noteStateDisagreement();
    }

    // note that we got a heartbeat from this node
    _memberData.at(from).setLastHeartbeatRecv(now);
    return Status::OK();
}
860 
// Produce a reply to an incoming V1 heartbeat.  Like prepareHeartbeatResponse() but for
// the protocol-version-1 argument format: it additionally reports our term and who we
// currently believe is primary, and omits the PV0-only fields (electable, hbmsg, time,
// stateDisagreement).
Status TopologyCoordinator::prepareHeartbeatResponseV1(Date_t now,
                                                       const ReplSetHeartbeatArgsV1& args,
                                                       const std::string& ourSetName,
                                                       ReplSetHeartbeatResponse* response) {
    // Verify that replica set names match
    const std::string rshb = args.getSetName();
    if (ourSetName != rshb) {
        log() << "replSet set names do not match, ours: " << ourSetName
              << "; remote node's: " << rshb;
        return Status(ErrorCodes::InconsistentReplicaSetNames,
                      str::stream() << "Our set name of " << ourSetName << " does not match name "
                                    << rshb
                                    << " reported by remote node");
    }

    const MemberState myState = getMemberState();
    if (_selfIndex == -1) {
        // We're not in the config; only refuse the heartbeat if we've been removed.
        if (myState.removed()) {
            return Status(ErrorCodes::InvalidReplicaSetConfig,
                          "Our replica set configuration is invalid or does not include us");
        }
    } else {
        // A node must never receive a heartbeat carrying its own member ID.
        if (args.getSenderId() == _selfConfig().getId()) {
            return Status(ErrorCodes::BadValue,
                          str::stream() << "Received heartbeat from member with the same "
                                           "member ID as ourself: "
                                        << args.getSenderId());
        }
    }

    response->setSetName(ourSetName);

    response->setState(myState.s);

    // Only a primary has a meaningful election time to report.
    if (myState.primary()) {
        response->setElectionTime(_electionTime);
    }

    const OpTime lastOpApplied = getMyLastAppliedOpTime();
    const OpTime lastOpDurable = getMyLastDurableOpTime();
    response->setAppliedOpTime(lastOpApplied);
    response->setDurableOpTime(lastOpDurable);

    // Report who we currently believe the primary is, if anyone.
    if (_currentPrimaryIndex != -1) {
        response->setPrimaryId(_rsConfig.getMemberAt(_currentPrimaryIndex).getId());
    }

    response->setTerm(_term);

    if (!_syncSource.empty()) {
        response->setSyncingTo(_syncSource);
    }

    if (!_rsConfig.isInitialized()) {
        // -2 is the sentinel config version for an uninitialized configuration.
        response->setConfigVersion(-2);
        return Status::OK();
    }

    const long long v = _rsConfig.getConfigVersion();
    response->setConfigVersion(v);
    // Deliver new config if caller's version is older than ours
    if (v > args.getConfigVersion()) {
        response->setConfig(_rsConfig);
    }

    // Resolve the caller's id in our Member list
    int from = -1;
    if (v == args.getConfigVersion() && args.getSenderId() != -1) {
        from = _getMemberIndex(args.getSenderId());
    }
    if (from == -1) {
        // Sender isn't in our config (or config versions differ); nothing more to record.
        return Status::OK();
    }
    invariant(from != _selfIndex);

    // note that we got a heartbeat from this node
    _memberData.at(from).setLastHeartbeatRecv(now);
    return Status::OK();
}
940 
_getMemberIndex(int id) const941 int TopologyCoordinator::_getMemberIndex(int id) const {
942     int index = 0;
943     for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd();
944          ++it, ++index) {
945         if (it->getId() == id) {
946             return index;
947         }
948     }
949     return -1;
950 }
951 
// Builds the next outbound (pre-V1 style) heartbeat request for "target", and returns it
// together with the time budget the caller should allow before treating the request as
// timed out.
std::pair<ReplSetHeartbeatArgs, Milliseconds> TopologyCoordinator::prepareHeartbeatRequest(
    Date_t now, const std::string& ourSetName, const HostAndPort& target) {
    PingStats& hbStats = _pings[target];
    // How long the current heartbeat "attempt" (including retries) has been running.
    Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate();
    if (!_rsConfig.isInitialized() || !hbStats.trying() ||
        (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) {
        // This is either the first request ever for "target", or the heartbeat timeout has
        // passed, so we're starting a "new" heartbeat.
        hbStats.start(now);
        alreadyElapsed = Milliseconds(0);
    }
    ReplSetHeartbeatArgs hbArgs;
    hbArgs.setProtocolVersion(1);
    hbArgs.setCheckEmpty(false);
    if (_rsConfig.isInitialized()) {
        hbArgs.setSetName(_rsConfig.getReplSetName());
        hbArgs.setConfigVersion(_rsConfig.getConfigVersion());
        // Only identify ourselves if we are actually part of the config.
        if (_selfIndex >= 0) {
            const MemberConfig& me = _selfConfig();
            hbArgs.setSenderHost(me.getHostAndPort());
            hbArgs.setSenderId(me.getId());
        }
    } else {
        hbArgs.setSetName(ourSetName);
        // -2 is the sentinel config version for an uninitialized configuration.
        hbArgs.setConfigVersion(-2);
    }
    // Advertise heartbeat version 1 unless FCV is still fully downgraded to 3.4.
    if (serverGlobalParams.featureCompatibility.getVersion() !=
        ServerGlobalParams::FeatureCompatibility::Version::kFullyDowngradedTo34) {
        hbArgs.setHeartbeatVersion(1);
    }

    // Hand back only the remaining budget for this attempt, not the full timeout period.
    const Milliseconds timeoutPeriod(
        _rsConfig.isInitialized() ? _rsConfig.getHeartbeatTimeoutPeriodMillis()
                                  : Milliseconds{ReplSetConfig::kDefaultHeartbeatTimeoutPeriod});
    const Milliseconds timeout = timeoutPeriod - alreadyElapsed;
    return std::make_pair(hbArgs, timeout);
}
989 
prepareHeartbeatRequestV1(Date_t now,const std::string & ourSetName,const HostAndPort & target)990 std::pair<ReplSetHeartbeatArgsV1, Milliseconds> TopologyCoordinator::prepareHeartbeatRequestV1(
991     Date_t now, const std::string& ourSetName, const HostAndPort& target) {
992     PingStats& hbStats = _pings[target];
993     Milliseconds alreadyElapsed(now.asInt64() - hbStats.getLastHeartbeatStartDate().asInt64());
994     if ((!_rsConfig.isInitialized()) || !hbStats.trying() ||
995         (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriodMillis())) {
996         // This is either the first request ever for "target", or the heartbeat timeout has
997         // passed, so we're starting a "new" heartbeat.
998         hbStats.start(now);
999         alreadyElapsed = Milliseconds(0);
1000     }
1001     ReplSetHeartbeatArgsV1 hbArgs;
1002     if (_rsConfig.isInitialized()) {
1003         hbArgs.setSetName(_rsConfig.getReplSetName());
1004         hbArgs.setConfigVersion(_rsConfig.getConfigVersion());
1005         if (_selfIndex >= 0) {
1006             const MemberConfig& me = _selfConfig();
1007             hbArgs.setSenderId(me.getId());
1008             hbArgs.setSenderHost(me.getHostAndPort());
1009         }
1010         hbArgs.setTerm(_term);
1011     } else {
1012         hbArgs.setSetName(ourSetName);
1013         // Config version -2 is for uninitialized config.
1014         hbArgs.setConfigVersion(-2);
1015         hbArgs.setTerm(OpTime::kInitialTerm);
1016     }
1017     if (serverGlobalParams.featureCompatibility.getVersion() !=
1018         ServerGlobalParams::FeatureCompatibility::Version::kFullyDowngradedTo34) {
1019         hbArgs.setHeartbeatVersion(1);
1020     }
1021 
1022     const Milliseconds timeoutPeriod(
1023         _rsConfig.isInitialized() ? _rsConfig.getHeartbeatTimeoutPeriodMillis()
1024                                   : Milliseconds{ReplSetConfig::kDefaultHeartbeatTimeoutPeriod});
1025     const Milliseconds timeout(timeoutPeriod - alreadyElapsed);
1026     return std::make_pair(hbArgs, timeout);
1027 }
1028 
// Processes the outcome (success or failure) of a heartbeat sent to "target".
// Updates ping statistics and per-member heartbeat data, schedules the next heartbeat,
// and returns the action (reconfig, primary update, or none) the caller should take.
HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
    Date_t now,
    Milliseconds networkRoundTripTime,
    const HostAndPort& target,
    const StatusWith<ReplSetHeartbeatResponse>& hbResponse) {
    const MemberState originalState = getMemberState();
    PingStats& hbStats = _pings[target];
    // A response can only arrive for a heartbeat that was started.
    invariant(hbStats.getLastHeartbeatStartDate() != Date_t());
    const bool isUnauthorized = (hbResponse.getStatus().code() == ErrorCodes::Unauthorized) ||
        (hbResponse.getStatus().code() == ErrorCodes::AuthenticationFailed);
    if (!hbResponse.isOK()) {
        // An auth failure still proves the node is reachable, so count it as a ping
        // "hit"; any other failure counts as a miss (consuming a retry).
        if (isUnauthorized) {
            hbStats.hit(networkRoundTripTime);
        } else {
            hbStats.miss();
        }
    } else {
        hbStats.hit(networkRoundTripTime);
        // Log diagnostics.
        if (hbResponse.getValue().isStateDisagreement()) {
            LOG(1) << target << " thinks that we are down because they cannot send us heartbeats.";
        }
    }

    // If a node is not PRIMARY and has no sync source, we increase the heartbeat rate in order
    // to help it find a sync source more quickly, which helps ensure the PRIMARY will continue to
    // see the majority of the cluster.
    //
    // Arbiters also decrease their heartbeat interval to at most half the election timeout period.
    Milliseconds heartbeatInterval = _rsConfig.getHeartbeatInterval();
    if (_rsConfig.getProtocolVersion() == 1) {
        if (getMemberState().arbiter()) {
            heartbeatInterval = std::min(_rsConfig.getElectionTimeoutPeriod() / 2,
                                         _rsConfig.getHeartbeatInterval());
        } else if (getSyncSourceAddress().empty() && !_iAmPrimary()) {
            heartbeatInterval = std::min(_rsConfig.getElectionTimeoutPeriod() / 2,
                                         _rsConfig.getHeartbeatInterval() / 4);
        }
    }

    const Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate();
    Date_t nextHeartbeatStartDate;
    // Determine the next heartbeat start time. If a heartbeat has not succeeded or failed, and we
    // have not used up the timeout period, we should retry.
    if (hbStats.trying() && (alreadyElapsed < _rsConfig.getHeartbeatTimeoutPeriod())) {
        // There are still retries left, let's use one.
        nextHeartbeatStartDate = now;
    } else {
        nextHeartbeatStartDate = now + heartbeatInterval;
    }

    // If the response carried a config newer than ours, ask the caller to reconfig.
    if (hbResponse.isOK() && hbResponse.getValue().hasConfig()) {
        const long long currentConfigVersion =
            _rsConfig.isInitialized() ? _rsConfig.getConfigVersion() : -2;
        const ReplSetConfig& newConfig = hbResponse.getValue().getConfig();
        if (newConfig.getConfigVersion() > currentConfigVersion) {
            HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeReconfigAction();
            nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
            return nextAction;
        } else {
            // Could be we got the newer version before we got the response, or the
            // target erroneously sent us one, even through it isn't newer.
            if (newConfig.getConfigVersion() < currentConfigVersion) {
                LOG(1) << "Config version from heartbeat was older than ours.";
            } else {
                LOG(2) << "Config from heartbeat response was same as ours.";
            }
            // Dump both configs for diagnosis, but only if debug logging is enabled.
            if (logger::globalLogDomain()->shouldLog(MongoLogDefaultComponent_component,
                                                     ::mongo::LogstreamBuilder::severityCast(2))) {
                LogstreamBuilder lsb = log();
                if (_rsConfig.isInitialized()) {
                    lsb << "Current config: " << _rsConfig.toBSON() << "; ";
                }
                lsb << "Config in heartbeat: " << newConfig.toBSON();
            }
        }
    }

    // Check if the heartbeat target is in our config.  If it isn't, there's nothing left to do,
    // so return early.
    if (!_rsConfig.isInitialized()) {
        HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
        nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
        return nextAction;
    }
    // If we're not in the config, we don't need to respond to heartbeats.
    if (_selfIndex == -1) {
        LOG(1) << "Could not find ourself in current config so ignoring heartbeat from " << target
               << " -- current config: " << _rsConfig.toBSON();
        HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
        nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
        return nextAction;
    }
    const int memberIndex = _rsConfig.findMemberIndexByHostAndPort(target);
    if (memberIndex == -1) {
        LOG(1) << "Could not find " << target << " in current config so ignoring --"
                                                 " current config: "
               << _rsConfig.toBSON();
        HeartbeatResponseAction nextAction = HeartbeatResponseAction::makeNoAction();
        nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
        return nextAction;
    }

    invariant(memberIndex != _selfIndex);

    // Record the outcome in the target's member data.
    MemberData& hbData = _memberData.at(memberIndex);
    const MemberConfig member = _rsConfig.getMemberAt(memberIndex);
    bool advancedOpTime = false;
    if (!hbResponse.isOK()) {
        if (isUnauthorized) {
            hbData.setAuthIssue(now);
        }
        // If the heartbeat has failed i.e. used up all retries, then we mark the target node as
        // down.
        else if (hbStats.failed() || (alreadyElapsed >= _rsConfig.getHeartbeatTimeoutPeriod())) {
            hbData.setDownValues(now, hbResponse.getStatus().reason());
        } else {
            LOG(3) << "Bad heartbeat response from " << target
                   << "; trying again; Retries left: " << (hbStats.retriesLeft()) << "; "
                   << alreadyElapsed << " have already elapsed";
        }
    } else {
        ReplSetHeartbeatResponse hbr = std::move(hbResponse.getValue());
        LOG(3) << "setUpValues: heartbeat response good for member _id:" << member.getId()
               << ", msg:  " << hbr.getHbMsg();
        // setUpValues returns whether the member's optime moved forward.
        advancedOpTime = hbData.setUpValues(now, std::move(hbr));
    }

    // Re-evaluate who the primary is under the appropriate protocol version.
    HeartbeatResponseAction nextAction;
    if (_rsConfig.getProtocolVersion() == 0) {
        nextAction = _updatePrimaryFromHBData(memberIndex, originalState, now);
    } else {
        nextAction = _updatePrimaryFromHBDataV1(memberIndex, originalState, now);
    }

    nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate);
    nextAction.setAdvancedOpTime(advancedOpTime);
    return nextAction;
}
1168 
haveNumNodesReachedOpTime(const OpTime & targetOpTime,int numNodes,bool durablyWritten)1169 bool TopologyCoordinator::haveNumNodesReachedOpTime(const OpTime& targetOpTime,
1170                                                     int numNodes,
1171                                                     bool durablyWritten) {
1172     // Replication progress that is for some reason ahead of us should not allow us to
1173     // satisfy a write concern if we aren't caught up ourselves.
1174     OpTime myOpTime = durablyWritten ? getMyLastDurableOpTime() : getMyLastAppliedOpTime();
1175     if (myOpTime < targetOpTime) {
1176         return false;
1177     }
1178 
1179     for (auto&& memberData : _memberData) {
1180         // The index in the config is -1 for master-slave nodes.
1181         const auto configIndex = memberData.getConfigIndex();
1182         if (configIndex >= 0) {
1183             // We do not count arbiters towards the write concern.
1184             if (_rsConfig.getMemberAt(configIndex).isArbiter()) {
1185                 continue;
1186             }
1187         }
1188 
1189         const OpTime& memberOpTime =
1190             durablyWritten ? memberData.getLastDurableOpTime() : memberData.getLastAppliedOpTime();
1191 
1192         if (memberOpTime >= targetOpTime) {
1193             --numNodes;
1194         }
1195 
1196         if (numNodes <= 0) {
1197             return true;
1198         }
1199     }
1200     return false;
1201 }
1202 
haveTaggedNodesReachedOpTime(const OpTime & opTime,const ReplSetTagPattern & tagPattern,bool durablyWritten)1203 bool TopologyCoordinator::haveTaggedNodesReachedOpTime(const OpTime& opTime,
1204                                                        const ReplSetTagPattern& tagPattern,
1205                                                        bool durablyWritten) {
1206     ReplSetTagMatch matcher(tagPattern);
1207     for (auto&& memberData : _memberData) {
1208         const OpTime& memberOpTime =
1209             durablyWritten ? memberData.getLastDurableOpTime() : memberData.getLastAppliedOpTime();
1210         if (memberOpTime >= opTime) {
1211             // This node has reached the desired optime, now we need to check if it is a part
1212             // of the tagPattern.
1213             int memberIndex = memberData.getConfigIndex();
1214             invariant(memberIndex >= 0);
1215             const MemberConfig& memberConfig = _rsConfig.getMemberAt(memberIndex);
1216             for (MemberConfig::TagIterator it = memberConfig.tagsBegin();
1217                  it != memberConfig.tagsEnd();
1218                  ++it) {
1219                 if (matcher.update(*it)) {
1220                     return true;
1221                 }
1222             }
1223         }
1224     }
1225     return false;
1226 }
1227 
checkMemberTimeouts(Date_t now)1228 HeartbeatResponseAction TopologyCoordinator::checkMemberTimeouts(Date_t now) {
1229     bool stepdown = false;
1230     for (int memberIndex = 0; memberIndex < static_cast<int>(_memberData.size()); memberIndex++) {
1231         auto& memberData = _memberData[memberIndex];
1232         if (!memberData.isSelf() && !memberData.lastUpdateStale() &&
1233             now - memberData.getLastUpdate() >= _rsConfig.getElectionTimeoutPeriod()) {
1234             memberData.markLastUpdateStale();
1235             if (_iAmPrimary()) {
1236                 stepdown = stepdown || setMemberAsDown(now, memberIndex);
1237             }
1238         }
1239     }
1240     if (stepdown) {
1241         log() << "can't see a majority of the set, relinquishing primary";
1242         return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
1243     }
1244     return HeartbeatResponseAction::makeNoAction();
1245 }
1246 
getHostsWrittenTo(const OpTime & op,bool durablyWritten,bool skipSelf)1247 std::vector<HostAndPort> TopologyCoordinator::getHostsWrittenTo(const OpTime& op,
1248                                                                 bool durablyWritten,
1249                                                                 bool skipSelf) {
1250     std::vector<HostAndPort> hosts;
1251     for (const auto& memberData : _memberData) {
1252         if (skipSelf && memberData.isSelf()) {
1253             continue;
1254         }
1255 
1256         if (durablyWritten) {
1257             if (memberData.getLastDurableOpTime() < op) {
1258                 continue;
1259             }
1260         } else if (memberData.getLastAppliedOpTime() < op) {
1261             continue;
1262         }
1263 
1264         hosts.push_back(memberData.getHostAndPort());
1265     }
1266     return hosts;
1267 }
1268 
setMemberAsDown(Date_t now,const int memberIndex)1269 bool TopologyCoordinator::setMemberAsDown(Date_t now, const int memberIndex) {
1270     invariant(memberIndex != _selfIndex);
1271     invariant(memberIndex != -1);
1272     invariant(_currentPrimaryIndex == _selfIndex);
1273     MemberData& hbData = _memberData.at(memberIndex);
1274     hbData.setDownValues(now, "no response within election timeout period");
1275 
1276     if (CannotSeeMajority & _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)) {
1277         return true;
1278     }
1279 
1280     return false;
1281 }
1282 
getStalestLiveMember() const1283 std::pair<int, Date_t> TopologyCoordinator::getStalestLiveMember() const {
1284     Date_t earliestDate = Date_t::max();
1285     int earliestMemberId = -1;
1286     for (const auto& memberData : _memberData) {
1287         if (memberData.isSelf()) {
1288             continue;
1289         }
1290         if (memberData.lastUpdateStale()) {
1291             // Already stale.
1292             continue;
1293         }
1294         LOG(3) << "memberData lastupdate is: " << memberData.getLastUpdate();
1295         if (earliestDate > memberData.getLastUpdate()) {
1296             earliestDate = memberData.getLastUpdate();
1297             earliestMemberId = memberData.getMemberId();
1298         }
1299     }
1300     LOG(3) << "stalest member " << earliestMemberId << " date: " << earliestDate;
1301     return std::make_pair(earliestMemberId, earliestDate);
1302 }
1303 
resetAllMemberTimeouts(Date_t now)1304 void TopologyCoordinator::resetAllMemberTimeouts(Date_t now) {
1305     for (auto&& memberData : _memberData)
1306         memberData.updateLiveness(now);
1307 }
1308 
resetMemberTimeouts(Date_t now,const stdx::unordered_set<HostAndPort> & member_set)1309 void TopologyCoordinator::resetMemberTimeouts(Date_t now,
1310                                               const stdx::unordered_set<HostAndPort>& member_set) {
1311     for (auto&& memberData : _memberData) {
1312         if (member_set.count(memberData.getHostAndPort()))
1313             memberData.updateLiveness(now);
1314     }
1315 }
1316 
getMyLastAppliedOpTime() const1317 OpTime TopologyCoordinator::getMyLastAppliedOpTime() const {
1318     return _selfMemberData().getLastAppliedOpTime();
1319 }
1320 
getMyLastDurableOpTime() const1321 OpTime TopologyCoordinator::getMyLastDurableOpTime() const {
1322     return _selfMemberData().getLastDurableOpTime();
1323 }
1324 
getMyMemberData()1325 MemberData* TopologyCoordinator::getMyMemberData() {
1326     return &_memberData[_selfMemberDataIndex()];
1327 }
1328 
findMemberDataByMemberId(const int memberId)1329 MemberData* TopologyCoordinator::findMemberDataByMemberId(const int memberId) {
1330     const int memberIndex = _getMemberIndex(memberId);
1331     if (memberIndex >= 0)
1332         return &_memberData[memberIndex];
1333     return nullptr;
1334 }
1335 
findMemberDataByRid(const OID rid)1336 MemberData* TopologyCoordinator::findMemberDataByRid(const OID rid) {
1337     for (auto& memberData : _memberData) {
1338         if (memberData.getRid() == rid)
1339             return &memberData;
1340     }
1341     return nullptr;
1342 }
1343 
addSlaveMemberData(const OID rid)1344 MemberData* TopologyCoordinator::addSlaveMemberData(const OID rid) {
1345     invariant(!_memberData.empty());        // Must always have our own entry first.
1346     invariant(!_rsConfig.isInitialized());  // Used only for master-slave.
1347     _memberData.emplace_back();
1348     auto* result = &_memberData.back();
1349     result->setRid(rid);
1350     return result;
1351 }
1352 
// V1-protocol reaction to an updated heartbeat: refreshes our notion of which
// remote member (if any) is primary and, when this node looks like a better
// primary, schedules a catchup or priority takeover.
// 'updatedConfigIndex' is the config index of the member whose heartbeat data
// was just updated; it must never be ourselves.  Returns the action (takeover
// or no-op) the caller should carry out.
HeartbeatResponseAction TopologyCoordinator::_updatePrimaryFromHBDataV1(
    int updatedConfigIndex, const MemberState& originalState, Date_t now) {
    //
    // Updates the local notion of which remote node, if any is primary.
    // Start the priority takeover process if we are eligible.
    //

    invariant(updatedConfigIndex != _selfIndex);

    // If we are missing from the config, do not participate in primary maintenance or election.
    if (_selfIndex == -1) {
        return HeartbeatResponseAction::makeNoAction();
    }
    // If we are the primary, there must be no other primary, otherwise its higher term would
    // have already made us step down.
    if (_currentPrimaryIndex == _selfIndex) {
        return HeartbeatResponseAction::makeNoAction();
    }

    // Scan the member list's heartbeat data for who is primary, and update _currentPrimaryIndex.
    // When several members claim to be primary, prefer the one reporting the highest term.
    int primaryIndex = -1;
    for (size_t i = 0; i < _memberData.size(); i++) {
        const MemberData& member = _memberData.at(i);
        if (member.getState().primary() && member.up()) {
            if (primaryIndex == -1 || _memberData.at(primaryIndex).getTerm() < member.getTerm()) {
                primaryIndex = i;
            }
        }
    }
    _currentPrimaryIndex = primaryIndex;
    if (_currentPrimaryIndex == -1) {
        return HeartbeatResponseAction::makeNoAction();
    }

    // Clear last heartbeat message on ourselves.
    setMyHeartbeatMessage(now, "");

    // Takeover when the replset is stable.
    //
    // Take over the primary only if the remote primary is in the latest term I know.
    // This is done only when we get a heartbeat response from the primary.
    // Otherwise, there must be an outstanding election, which may succeed or not, but
    // the remote primary will become aware of that election eventually and step down.
    if (_memberData.at(primaryIndex).getTerm() == _term && updatedConfigIndex == primaryIndex) {

        // Don't schedule catchup takeover if catchup takeover or primary catchup is disabled.
        bool catchupTakeoverDisabled =
            ReplSetConfig::kCatchUpDisabled == _rsConfig.getCatchUpTimeoutPeriod() ||
            ReplSetConfig::kCatchUpTakeoverDisabled == _rsConfig.getCatchUpTakeoverDelay();

        bool scheduleCatchupTakeover = false;
        bool schedulePriorityTakeover = false;

        // Catchup takeover candidate: our applied optime is fresher than the primary's.
        if (!catchupTakeoverDisabled && (_memberData.at(primaryIndex).getLastAppliedOpTime() <
                                         _memberData.at(_selfIndex).getLastAppliedOpTime())) {
            LOG(2) << "I can take over the primary due to fresher data."
                   << " Current primary index: " << primaryIndex << " in term "
                   << _memberData.at(primaryIndex).getTerm() << "."
                   << " Current primary optime: "
                   << _memberData.at(primaryIndex).getLastAppliedOpTime()
                   << " My optime: " << _memberData.at(_selfIndex).getLastAppliedOpTime();

            scheduleCatchupTakeover = true;
        }

        // Priority takeover candidate: we are configured with a higher priority
        // than the current primary.
        if (_rsConfig.getMemberAt(primaryIndex).getPriority() <
            _rsConfig.getMemberAt(_selfIndex).getPriority()) {
            LOG(2) << "I can take over the primary due to higher priority."
                   << " Current primary index: " << primaryIndex << " in term "
                   << _memberData.at(primaryIndex).getTerm();

            schedulePriorityTakeover = true;
        }

        // Calculate rank of current node. A rank of 0 indicates that it has the highest priority.
        auto currentNodePriority = _rsConfig.getMemberAt(_selfIndex).getPriority();

        // Schedule a priority takeover early only if we know that the current node has the highest
        // priority in the replica set, has a higher priority than the primary, and is the most
        // up to date node.
        // Otherwise, prefer to schedule a catchup takeover over a priority takeover
        if (scheduleCatchupTakeover && schedulePriorityTakeover &&
            _rsConfig.calculatePriorityRank(currentNodePriority) == 0) {
            LOG(2) << "I can take over the primary because I have a higher priority, the highest "
                   << "priority in the replica set, and fresher data."
                   << " Current primary index: " << primaryIndex << " in term "
                   << _memberData.at(primaryIndex).getTerm();
            return HeartbeatResponseAction::makePriorityTakeoverAction();
        }
        if (scheduleCatchupTakeover) {
            return HeartbeatResponseAction::makeCatchupTakeoverAction();
        }
        if (schedulePriorityTakeover) {
            return HeartbeatResponseAction::makePriorityTakeoverAction();
        }
    }
    return HeartbeatResponseAction::makeNoAction();
}
1451 
// Non-V1 reaction to an updated heartbeat (presumably the protocol-version-0
// path — TODO confirm; contrast with _updatePrimaryFromHBDataV1 above).  See
// the two-phase description below.  'updatedConfigIndex' is the config index
// of the member whose heartbeat data was just updated; 'originalState' is our
// member state before the update.  Returns the action the caller should
// execute (step down self, ask a remote to step down, stand for election, or
// nothing).
HeartbeatResponseAction TopologyCoordinator::_updatePrimaryFromHBData(
    int updatedConfigIndex, const MemberState& originalState, Date_t now) {
    // This method has two interrelated responsibilities, performed in two phases.
    //
    // First, it updates the local notion of which remote node, if any is primary.  In the
    // process, it may request a remote primary to step down because there is a higher priority
    // node waiting, or because the local node thinks it is primary and that it has a more
    // recent electionTime.  It may instead decide that the local node should step down itself,
    // because a remote has a more recent election time.
    //
    // Second, if there is no remote primary, and the local node is not primary, it considers
    // whether or not to stand for election.
    invariant(updatedConfigIndex != _selfIndex);

    // We are missing from the config, so do not participate in primary maintenance or election.
    if (_selfIndex == -1) {
        return HeartbeatResponseAction::makeNoAction();
    }

    ////////////////////
    // Phase 1
    ////////////////////

    // If we believe the node whose data was just updated is primary, confirm that
    // the updated data supports that notion.  If not, erase our notion of who is primary.
    if (updatedConfigIndex == _currentPrimaryIndex) {
        const MemberData& updatedHBData = _memberData.at(updatedConfigIndex);
        if (!updatedHBData.up() || !updatedHBData.getState().primary()) {
            _currentPrimaryIndex = -1;
        }
    }

    // If the current primary is not highest priority and up to date (within 10s),
    // have them/me stepdown.
    if (_currentPrimaryIndex != -1) {
        // check if we should ask the primary (possibly ourselves) to step down
        const int highestPriorityIndex = _getHighestPriorityElectableIndex(now);
        if (highestPriorityIndex != -1) {
            const MemberConfig& currentPrimaryMember = _rsConfig.getMemberAt(_currentPrimaryIndex);
            const MemberConfig& highestPriorityMember = _rsConfig.getMemberAt(highestPriorityIndex);
            // For ourselves use our authoritative applied optime; for remotes use
            // the optime last reported via heartbeat.
            const OpTime highestPriorityMemberOptime = highestPriorityIndex == _selfIndex
                ? getMyLastAppliedOpTime()
                : _memberData.at(highestPriorityIndex).getHeartbeatAppliedOpTime();

            if ((highestPriorityMember.getPriority() > currentPrimaryMember.getPriority()) &&
                _isOpTimeCloseEnoughToLatestToElect(highestPriorityMemberOptime)) {
                const OpTime latestOpTime = _latestKnownOpTime();

                if (_iAmPrimary()) {
                    // Already stepping down: avoid issuing a second stepdown.
                    if (_leaderMode == LeaderMode::kSteppingDown) {
                        return HeartbeatResponseAction::makeNoAction();
                    }
                    log() << "Stepping down self (priority " << currentPrimaryMember.getPriority()
                          << ") because " << highestPriorityMember.getHostAndPort()
                          << " has higher priority " << highestPriorityMember.getPriority()
                          << " and is only "
                          << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs())
                          << " seconds behind me";
                    // Extend our election sleep so we do not immediately re-elect
                    // ourselves after stepping down for a higher-priority node.
                    const Date_t until =
                        now + VoteLease::leaseTime + _rsConfig.getHeartbeatInterval();
                    if (_electionSleepUntil < until) {
                        _electionSleepUntil = until;
                    }
                    return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
                } else if ((highestPriorityIndex == _selfIndex) && (_electionSleepUntil <= now)) {
                    // If this node is the highest priority node, and it is not in
                    // an inter-election sleep period, ask the current primary to step down.
                    // This is an optimization, because the remote primary will almost certainly
                    // notice this node's electability promptly, via its own heartbeat process.
                    log() << "Requesting that " << currentPrimaryMember.getHostAndPort()
                          << " (priority " << currentPrimaryMember.getPriority()
                          << ") step down because I have higher priority "
                          << highestPriorityMember.getPriority() << " and am only "
                          << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs())
                          << " seconds behind it";
                    int primaryIndex = _currentPrimaryIndex;
                    _currentPrimaryIndex = -1;
                    return HeartbeatResponseAction::makeStepDownRemoteAction(primaryIndex);
                }
            }
        }
    }

    // Scan the member list's heartbeat data for who is primary, and update
    // _currentPrimaryIndex and _role, or request a remote to step down, as necessary.
    {
        int remotePrimaryIndex = -1;
        for (std::vector<MemberData>::const_iterator it = _memberData.begin();
             it != _memberData.end();
             ++it) {
            const int itIndex = indexOfIterator(_memberData, it);
            if (itIndex == _selfIndex) {
                continue;
            }

            if (it->getState().primary() && it->up()) {
                if (remotePrimaryIndex != -1) {
                    // two other nodes think they are primary (asynchronously polled)
                    // -- wait for things to settle down.
                    warning() << "two remote primaries (transiently)";
                    return HeartbeatResponseAction::makeNoAction();
                }
                remotePrimaryIndex = itIndex;
            }
        }

        if (remotePrimaryIndex != -1) {
            // If it's the same as last time, don't do anything further.
            if (_currentPrimaryIndex == remotePrimaryIndex) {
                return HeartbeatResponseAction::makeNoAction();
            }
            // Clear last heartbeat message on ourselves (why?)
            setMyHeartbeatMessage(now, "");

            // If we are also primary, this is a problem.  Determine who should step down.
            if (_iAmPrimary()) {
                Timestamp remoteElectionTime = _memberData.at(remotePrimaryIndex).getElectionTime();
                log() << "another primary seen with election time " << remoteElectionTime
                      << " my election time is " << _electionTime;

                // Step down whomever has the older election time.
                if (remoteElectionTime > _electionTime) {
                    if (_leaderMode == LeaderMode::kSteppingDown) {
                        return HeartbeatResponseAction::makeNoAction();
                    }
                    log() << "stepping down; another primary was elected more recently";
                    return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
                } else {
                    log() << "another PRIMARY detected and it should step down"
                             " since it was elected earlier than me";
                    return HeartbeatResponseAction::makeStepDownRemoteAction(remotePrimaryIndex);
                }
            }

            _currentPrimaryIndex = remotePrimaryIndex;
            return HeartbeatResponseAction::makeNoAction();
        }
    }

    ////////////////////
    // Phase 2
    ////////////////////

    // We do not believe any remote to be primary.

    // If we are primary, check if we can still see majority of the set;
    // stepdown if we can't.
    if (_iAmPrimary()) {
        // Note: deliberate bitwise AND — CannotSeeMajority is one bit of the
        // UnelectableReasonMask returned by _getMyUnelectableReason.
        if (CannotSeeMajority &
            _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)) {
            if (_leaderMode == LeaderMode::kSteppingDown) {
                return HeartbeatResponseAction::makeNoAction();
            }
            log() << "can't see a majority of the set, relinquishing primary";
            return HeartbeatResponseAction::makeStepDownSelfAction(_selfIndex);
        }

        LOG(2) << "Choosing to remain primary";
        return HeartbeatResponseAction::makeNoAction();
    }

    fassert(18505, _currentPrimaryIndex == -1);

    const MemberState currentState = getMemberState();
    if (originalState.recovering() && currentState.secondary()) {
        // We just transitioned from RECOVERING to SECONDARY, this can only happen if we
        // received a heartbeat with an auth error when previously all the heartbeats we'd
        // received had auth errors.  In this case, don't return makeElectAction() because
        // that could cause the election to start before the ReplicationCoordinator has updated
        // its notion of the member state to SECONDARY.  Instead return noAction so that the
        // ReplicationCooridinator knows to update its tracking of the member state off of the
        // TopologyCoordinator, and leave starting the election until the next heartbeat comes
        // back.
        return HeartbeatResponseAction::makeNoAction();
    }

    // At this point, there is no primary anywhere.  Check to see if we should become a candidate.
    const auto status = checkShouldStandForElection(now);
    if (!status.isOK()) {
        // NOTE: This log line is checked in unit test(s).
        LOG(2) << "TopologyCoordinator::_updatePrimaryFromHBData - " << status.reason();
        return HeartbeatResponseAction::makeNoAction();
    }
    fassertStatusOK(28816, becomeCandidateIfElectable(now, StartElectionReason::kElectionTimeout));
    return HeartbeatResponseAction::makeElectAction();
}
1638 
// Returns Status::OK() when this node should stand for election now, or a
// NodeNotElectable status explaining why not: a primary already exists, we are
// already a candidate, _getMyUnelectableReason reports a blocking reason
// (with a special, more detailed message when we are too far behind the
// latest known optime), or we are inside the inter-election sleep window.
Status TopologyCoordinator::checkShouldStandForElection(Date_t now) const {
    if (_currentPrimaryIndex != -1) {
        return {ErrorCodes::NodeNotElectable, "Not standing for election since there is a Primary"};
    }
    invariant(_role != Role::kLeader);

    if (_role == Role::kCandidate) {
        return {ErrorCodes::NodeNotElectable, "Not standing for election again; already candidate"};
    }

    const UnelectableReasonMask unelectableReason =
        _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout);
    // Deliberate bitwise AND: test just the NotCloseEnoughToLatestOptime bit so
    // we can include both optimes in the message for that specific case.
    if (NotCloseEnoughToLatestOptime & unelectableReason) {
        return {ErrorCodes::NodeNotElectable,
                str::stream() << "Not standing for election because "
                              << _getUnelectableReasonString(unelectableReason)
                              << "; my last optime is "
                              << getMyLastAppliedOpTime().toString()
                              << " and the newest is "
                              << _latestKnownOpTime().toString()};
    }
    if (unelectableReason) {
        return {ErrorCodes::NodeNotElectable,
                str::stream() << "Not standing for election because "
                              << _getUnelectableReasonString(unelectableReason)};
    }
    if (_electionSleepUntil > now) {
        // The wording differs by protocol version; terms only exist in PV1.
        if (_rsConfig.getProtocolVersion() == 1) {
            return {
                ErrorCodes::NodeNotElectable,
                str::stream() << "Not standing for election before "
                              << dateToISOStringLocal(_electionSleepUntil)
                              << " because I stood up or learned about a new term too recently"};
        } else {
            return {ErrorCodes::NodeNotElectable,
                    str::stream() << "Not standing for election before "
                                  << dateToISOStringLocal(_electionSleepUntil)
                                  << " because I stood too recently"};
        }
    }
    // All checks passed. Start election proceedings.
    return Status::OK();
}
1682 
_aMajoritySeemsToBeUp() const1683 bool TopologyCoordinator::_aMajoritySeemsToBeUp() const {
1684     int vUp = 0;
1685     for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
1686          ++it) {
1687         const int itIndex = indexOfIterator(_memberData, it);
1688         if (itIndex == _selfIndex || it->up()) {
1689             vUp += _rsConfig.getMemberAt(itIndex).getNumVotes();
1690         }
1691     }
1692 
1693     return vUp * 2 > _rsConfig.getTotalVotingMembers();
1694 }
1695 
_findHealthyPrimaryOfEqualOrGreaterPriority(const int candidateIndex) const1696 int TopologyCoordinator::_findHealthyPrimaryOfEqualOrGreaterPriority(
1697     const int candidateIndex) const {
1698     const double candidatePriority = _rsConfig.getMemberAt(candidateIndex).getPriority();
1699     for (auto it = _memberData.begin(); it != _memberData.end(); ++it) {
1700         if (!it->up() || it->getState() != MemberState::RS_PRIMARY) {
1701             continue;
1702         }
1703         const int itIndex = indexOfIterator(_memberData, it);
1704         const double priority = _rsConfig.getMemberAt(itIndex).getPriority();
1705         if (itIndex != candidateIndex && priority >= candidatePriority) {
1706             return itIndex;
1707         }
1708     }
1709 
1710     return -1;
1711 }
1712 
_isOpTimeCloseEnoughToLatestToElect(const OpTime & otherOpTime) const1713 bool TopologyCoordinator::_isOpTimeCloseEnoughToLatestToElect(const OpTime& otherOpTime) const {
1714     const OpTime latestKnownOpTime = _latestKnownOpTime();
1715     // Use addition instead of subtraction to avoid overflow.
1716     return otherOpTime.getSecs() + 10 >= (latestKnownOpTime.getSecs());
1717 }
1718 
// Whether this node's last applied optime is fresh enough, relative to the
// latest known optime, to justify a priority takeover.  The exact freshness
// rules are spelled out below.
bool TopologyCoordinator::_amIFreshEnoughForPriorityTakeover() const {
    const OpTime latestKnownOpTime = _latestKnownOpTime();

    // Rules are:
    // - If the terms don't match, we don't call for priority takeover.
    // - If our optime and the latest optime happen in different seconds, our optime must be within
    // at least priorityTakeoverFreshnessWindowSeconds seconds of the latest optime.
    // - If our optime and the latest optime happen in the same second, our optime must be within
    // at least 1000 oplog entries of the latest optime (i.e. the increment portion of the timestamp
    // must be within 1000).  This is to handle the case where a primary had its clock set far into
    // the future, took some writes, then had its clock set back.  In that case the timestamp
    // component of all future oplog entries generated will be the same, until real world time
    // passes the timestamp component of the last oplog entry.

    const OpTime ourLastOpApplied = getMyLastAppliedOpTime();
    if (ourLastOpApplied.getTerm() != latestKnownOpTime.getTerm()) {
        return false;
    }

    if (ourLastOpApplied.getTimestamp().getSecs() != latestKnownOpTime.getTimestamp().getSecs()) {
        // Different wall-clock seconds: compare within the freshness window.
        return ourLastOpApplied.getTimestamp().getSecs() + priorityTakeoverFreshnessWindowSeconds >=
            latestKnownOpTime.getTimestamp().getSecs();
    } else {
        // Same second: compare the increment (entry-count) portion instead.
        return ourLastOpApplied.getTimestamp().getInc() + 1000 >=
            latestKnownOpTime.getTimestamp().getInc();
    }
}
1746 
// Whether this node qualifies to attempt a catchup takeover: we must hold the
// freshest known optime, be strictly ahead of the current primary, and our
// last applied op's term must predate the current term (implying the primary
// is still catching up and has written nothing).
bool TopologyCoordinator::_amIFreshEnoughForCatchupTakeover() const {

    const OpTime latestKnownOpTime = _latestKnownOpTime();

    // Rules are:
    // - We must have the freshest optime of all the up nodes.
    // - We must specifically have a fresher optime than the primary (can't be equal).
    // - The term of our last applied op must be less than the current term. This ensures that no
    // writes have happened since the most recent election and that the primary is still in
    // catchup mode.

    // There is no point to a catchup takeover if we aren't the freshest node because
    // another node would immediately perform another catchup takeover when we become primary.
    const OpTime ourLastOpApplied = getMyLastAppliedOpTime();
    if (ourLastOpApplied < latestKnownOpTime) {
        return false;
    }

    // No known primary means nothing to take over.
    if (_currentPrimaryIndex == -1) {
        return false;
    }

    // If we aren't ahead of the primary, there is no point to having a catchup takeover.
    const OpTime primaryLastOpApplied = _memberData[_currentPrimaryIndex].getLastAppliedOpTime();

    if (ourLastOpApplied <= primaryLastOpApplied) {
        return false;
    }

    // If the term of our last applied op is less than the current term, the primary didn't write
    // anything and it is still in catchup mode.
    return ourLastOpApplied.getTerm() < _term;
}
1780 
_iAmPrimary() const1781 bool TopologyCoordinator::_iAmPrimary() const {
1782     if (_role == Role::kLeader) {
1783         invariant(_currentPrimaryIndex == _selfIndex);
1784         invariant(_leaderMode != LeaderMode::kNotLeader);
1785         return true;
1786     }
1787     return false;
1788 }
1789 
_latestKnownOpTime() const1790 OpTime TopologyCoordinator::_latestKnownOpTime() const {
1791     OpTime latest = getMyLastAppliedOpTime();
1792     for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
1793          ++it) {
1794         // Ignore self
1795         // TODO(russotto): Simplify when heartbeat and spanning tree times are combined.
1796         if (it->isSelf()) {
1797             continue;
1798         }
1799         // Ignore down members
1800         if (!it->up()) {
1801             continue;
1802         }
1803         // Ignore removed nodes (not in config, so not valid).
1804         if (it->getState().removed()) {
1805             continue;
1806         }
1807 
1808         OpTime optime = it->getHeartbeatAppliedOpTime();
1809 
1810         if (optime > latest) {
1811             latest = optime;
1812         }
1813     }
1814 
1815     return latest;
1816 }
1817 
_isMemberHigherPriority(int memberOneIndex,int memberTwoIndex) const1818 bool TopologyCoordinator::_isMemberHigherPriority(int memberOneIndex, int memberTwoIndex) const {
1819     if (memberOneIndex == -1)
1820         return false;
1821 
1822     if (memberTwoIndex == -1)
1823         return true;
1824 
1825     return _rsConfig.getMemberAt(memberOneIndex).getPriority() >
1826         _rsConfig.getMemberAt(memberTwoIndex).getPriority();
1827 }
1828 
_getHighestPriorityElectableIndex(Date_t now) const1829 int TopologyCoordinator::_getHighestPriorityElectableIndex(Date_t now) const {
1830     int maxIndex = -1;
1831     for (int currentIndex = 0; currentIndex < _rsConfig.getNumMembers(); currentIndex++) {
1832         UnelectableReasonMask reason = currentIndex == _selfIndex
1833             ? _getMyUnelectableReason(now, StartElectionReason::kElectionTimeout)
1834             : _getUnelectableReason(currentIndex);
1835         if (None == reason && _isMemberHigherPriority(currentIndex, maxIndex)) {
1836             maxIndex = currentIndex;
1837         }
1838     }
1839 
1840     return maxIndex;
1841 }
1842 
prepareForUnconditionalStepDown()1843 bool TopologyCoordinator::prepareForUnconditionalStepDown() {
1844     if (_leaderMode == LeaderMode::kSteppingDown) {
1845         // Can only be processing one required stepdown at a time.
1846         return false;
1847     }
1848     // Heartbeat-initiated stepdowns take precedence over stepdown command initiated stepdowns, so
1849     // it's safe to transition from kAttemptingStepDown to kSteppingDown.
1850     _setLeaderMode(LeaderMode::kSteppingDown);
1851     return true;
1852 }
1853 
// Begins a stepdown-command attempt by moving the leader into
// kAttemptingStepDown.  On success returns an abort callback that restores
// the previous leader mode — but only if the mode is still
// kAttemptingStepDown when the callback runs, since a heartbeat-initiated
// (unconditional) stepdown may have superseded the attempt.  Fails with
// ConflictingOperationInProgress when a stepdown is already in progress, or
// NotMaster when this node is not primary.
StatusWith<TopologyCoordinator::StepDownAttemptAbortFn>
TopologyCoordinator::prepareForStepDownAttempt() {
    if (_leaderMode == LeaderMode::kSteppingDown ||
        _leaderMode == LeaderMode::kAttemptingStepDown) {
        return Status{ErrorCodes::ConflictingOperationInProgress,
                      "This node is already in the process of stepping down"};
    }

    if (_leaderMode == LeaderMode::kNotLeader) {
        return Status{ErrorCodes::NotMaster, "This node is not a primary."};
    }

    invariant(_leaderMode == LeaderMode::kMaster || _leaderMode == LeaderMode::kLeaderElect);
    const auto previousLeaderMode = _leaderMode;
    _setLeaderMode(LeaderMode::kAttemptingStepDown);

    // The abort function undoes the mode change unless something else has
    // already moved us out of kAttemptingStepDown in the meantime.
    return {[this, previousLeaderMode] {
        if (_leaderMode == TopologyCoordinator::LeaderMode::kAttemptingStepDown) {
            _setLeaderMode(previousLeaderMode);
        }
    }};
}
1876 
// Test-only helper: forces this node into 'newMemberState', driving the same
// role/mode transitions the production paths would (win an election for
// PRIMARY, follower transitions for SECONDARY/ROLLBACK/RECOVERING/STARTUP2,
// config reset for STARTUP).  Fatally asserts on unsupported states or if the
// transition does not land in the requested state.
void TopologyCoordinator::changeMemberState_forTest(const MemberState& newMemberState,
                                                    const Timestamp& electionTime) {
    invariant(_selfIndex != -1);
    // Already in the requested state: nothing to do.
    if (newMemberState == getMemberState())
        return;
    switch (newMemberState.s) {
        case MemberState::RS_PRIMARY:
            // Simulate winning an election at 'electionTime'.
            _role = Role::kCandidate;
            processWinElection(OID(), electionTime);
            invariant(_role == Role::kLeader);
            break;
        case MemberState::RS_SECONDARY:
        case MemberState::RS_ROLLBACK:
        case MemberState::RS_RECOVERING:
        case MemberState::RS_STARTUP2:
            _role = Role::kFollower;
            _followerMode = newMemberState.s;
            // If we believed ourselves primary, clear that belief and the
            // leader mode along with it.
            if (_currentPrimaryIndex == _selfIndex) {
                _currentPrimaryIndex = -1;
                _setLeaderMode(LeaderMode::kNotLeader);
            }
            break;
        case MemberState::RS_STARTUP:
            // Reset to an uninitialized config.
            updateConfig(ReplSetConfig(), -1, Date_t());
            break;
        default:
            severe() << "Cannot switch to state " << newMemberState;
            invariant(false);
    }
    // Verify the transition actually produced the requested state.
    if (getMemberState() != newMemberState.s) {
        severe() << "Expected to enter state " << newMemberState << " but am now in "
                 << getMemberState();
        invariant(false);
    }
    log() << newMemberState;
}
1913 
// Test-only helper: makes 'primaryIndex' the node we believe is primary.  If
// it is ourselves, transition to PRIMARY; otherwise step ourselves down if
// needed and synthesize a PRIMARY heartbeat response for the target member
// before recording it as the current primary.  -1 clears the current primary.
void TopologyCoordinator::_setCurrentPrimaryForTest(int primaryIndex) {
    if (primaryIndex == _selfIndex) {
        changeMemberState_forTest(MemberState::RS_PRIMARY);
    } else {
        if (_iAmPrimary()) {
            changeMemberState_forTest(MemberState::RS_SECONDARY);
        }
        if (primaryIndex != -1) {
            // Fabricate an up-to-date heartbeat marking the member PRIMARY.
            ReplSetHeartbeatResponse hbResponse;
            hbResponse.setState(MemberState::RS_PRIMARY);
            hbResponse.setElectionTime(Timestamp());
            hbResponse.setAppliedOpTime(_memberData.at(primaryIndex).getHeartbeatAppliedOpTime());
            hbResponse.setSyncingTo(HostAndPort());
            hbResponse.setHbMsg("");
            _memberData.at(primaryIndex)
                .setUpValues(_memberData.at(primaryIndex).getLastHeartbeat(),
                             std::move(hbResponse));
        }
        _currentPrimaryIndex = primaryIndex;
    }
}
1935 
_currentPrimaryMember() const1936 const MemberConfig* TopologyCoordinator::_currentPrimaryMember() const {
1937     if (_currentPrimaryIndex == -1)
1938         return NULL;
1939 
1940     return &(_rsConfig.getMemberAt(_currentPrimaryIndex));
1941 }
1942 
void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
                                                BSONObjBuilder* response,
                                                Status* result) {
    // Builds the reply for the replSetGetStatus command: set-wide fields plus one
    // sub-document per member.  Self data comes from local state; other members'
    // data comes from their most recent heartbeat responses.  On exit *result is
    // OK, or InvalidReplicaSetConfig when this node is removed or has no valid
    // config (in which case only a minimal self-status is appended).

    // output for each member
    vector<BSONObj> membersOut;
    const MemberState myState = getMemberState();
    const Date_t now = rsStatusArgs.now;
    const OpTime lastOpApplied = getMyLastAppliedOpTime();
    const OpTime lastOpDurable = getMyLastDurableOpTime();
    const BSONObj& initialSyncStatus = rsStatusArgs.initialSyncStatus;

    if (_selfIndex == -1) {
        // We're REMOVED or have an invalid config
        response->append("state", static_cast<int>(myState.s));
        response->append("stateStr", myState.toString());
        response->append("uptime", rsStatusArgs.selfUptime);

        appendOpTime(response, "optime", lastOpApplied, _rsConfig.getProtocolVersion());

        response->appendDate("optimeDate",
                             Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs())));
        if (_maintenanceModeCalls) {
            response->append("maintenanceMode", _maintenanceModeCalls);
        }
        // No config means no sync source; report empty/sentinel values.
        response->append("lastHeartbeatMessage", "");
        response->append("syncingTo", "");
        response->append("syncSourceHost", "");
        response->append("syncSourceId", -1);

        response->append("infoMessage", _getHbmsg(now));
        *result = Status(ErrorCodes::InvalidReplicaSetConfig,
                         "Our replica set config is invalid or we are not a member of it");
        return;
    }

    // One entry per tracked member.
    for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
         ++it) {
        const int itIndex = indexOfIterator(_memberData, it);
        if (itIndex == _selfIndex) {
            // add self
            BSONObjBuilder bb;
            bb.append("_id", _selfConfig().getId());
            bb.append("name", _selfConfig().getHostAndPort().toString());
            bb.append("health", 1.0);
            bb.append("state", static_cast<int>(myState.s));
            bb.append("stateStr", myState.toString());
            bb.append("uptime", rsStatusArgs.selfUptime);
            // Arbiters hold no data, so optimes are meaningless for them.
            if (!_selfConfig().isArbiter()) {
                appendOpTime(&bb, "optime", lastOpApplied, _rsConfig.getProtocolVersion());
                bb.appendDate("optimeDate",
                              Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs())));
            }

            // Primaries do not sync from anyone.
            if (!_syncSource.empty() && !_iAmPrimary()) {
                bb.append("syncingTo", _syncSource.toString());
                bb.append("syncSourceHost", _syncSource.toString());
                const MemberConfig* member = _rsConfig.findMemberByHostAndPort(_syncSource);
                bb.append("syncSourceId", member ? member->getId() : -1);
            } else {
                bb.append("syncingTo", "");
                bb.append("syncSourceHost", "");
                bb.append("syncSourceId", -1);
            }

            if (_maintenanceModeCalls) {
                bb.append("maintenanceMode", _maintenanceModeCalls);
            }

            bb.append("infoMessage", _getHbmsg(now));

            if (myState.primary()) {
                bb.append("electionTime", _electionTime);
                bb.appendDate("electionDate",
                              Date_t::fromDurationSinceEpoch(Seconds(_electionTime.getSecs())));
            }
            bb.appendIntOrLL("configVersion", _rsConfig.getConfigVersion());
            bb.append("self", true);
            bb.append("lastHeartbeatMessage", "");
            membersOut.push_back(bb.obj());
        } else {
            // add non-self member
            const MemberConfig& itConfig = _rsConfig.getMemberAt(itIndex);
            BSONObjBuilder bb;
            bb.append("_id", itConfig.getId());
            bb.append("name", itConfig.getHostAndPort().toString());
            double h = it->getHealth();
            bb.append("health", h);
            const MemberState state = it->getState();
            bb.append("state", static_cast<int>(state.s));
            if (h == 0) {
                // if we can't connect the state info is from the past
                // and could be confusing to show
                bb.append("stateStr", "(not reachable/healthy)");
            } else {
                bb.append("stateStr", it->getState().toString());
            }

            // Uptime is zero until we first see the member up (getUpSince unset).
            const unsigned int uptime = static_cast<unsigned int>((
                it->getUpSince() != Date_t() ? durationCount<Seconds>(now - it->getUpSince()) : 0));
            bb.append("uptime", uptime);
            if (!itConfig.isArbiter()) {
                appendOpTime(
                    &bb, "optime", it->getHeartbeatAppliedOpTime(), _rsConfig.getProtocolVersion());
                appendOpTime(&bb,
                             "optimeDurable",
                             it->getHeartbeatDurableOpTime(),
                             _rsConfig.getProtocolVersion());

                bb.appendDate("optimeDate",
                              Date_t::fromDurationSinceEpoch(
                                  Seconds(it->getHeartbeatAppliedOpTime().getSecs())));
                bb.appendDate("optimeDurableDate",
                              Date_t::fromDurationSinceEpoch(
                                  Seconds(it->getHeartbeatDurableOpTime().getSecs())));
            }
            bb.appendDate("lastHeartbeat", it->getLastHeartbeat());
            bb.appendDate("lastHeartbeatRecv", it->getLastHeartbeatRecv());
            Milliseconds ping = _getPing(itConfig.getHostAndPort());
            bb.append("pingMs", durationCount<Milliseconds>(ping));
            bb.append("lastHeartbeatMessage", it->getLastHeartbeatMsg());
            // Only reported when there is an auth problem; omitted otherwise.
            if (it->hasAuthIssue()) {
                bb.append("authenticated", false);
            }
            const HostAndPort& syncSource = it->getSyncSource();
            if (!syncSource.empty() && !state.primary()) {
                bb.append("syncingTo", syncSource.toString());
                bb.append("syncSourceHost", syncSource.toString());
                const MemberConfig* member = _rsConfig.findMemberByHostAndPort(syncSource);
                bb.append("syncSourceId", member ? member->getId() : -1);
            } else {
                bb.append("syncingTo", "");
                bb.append("syncSourceHost", "");
                bb.append("syncSourceId", -1);
            }

            bb.append("infoMessage", "");

            if (state == MemberState::RS_PRIMARY) {
                bb.append("electionTime", it->getElectionTime());
                bb.appendDate(
                    "electionDate",
                    Date_t::fromDurationSinceEpoch(Seconds(it->getElectionTime().getSecs())));
            }
            bb.appendIntOrLL("configVersion", it->getConfigVersion());
            membersOut.push_back(bb.obj());
        }
    }

    // sort members bson
    sort(membersOut.begin(), membersOut.end(), SimpleBSONObjComparator::kInstance.makeLessThan());

    response->append("set", _rsConfig.isInitialized() ? _rsConfig.getReplSetName() : "");
    response->append("date", now);
    response->append("myState", myState.s);
    response->append("term", _term);

    // Add sync source info
    if (!_syncSource.empty() && !myState.primary() && !myState.removed()) {
        response->append("syncingTo", _syncSource.toString());
        response->append("syncSourceHost", _syncSource.toString());
        const MemberConfig* member = _rsConfig.findMemberByHostAndPort(_syncSource);
        response->append("syncSourceId", member ? member->getId() : -1);
    } else {
        response->append("syncingTo", "");
        response->append("syncSourceHost", "");
        response->append("syncSourceId", -1);
    }

    if (_rsConfig.isConfigServer()) {
        response->append("configsvr", true);
    }

    response->append("heartbeatIntervalMillis",
                     durationCount<Milliseconds>(_rsConfig.getHeartbeatInterval()));

    // New optimes, to hold them all.
    BSONObjBuilder optimes;
    _lastCommittedOpTime.append(&optimes, "lastCommittedOpTime");
    if (!rsStatusArgs.readConcernMajorityOpTime.isNull()) {
        rsStatusArgs.readConcernMajorityOpTime.append(&optimes, "readConcernMajorityOpTime");
    }

    appendOpTime(&optimes, "appliedOpTime", lastOpApplied, _rsConfig.getProtocolVersion());
    appendOpTime(&optimes, "durableOpTime", lastOpDurable, _rsConfig.getProtocolVersion());
    response->append("optimes", optimes.obj());

    // Only present while (or after) initial sync has something to report.
    if (!initialSyncStatus.isEmpty()) {
        response->append("initialSyncStatus", initialSyncStatus);
    }

    response->append("members", membersOut);
    *result = Status::OK();
}
2136 
prepareReplSetUpdatePositionCommand(OpTime currentCommittedSnapshotOpTime) const2137 StatusWith<BSONObj> TopologyCoordinator::prepareReplSetUpdatePositionCommand(
2138     OpTime currentCommittedSnapshotOpTime) const {
2139     BSONObjBuilder cmdBuilder;
2140     invariant(_rsConfig.isInitialized());
2141     // Do not send updates if we have been removed from the config.
2142     if (_selfIndex == -1) {
2143         return Status(ErrorCodes::NodeNotFound,
2144                       "This node is not in the current replset configuration.");
2145     }
2146     cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1);
2147     // Create an array containing objects each live member connected to us and for ourself.
2148     BSONArrayBuilder arrayBuilder(cmdBuilder.subarrayStart("optimes"));
2149     for (const auto& memberData : _memberData) {
2150         if (memberData.getLastAppliedOpTime().isNull()) {
2151             // Don't include info on members we haven't heard from yet.
2152             continue;
2153         }
2154         // Don't include members we think are down.
2155         if (!memberData.isSelf() && memberData.lastUpdateStale()) {
2156             continue;
2157         }
2158 
2159         BSONObjBuilder entry(arrayBuilder.subobjStart());
2160         memberData.getLastDurableOpTime().append(&entry,
2161                                                  UpdatePositionArgs::kDurableOpTimeFieldName);
2162         memberData.getLastAppliedOpTime().append(&entry,
2163                                                  UpdatePositionArgs::kAppliedOpTimeFieldName);
2164         entry.append(UpdatePositionArgs::kMemberIdFieldName, memberData.getMemberId());
2165         entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion());
2166     }
2167     arrayBuilder.done();
2168 
2169     // Add metadata to command
2170     prepareReplSetMetadata(currentCommittedSnapshotOpTime)
2171         .writeToMetadata(&cmdBuilder)
2172         .transitional_ignore();
2173     return cmdBuilder.obj();
2174 }
2175 
fillMemberData(BSONObjBuilder * result)2176 void TopologyCoordinator::fillMemberData(BSONObjBuilder* result) {
2177     BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress"));
2178     {
2179         for (const auto& memberData : _memberData) {
2180             BSONObjBuilder entry(replicationProgress.subobjStart());
2181             entry.append("rid", memberData.getRid());
2182             const auto lastDurableOpTime = memberData.getLastDurableOpTime();
2183             if (_rsConfig.getProtocolVersion() == 1) {
2184                 BSONObjBuilder opTime(entry.subobjStart("optime"));
2185                 opTime.append("ts", lastDurableOpTime.getTimestamp());
2186                 opTime.append("term", lastDurableOpTime.getTerm());
2187                 opTime.done();
2188             } else {
2189                 entry.append("optime", lastDurableOpTime.getTimestamp());
2190             }
2191             entry.append("host", memberData.getHostAndPort().toString());
2192             if (_selfIndex >= 0) {
2193                 const int memberId = memberData.getMemberId();
2194                 invariant(memberId >= 0);
2195                 entry.append("memberId", memberId);
2196             }
2197         }
2198     }
2199 }
2200 
void TopologyCoordinator::fillIsMasterForReplSet(IsMasterResponse* const response,
                                                 const SplitHorizon::Parameters& horizonParams) {
    // Populates 'response' for the isMaster command with this node's view of the
    // replica set, using the split horizon selected by 'horizonParams' when
    // reporting host names.  Marks the response "no config" when uninitialized
    // or when this node has been removed.
    const MemberState myState = getMemberState();
    if (!_rsConfig.isInitialized()) {
        response->markAsNoConfig();
        return;
    }

    response->setReplSetName(_rsConfig.getReplSetName());
    if (myState.removed()) {
        response->markAsNoConfig();
        return;
    }

    // Past the removed() check above, _selfIndex must refer to a real member.
    invariant(!_rsConfig.members().empty());

    const auto& self = _rsConfig.members()[_selfIndex];

    const auto horizon = self.determineHorizon(horizonParams).toString();

    // Advertise members through the chosen horizon, classified by role.
    // Hidden and slave-delayed members are not advertised at all.
    for (const auto& member : _rsConfig.members()) {
        if (member.isHidden() || member.getSlaveDelay() > Seconds{0}) {
            continue;
        }
        auto hostView = member.getHostAndPort(horizon);

        if (member.isElectable()) {
            response->addHost(std::move(hostView));
        } else if (member.isArbiter()) {
            response->addArbiter(std::move(hostView));
        } else {
            response->addPassive(std::move(hostView));
        }
    }

    response->setReplSetVersion(_rsConfig.getConfigVersion());
    // "ismaster" is false if we are not primary. If we're stepping down, we're waiting for the
    // Replication State Transition Lock before we can change to secondary, but we should report
    // "ismaster" false to indicate that we can't accept new writes.
    response->setIsMaster(myState.primary() && _leaderMode != LeaderMode::kSteppingDown);
    response->setIsSecondary(myState.secondary());

    const MemberConfig* curPrimary = _currentPrimaryMember();
    if (curPrimary) {
        response->setPrimary(curPrimary->getHostAndPort(horizon));
    }

    // Report our own role flags (arbiter/passive/hidden/delayed/no-indexes).
    const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex);
    if (selfConfig.isArbiter()) {
        response->setIsArbiterOnly(true);
    } else if (selfConfig.getPriority() == 0) {
        response->setIsPassive(true);
    }
    if (selfConfig.getSlaveDelay() > Seconds(0)) {
        response->setSlaveDelay(selfConfig.getSlaveDelay());
    }
    if (selfConfig.isHidden()) {
        response->setIsHidden(true);
    }
    if (!selfConfig.shouldBuildIndexes()) {
        response->setShouldBuildIndexes(false);
    }
    // Expose our replica-set tags, except internal ones (keys starting with '$').
    const ReplSetTagConfig tagConfig = _rsConfig.getTagConfig();
    if (selfConfig.hasTags(tagConfig)) {
        for (MemberConfig::TagIterator tag = selfConfig.tagsBegin(); tag != selfConfig.tagsEnd();
             ++tag) {
            std::string tagKey = tagConfig.getTagKey(*tag);
            if (tagKey[0] == '$') {
                // Filter out internal tags
                continue;
            }
            response->addTag(tagKey, tagConfig.getTagValue(*tag));
        }
    }
    response->setMe(selfConfig.getHostAndPort(horizon));
    if (_iAmPrimary()) {
        response->setElectionId(_electionId);
    }
}
2280 
2281 StatusWith<TopologyCoordinator::PrepareFreezeResponseResult>
prepareFreezeResponse(Date_t now,int secs,BSONObjBuilder * response)2282 TopologyCoordinator::prepareFreezeResponse(Date_t now, int secs, BSONObjBuilder* response) {
2283     if (_role != TopologyCoordinator::Role::kFollower) {
2284         std::string msg = str::stream()
2285             << "cannot freeze node when primary or running for election. state: "
2286             << (_role == TopologyCoordinator::Role::kLeader ? "Primary" : "Running-Election");
2287         log() << msg;
2288         return Status(ErrorCodes::NotSecondary, msg);
2289     }
2290 
2291     if (secs == 0) {
2292         _stepDownUntil = now;
2293         log() << "'unfreezing'";
2294         response->append("info", "unfreezing");
2295 
2296         if (_rsConfig.getProtocolVersion() == 1)
2297             return PrepareFreezeResponseResult::kSingleNodeSelfElect;
2298         if (_isElectableNodeInSingleNodeReplicaSet()) {
2299             // If we are a one-node replica set, we're the one member,
2300             // we're electable, we're not in maintenance mode, and we are currently in followerMode
2301             // SECONDARY, we must transition to candidate now that our stepdown period
2302             // is no longer active, in leiu of heartbeats.
2303             _role = Role::kCandidate;
2304             return PrepareFreezeResponseResult::kSingleNodeSelfElect;
2305         }
2306     } else {
2307         if (secs == 1)
2308             response->append("warning", "you really want to freeze for only 1 second?");
2309 
2310         _stepDownUntil = std::max(_stepDownUntil, now + Seconds(secs));
2311         log() << "'freezing' for " << secs << " seconds";
2312     }
2313 
2314     return PrepareFreezeResponseResult::kNoAction;
2315 }
2316 
becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now)2317 bool TopologyCoordinator::becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now) {
2318     if (_stepDownUntil > now) {
2319         return false;
2320     }
2321 
2322     if (_isElectableNodeInSingleNodeReplicaSet()) {
2323         // If the new config describes a one-node replica set, we're the one member,
2324         // we're electable, we're not in maintenance mode, and we are currently in followerMode
2325         // SECONDARY, we must transition to candidate, in leiu of heartbeats.
2326         _role = Role::kCandidate;
2327         return true;
2328     }
2329     return false;
2330 }
2331 
setElectionSleepUntil(Date_t newTime)2332 void TopologyCoordinator::setElectionSleepUntil(Date_t newTime) {
2333     if (_electionSleepUntil < newTime) {
2334         _electionSleepUntil = newTime;
2335     }
2336 }
2337 
// Accessor for the recorded election time (set via _setElectionTime).
Timestamp TopologyCoordinator::getElectionTime() const {
    return _electionTime;
}
2341 
// Accessor for the recorded election id.
OID TopologyCoordinator::getElectionId() const {
    return _electionId;
}
2345 
// Index of the member we believe is primary, or -1 when no primary is known.
int TopologyCoordinator::getCurrentPrimaryIndex() const {
    return _currentPrimaryIndex;
}
2349 
// Time until which this node's stepdown/freeze period remains in effect.
Date_t TopologyCoordinator::getStepDownTime() const {
    return _stepDownUntil;
}
2353 
void TopologyCoordinator::_updateHeartbeatDataForReconfig(const ReplSetConfig& newConfig,
                                                          int selfIndex,
                                                          Date_t now) {
    // Rebuilds _memberData to match 'newConfig'.  Heartbeat state is carried
    // over for members that exist in both configs (matched by member id plus
    // host:port, or by being the self entry); everyone else starts fresh.
    // 'selfIndex' is our index in the new config, or negative if removed.
    std::vector<MemberData> oldHeartbeats;
    _memberData.swap(oldHeartbeats);

    int index = 0;
    for (ReplSetConfig::MemberIterator it = newConfig.membersBegin(); it != newConfig.membersEnd();
         ++it, ++index) {
        const MemberConfig& newMemberConfig = *it;
        MemberData newHeartbeatData;
        for (auto&& oldMemberData : oldHeartbeats) {
            if ((oldMemberData.getMemberId() == newMemberConfig.getId() &&
                 oldMemberData.getHostAndPort() == newMemberConfig.getHostAndPort()) ||
                (index == selfIndex && oldMemberData.isSelf())) {
                // This member existed in the old config with the same member ID and
                // HostAndPort, so copy its heartbeat data over.
                newHeartbeatData = oldMemberData;
                break;
            }
        }
        // (Re)stamp identity fields from the new config regardless of whether
        // old data was carried over.
        newHeartbeatData.setConfigIndex(index);
        newHeartbeatData.setIsSelf(index == selfIndex);
        newHeartbeatData.setHostAndPort(newMemberConfig.getHostAndPort());
        newHeartbeatData.setMemberId(newMemberConfig.getId());
        _memberData.push_back(newHeartbeatData);
    }
    if (selfIndex < 0) {
        // It's necessary to have self member data even if self isn't in the configuration.
        // We don't need data for the other nodes (which no longer know about us, or soon won't)
        _memberData.clear();
        // We're not in the config, we can't sync any more.
        _syncSource = HostAndPort();
        MemberData newHeartbeatData;
        for (auto&& oldMemberData : oldHeartbeats) {
            if (oldMemberData.isSelf()) {
                newHeartbeatData = oldMemberData;
                break;
            }
        }
        newHeartbeatData.setConfigIndex(-1);
        newHeartbeatData.setIsSelf(true);
        _memberData.push_back(newHeartbeatData);
    }
}
2399 
2400 // This function installs a new config object and recreates MemberData objects
2401 // that reflect the new config.
void TopologyCoordinator::updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now) {
    // Installs 'newConfig' as the current replica set config, rebuilds member
    // heartbeat data, and adjusts our role: a primary that is removed or no
    // longer electable steps down; a sole electable node of a one-node set
    // becomes a candidate.  'selfIndex' is our index in the new config (-1 if
    // removed).
    invariant(_role != Role::kCandidate);
    invariant(selfIndex < newConfig.getNumMembers());

    // Reset term on startup and upgrade/downgrade of protocol version.
    if (!_rsConfig.isInitialized() ||
        _rsConfig.getProtocolVersion() != newConfig.getProtocolVersion()) {
        if (newConfig.getProtocolVersion() == 1) {
            _term = OpTime::kInitialTerm;
        } else {
            invariant(newConfig.getProtocolVersion() == 0);
            _term = OpTime::kUninitializedTerm;
        }
        LOG(1) << "Updated term in topology coordinator to " << _term << " due to new config";
    }

    _updateHeartbeatDataForReconfig(newConfig, selfIndex, now);
    _rsConfig = newConfig;
    _selfIndex = selfIndex;
    // Any forced sync source choice is invalidated by the reconfig.
    _forceSyncSourceIndex = -1;

    if (_role == Role::kLeader) {
        if (_selfIndex == -1) {
            log() << "Could not remain primary because no longer a member of the replica set";
        } else if (!_selfConfig().isElectable()) {
            log() << " Could not remain primary because no longer electable";
        } else {
            // Don't stepdown if you don't have to.
            _currentPrimaryIndex = _selfIndex;
            return;
        }
        _role = Role::kFollower;
        _setLeaderMode(LeaderMode::kNotLeader);
    }

    // By this point we know we are in Role::kFollower
    _currentPrimaryIndex = -1;  // force secondaries to re-detect who the primary is

    if (_isElectableNodeInSingleNodeReplicaSet()) {
        // If the new config describes a one-node replica set, we're the one member,
        // we're electable, we're not in maintenance mode and we are currently in followerMode
        // SECONDARY, we must transition to candidate, in lieu of heartbeats.
        _role = Role::kCandidate;
    }
}
_getHbmsg(Date_t now) const2447 std::string TopologyCoordinator::_getHbmsg(Date_t now) const {
2448     // ignore messages over 2 minutes old
2449     if ((now - _hbmsgTime) > Seconds{120}) {
2450         return "";
2451     }
2452     return _hbmsg;
2453 }
2454 
// Records 'message' as our heartbeat message along with its timestamp
// (used by _getHbmsg to expire messages after two minutes).
void TopologyCoordinator::setMyHeartbeatMessage(const Date_t now, const std::string& message) {
    _hbmsgTime = now;
    _hbmsg = message;
}
2459 
// Our own entry in the replica set config; only valid when _selfIndex >= 0.
const MemberConfig& TopologyCoordinator::_selfConfig() const {
    return _rsConfig.getMemberAt(_selfIndex);
}
2463 
// Our own heartbeat/member-data entry.
const MemberData& TopologyCoordinator::_selfMemberData() const {
    return _memberData[_selfMemberDataIndex()];
}
2467 
_selfMemberDataIndex() const2468 const int TopologyCoordinator::_selfMemberDataIndex() const {
2469     invariant(!_memberData.empty());
2470     if (_selfIndex >= 0)
2471         return _selfIndex;
2472     // In master-slave mode, the first entry is for self.  If there is no config
2473     // or we're not in the config, the first-and-only entry should be for self.
2474     return 0;
2475 }
2476 
_getUnelectableReason(int index) const2477 TopologyCoordinator::UnelectableReasonMask TopologyCoordinator::_getUnelectableReason(
2478     int index) const {
2479     invariant(index != _selfIndex);
2480     const MemberConfig& memberConfig = _rsConfig.getMemberAt(index);
2481     const MemberData& hbData = _memberData.at(index);
2482     UnelectableReasonMask result = None;
2483     if (memberConfig.isArbiter()) {
2484         result |= ArbiterIAm;
2485     }
2486     if (memberConfig.getPriority() <= 0) {
2487         result |= NoPriority;
2488     }
2489     if (hbData.getState() != MemberState::RS_SECONDARY) {
2490         result |= NotSecondary;
2491     }
2492     if (_rsConfig.getProtocolVersion() == 0 &&
2493         !_isOpTimeCloseEnoughToLatestToElect(hbData.getHeartbeatAppliedOpTime())) {
2494         result |= NotCloseEnoughToLatestOptime;
2495     }
2496     if (hbData.up() && hbData.isUnelectable()) {
2497         result |= RefusesToStand;
2498     }
2499     invariant(result || memberConfig.isElectable());
2500     return result;
2501 }
2502 
TopologyCoordinator::UnelectableReasonMask TopologyCoordinator::_getMyUnelectableReason(
    const Date_t now, StartElectionReason reason) const {
    // Computes, as a bitmask, every reason this node cannot currently run for
    // election (None when electable).  'reason' selects which protocol-version-1
    // takeover freshness checks apply.
    UnelectableReasonMask result = None;
    const OpTime lastApplied = getMyLastAppliedOpTime();
    if (lastApplied.isNull()) {
        result |= NoData;
    }
    if (!_aMajoritySeemsToBeUp()) {
        result |= CannotSeeMajority;
    }
    if (_selfIndex == -1) {
        // Not in the config: no further checks are meaningful.
        result |= NotInitialized;
        return result;
    }
    if (_selfConfig().isArbiter()) {
        result |= ArbiterIAm;
    }
    if (_selfConfig().getPriority() <= 0) {
        result |= NoPriority;
    }
    if (_stepDownUntil > now) {
        result |= StepDownPeriodActive;
    }

    // Cannot be electable unless secondary or already primary
    if (!getMemberState().secondary() && !_iAmPrimary()) {
        result |= NotSecondary;
    }

    if (_rsConfig.getProtocolVersion() == 0) {
        // Election rules only for protocol version 0.
        if (_voteLease.whoId != -1 &&
            _voteLease.whoId != _rsConfig.getMemberAt(_selfIndex).getId() &&
            _voteLease.when + VoteLease::leaseTime >= now) {
            result |= VotedTooRecently;
        }
        if (!_isOpTimeCloseEnoughToLatestToElect(lastApplied)) {
            result |= NotCloseEnoughToLatestOptime;
        }
    } else {
        // Election rules only for protocol version 1.
        invariant(_rsConfig.getProtocolVersion() == 1);
        if (reason == StartElectionReason::kPriorityTakeover &&
            !_amIFreshEnoughForPriorityTakeover()) {
            result |= NotCloseEnoughToLatestForPriorityTakeover;
        }

        if (reason == StartElectionReason::kCatchupTakeover &&
            !_amIFreshEnoughForCatchupTakeover()) {
            result |= NotFreshEnoughForCatchupTakeover;
        }
    }
    return result;
}
2557 
_getUnelectableReasonString(const UnelectableReasonMask ur) const2558 std::string TopologyCoordinator::_getUnelectableReasonString(const UnelectableReasonMask ur) const {
2559     invariant(ur);
2560     str::stream ss;
2561     bool hasWrittenToStream = false;
2562     if (ur & NoData) {
2563         ss << "node has no applied oplog entries";
2564         hasWrittenToStream = true;
2565     }
2566     if (ur & VotedTooRecently) {
2567         if (hasWrittenToStream) {
2568             ss << "; ";
2569         }
2570         hasWrittenToStream = true;
2571         ss << "I recently voted for " << _voteLease.whoHostAndPort.toString();
2572     }
2573     if (ur & CannotSeeMajority) {
2574         if (hasWrittenToStream) {
2575             ss << "; ";
2576         }
2577         hasWrittenToStream = true;
2578         ss << "I cannot see a majority";
2579     }
2580     if (ur & ArbiterIAm) {
2581         if (hasWrittenToStream) {
2582             ss << "; ";
2583         }
2584         hasWrittenToStream = true;
2585         ss << "member is an arbiter";
2586     }
2587     if (ur & NoPriority) {
2588         if (hasWrittenToStream) {
2589             ss << "; ";
2590         }
2591         hasWrittenToStream = true;
2592         ss << "member has zero priority";
2593     }
2594     if (ur & StepDownPeriodActive) {
2595         if (hasWrittenToStream) {
2596             ss << "; ";
2597         }
2598         hasWrittenToStream = true;
2599         ss << "I am still waiting for stepdown period to end at "
2600            << dateToISOStringLocal(_stepDownUntil);
2601     }
2602     if (ur & NotSecondary) {
2603         if (hasWrittenToStream) {
2604             ss << "; ";
2605         }
2606         hasWrittenToStream = true;
2607         ss << "member is not currently a secondary";
2608     }
2609     if (ur & NotCloseEnoughToLatestOptime) {
2610         if (hasWrittenToStream) {
2611             ss << "; ";
2612         }
2613         hasWrittenToStream = true;
2614         ss << "member is more than 10 seconds behind the most up-to-date member";
2615     }
2616     if (ur & NotCloseEnoughToLatestForPriorityTakeover) {
2617         if (hasWrittenToStream) {
2618             ss << "; ";
2619         }
2620         hasWrittenToStream = true;
2621         ss << "member is not caught up enough to the most up-to-date member to call for priority "
2622               "takeover - must be within "
2623            << priorityTakeoverFreshnessWindowSeconds << " seconds";
2624     }
2625     if (ur & NotFreshEnoughForCatchupTakeover) {
2626         if (hasWrittenToStream) {
2627             ss << "; ";
2628         }
2629         hasWrittenToStream = true;
2630         ss << "member is either not the most up-to-date member or not ahead of the primary, and "
2631               "therefore cannot call for catchup takeover";
2632     }
2633     if (ur & NotInitialized) {
2634         if (hasWrittenToStream) {
2635             ss << "; ";
2636         }
2637         hasWrittenToStream = true;
2638         ss << "node is not a member of a valid replica set configuration";
2639     }
2640     if (ur & RefusesToStand) {
2641         if (hasWrittenToStream) {
2642             ss << "; ";
2643         }
2644         hasWrittenToStream = true;
2645         ss << "most recent heartbeat indicates node will not stand for election";
2646     }
2647     if (!hasWrittenToStream) {
2648         severe() << "Invalid UnelectableReasonMask value 0x" << integerToHex(ur);
2649         fassertFailed(26011);
2650     }
2651     ss << " (mask 0x" << integerToHex(ur) << ")";
2652     return ss;
2653 }
2654 
// Ping statistic for 'host'.  Note: operator[] default-constructs an entry
// when 'host' has no recorded pings yet (which is why this is non-const).
Milliseconds TopologyCoordinator::_getPing(const HostAndPort& host) {
    return _pings[host].getMillis();
}
2658 
// Records the election time reported by getElectionTime().
void TopologyCoordinator::_setElectionTime(const Timestamp& newElectionTime) {
    _electionTime = newElectionTime;
}
2662 
_getTotalPings()2663 int TopologyCoordinator::_getTotalPings() {
2664     PingMap::iterator it = _pings.begin();
2665     PingMap::iterator end = _pings.end();
2666     int totalPings = 0;
2667     while (it != end) {
2668         totalPings += it->second.getCount();
2669         it++;
2670     }
2671     return totalPings;
2672 }
2673 
getMaybeUpHostAndPorts() const2674 std::vector<HostAndPort> TopologyCoordinator::getMaybeUpHostAndPorts() const {
2675     std::vector<HostAndPort> upHosts;
2676     for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
2677          ++it) {
2678         const int itIndex = indexOfIterator(_memberData, it);
2679         if (itIndex == _selfIndex) {
2680             continue;  // skip ourselves
2681         }
2682         if (!it->maybeUp()) {
2683             continue;  // skip DOWN nodes
2684         }
2685 
2686         upHosts.push_back(_rsConfig.getMemberAt(itIndex).getHostAndPort());
2687     }
2688     return upHosts;
2689 }
2690 
voteForMyself(Date_t now)2691 bool TopologyCoordinator::voteForMyself(Date_t now) {
2692     if (_role != Role::kCandidate) {
2693         return false;
2694     }
2695     int selfId = _selfConfig().getId();
2696     if ((_voteLease.when + VoteLease::leaseTime >= now) && (_voteLease.whoId != selfId)) {
2697         log() << "not voting yea for " << selfId << " voted for "
2698               << _voteLease.whoHostAndPort.toString() << ' '
2699               << durationCount<Seconds>(now - _voteLease.when) << " secs ago";
2700         return false;
2701     }
2702     _voteLease.when = now;
2703     _voteLease.whoId = selfId;
2704     _voteLease.whoHostAndPort = _selfConfig().getHostAndPort();
2705     return true;
2706 }
2707 
isSteppingDown() const2708 bool TopologyCoordinator::isSteppingDown() const {
2709     return _leaderMode == LeaderMode::kAttemptingStepDown ||
2710         _leaderMode == LeaderMode::kSteppingDown;
2711 }
2712 
_setLeaderMode(TopologyCoordinator::LeaderMode newMode)2713 void TopologyCoordinator::_setLeaderMode(TopologyCoordinator::LeaderMode newMode) {
2714     // Invariants for valid state transitions.
2715     switch (_leaderMode) {
2716         case LeaderMode::kNotLeader:
2717             invariant(newMode == LeaderMode::kLeaderElect);
2718             break;
2719         case LeaderMode::kLeaderElect:
2720             invariant(newMode == LeaderMode::kNotLeader ||  // TODO(SERVER-30852): remove this case
2721                       newMode == LeaderMode::kMaster ||
2722                       newMode == LeaderMode::kAttemptingStepDown ||
2723                       newMode == LeaderMode::kSteppingDown);
2724             break;
2725         case LeaderMode::kMaster:
2726             invariant(newMode == LeaderMode::kNotLeader ||  // TODO(SERVER-30852): remove this case
2727                       newMode == LeaderMode::kAttemptingStepDown ||
2728                       newMode == LeaderMode::kSteppingDown);
2729             break;
2730         case LeaderMode::kAttemptingStepDown:
2731             invariant(newMode == LeaderMode::kNotLeader || newMode == LeaderMode::kMaster ||
2732                       newMode == LeaderMode::kSteppingDown || newMode == LeaderMode::kLeaderElect);
2733             break;
2734         case LeaderMode::kSteppingDown:
2735             invariant(newMode == LeaderMode::kNotLeader);
2736             break;
2737     }
2738     _leaderMode = std::move(newMode);
2739 }
2740 
// Computes this node's externally visible MemberState from its role, config,
// and follower/maintenance status. Pure derivation; mutates nothing.
MemberState TopologyCoordinator::getMemberState() const {
    // Not in the config: REMOVED if a config exists without us, STARTUP otherwise.
    if (_selfIndex == -1) {
        if (_rsConfig.isInitialized()) {
            return MemberState::RS_REMOVED;
        }
        return MemberState::RS_STARTUP;
    }

    // Config-server/cluster-role mismatches report as REMOVED (unless the
    // skipShardingConfigurationChecks escape hatch is set).
    if (_rsConfig.isConfigServer()) {
        if (_options.clusterRole != ClusterRole::ConfigServer && !skipShardingConfigurationChecks) {
            return MemberState::RS_REMOVED;
        } else {
            // Config servers additionally require majority read support from
            // the storage engine.
            invariant(_storageEngineSupportsReadCommitted != ReadCommittedSupport::kUnknown);
            if (_storageEngineSupportsReadCommitted == ReadCommittedSupport::kNo) {
                return MemberState::RS_REMOVED;
            }
        }
    } else {
        if (_options.clusterRole == ClusterRole::ConfigServer && !skipShardingConfigurationChecks) {
            return MemberState::RS_REMOVED;
        }
    }

    if (_role == Role::kLeader) {
        invariant(_currentPrimaryIndex == _selfIndex);
        invariant(_leaderMode != LeaderMode::kNotLeader);
        return MemberState::RS_PRIMARY;
    }
    const MemberConfig& myConfig = _selfConfig();
    if (myConfig.isArbiter()) {
        return MemberState::RS_ARBITER;
    }
    // A secondary in maintenance mode, or one whose only up heartbeats are auth
    // errors, presents itself as RECOVERING.
    if (((_maintenanceModeCalls > 0) || (_hasOnlyAuthErrorUpHeartbeats(_memberData, _selfIndex))) &&
        (_followerMode == MemberState::RS_SECONDARY)) {
        return MemberState::RS_RECOVERING;
    }
    return _followerMode;
}
2779 
canAcceptWrites() const2780 bool TopologyCoordinator::canAcceptWrites() const {
2781     return _leaderMode == LeaderMode::kMaster;
2782 }
2783 
setElectionInfo(OID electionId,Timestamp electionOpTime)2784 void TopologyCoordinator::setElectionInfo(OID electionId, Timestamp electionOpTime) {
2785     invariant(_role == Role::kLeader);
2786     _electionTime = electionOpTime;
2787     _electionId = electionId;
2788 }
2789 
processWinElection(OID electionId,Timestamp electionOpTime)2790 void TopologyCoordinator::processWinElection(OID electionId, Timestamp electionOpTime) {
2791     invariant(_role == Role::kCandidate);
2792     invariant(_leaderMode == LeaderMode::kNotLeader);
2793     _role = Role::kLeader;
2794     _setLeaderMode(LeaderMode::kLeaderElect);
2795     setElectionInfo(electionId, electionOpTime);
2796     _currentPrimaryIndex = _selfIndex;
2797     _syncSource = HostAndPort();
2798     _forceSyncSourceIndex = -1;
2799     // Prevent last committed optime from updating until we finish draining.
2800     _firstOpTimeOfMyTerm =
2801         OpTime(Timestamp(std::numeric_limits<int>::max(), 0), std::numeric_limits<int>::max());
2802 }
2803 
processLoseElection()2804 void TopologyCoordinator::processLoseElection() {
2805     invariant(_role == Role::kCandidate);
2806     invariant(_leaderMode == LeaderMode::kNotLeader);
2807     const HostAndPort syncSourceAddress = getSyncSourceAddress();
2808     _electionTime = Timestamp(0, 0);
2809     _electionId = OID();
2810     _role = Role::kFollower;
2811 
2812     // Clear voteLease time, if we voted for ourselves in this election.
2813     // This will allow us to vote for others.
2814     if (_voteLease.whoId == _selfConfig().getId()) {
2815         _voteLease.when = Date_t();
2816     }
2817 }
2818 
attemptStepDown(long long termAtStart,Date_t now,Date_t waitUntil,Date_t stepDownUntil,bool force)2819 bool TopologyCoordinator::attemptStepDown(
2820     long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force) {
2821 
2822     if (_role != Role::kLeader || _leaderMode == LeaderMode::kSteppingDown ||
2823         _term != termAtStart) {
2824         uasserted(ErrorCodes::PrimarySteppedDown,
2825                   "While waiting for secondaries to catch up before stepping down, "
2826                   "this node decided to step down for other reasons");
2827     }
2828     invariant(_leaderMode == LeaderMode::kAttemptingStepDown);
2829 
2830     if (now >= stepDownUntil) {
2831         uasserted(ErrorCodes::ExceededTimeLimit,
2832                   "By the time we were ready to step down, we were already past the "
2833                   "time we were supposed to step down until");
2834     }
2835 
2836     if (!_canCompleteStepDownAttempt(now, waitUntil, force)) {
2837         // Stepdown attempt failed.
2838 
2839         // Check waitUntil after at least one stepdown attempt, so that stepdown could succeed even
2840         // if secondaryCatchUpPeriodSecs == 0.
2841         if (now >= waitUntil) {
2842             uasserted(ErrorCodes::ExceededTimeLimit,
2843                       str::stream() << "No electable secondaries caught up as of "
2844                                     << dateToISOStringLocal(now)
2845                                     << "Please use the replSetStepDown command with the argument "
2846                                     << "{force: true} to force node to step down.");
2847         }
2848 
2849         // Stepdown attempt failed, but in a way that can be retried
2850         return false;
2851     }
2852 
2853     // Stepdown attempt success!
2854     _stepDownUntil = stepDownUntil;
2855     _stepDownSelfAndReplaceWith(-1);
2856     return true;
2857 }
2858 
_canCompleteStepDownAttempt(Date_t now,Date_t waitUntil,bool force)2859 bool TopologyCoordinator::_canCompleteStepDownAttempt(Date_t now, Date_t waitUntil, bool force) {
2860     const bool forceNow = force && (now >= waitUntil);
2861     if (forceNow) {
2862         return true;
2863     }
2864 
2865     return isSafeToStepDown();
2866 }
2867 
_isCaughtUpAndElectable(int memberIndex,OpTime lastApplied)2868 bool TopologyCoordinator::_isCaughtUpAndElectable(int memberIndex, OpTime lastApplied) {
2869     if (_getUnelectableReason(memberIndex)) {
2870         return false;
2871     }
2872 
2873     return (_memberData.at(memberIndex).getLastAppliedOpTime() >= lastApplied);
2874 }
2875 
isSafeToStepDown()2876 bool TopologyCoordinator::isSafeToStepDown() {
2877     if (!_rsConfig.isInitialized() || _selfIndex < 0) {
2878         return false;
2879     }
2880 
2881     OpTime lastApplied = getMyLastAppliedOpTime();
2882 
2883     auto tagStatus = _rsConfig.findCustomWriteMode(ReplSetConfig::kMajorityWriteConcernModeName);
2884     invariant(tagStatus.isOK());
2885 
2886     // Check if a majority of nodes have reached the last applied optime.
2887     if (!haveTaggedNodesReachedOpTime(lastApplied, tagStatus.getValue(), false)) {
2888         return false;
2889     }
2890 
2891     // Now check that we also have at least one caught up node that is electable.
2892     for (int memberIndex = 0; memberIndex < _rsConfig.getNumMembers(); memberIndex++) {
2893         // ignore your self
2894         if (memberIndex == _selfIndex) {
2895             continue;
2896         }
2897         if (_isCaughtUpAndElectable(memberIndex, lastApplied)) {
2898             return true;
2899         }
2900     }
2901 
2902     return false;
2903 }
2904 
chooseElectionHandoffCandidate()2905 int TopologyCoordinator::chooseElectionHandoffCandidate() {
2906 
2907     OpTime lastApplied = getMyLastAppliedOpTime();
2908 
2909     int bestCandidateIndex = -1;
2910     int highestPriority = -1;
2911 
2912     for (int memberIndex = 0; memberIndex < _rsConfig.getNumMembers(); memberIndex++) {
2913 
2914         // Skip your own member index.
2915         if (memberIndex == _selfIndex) {
2916             continue;
2917         }
2918 
2919         // Skip this node if it is not eligible to become primary. This includes nodes with
2920         // priority 0.
2921         if (!_isCaughtUpAndElectable(memberIndex, lastApplied)) {
2922             continue;
2923         }
2924 
2925         // Only update best if priority is strictly greater. This guarantees that
2926         // we will pick the member with the lowest index in case of a tie. Note that
2927         // member priority is always a non-negative number.
2928         auto memberPriority = _rsConfig.getMemberAt(memberIndex).getPriority();
2929         if (memberPriority > highestPriority) {
2930             bestCandidateIndex = memberIndex;
2931             highestPriority = memberPriority;
2932         }
2933     }
2934 
2935     // This is the most suitable node.
2936     return bestCandidateIndex;
2937 }
2938 
setFollowerMode(MemberState::MS newMode)2939 void TopologyCoordinator::setFollowerMode(MemberState::MS newMode) {
2940     invariant(_role == Role::kFollower);
2941     switch (newMode) {
2942         case MemberState::RS_RECOVERING:
2943         case MemberState::RS_ROLLBACK:
2944         case MemberState::RS_SECONDARY:
2945         case MemberState::RS_STARTUP2:
2946             _followerMode = newMode;
2947             break;
2948         default:
2949             invariant(false);
2950     }
2951 
2952     if (_followerMode != MemberState::RS_SECONDARY) {
2953         return;
2954     }
2955 
2956     // When a single node replica set transitions to SECONDARY, we must check if we should
2957     // be a candidate here.  This is necessary because a single node replica set has no
2958     // heartbeats that would normally change the role to candidate.
2959 
2960     if (_isElectableNodeInSingleNodeReplicaSet()) {
2961         _role = Role::kCandidate;
2962     }
2963 }
2964 
_isElectableNodeInSingleNodeReplicaSet() const2965 bool TopologyCoordinator::_isElectableNodeInSingleNodeReplicaSet() const {
2966     return _followerMode == MemberState::RS_SECONDARY && _rsConfig.getNumMembers() == 1 &&
2967         _selfIndex == 0 && _rsConfig.getMemberAt(_selfIndex).isElectable() &&
2968         _maintenanceModeCalls == 0;
2969 }
2970 
finishUnconditionalStepDown()2971 void TopologyCoordinator::finishUnconditionalStepDown() {
2972     invariant(_leaderMode == LeaderMode::kSteppingDown);
2973 
2974     int remotePrimaryIndex = -1;
2975     for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
2976          ++it) {
2977         const int itIndex = indexOfIterator(_memberData, it);
2978         if (itIndex == _selfIndex) {
2979             continue;
2980         }
2981 
2982         if (it->getState().primary() && it->up()) {
2983             if (remotePrimaryIndex != -1) {
2984                 // two other nodes think they are primary (asynchronously polled)
2985                 // -- wait for things to settle down.
2986                 remotePrimaryIndex = -1;
2987                 warning() << "two remote primaries (transiently)";
2988                 break;
2989             }
2990             remotePrimaryIndex = itIndex;
2991         }
2992     }
2993     _stepDownSelfAndReplaceWith(remotePrimaryIndex);
2994 }
2995 
_stepDownSelfAndReplaceWith(int newPrimary)2996 void TopologyCoordinator::_stepDownSelfAndReplaceWith(int newPrimary) {
2997     invariant(_role == Role::kLeader);
2998     invariant(_selfIndex != -1);
2999     invariant(_selfIndex != newPrimary);
3000     invariant(_selfIndex == _currentPrimaryIndex);
3001     _currentPrimaryIndex = newPrimary;
3002     _role = Role::kFollower;
3003     _setLeaderMode(LeaderMode::kNotLeader);
3004 }
3005 
// Recomputes the commit point from the replication progress of the voting
// members and advances _lastCommittedOpTime if possible. Returns true if the
// commit point moved forward.
bool TopologyCoordinator::updateLastCommittedOpTime() {
    // If we're not primary or we're stepping down due to learning of a new term then we must not
    // advance the commit point.  If we are stepping down due to a user request, however, then it
    // is safe to advance the commit point, and in fact we must since the stepdown request may be
    // waiting for the commit point to advance enough to be able to safely complete the step down.
    if (!_iAmPrimary() || _leaderMode == LeaderMode::kSteppingDown) {
        return false;
    }

    // Whether we use the applied or durable OpTime for the commit point is decided here.
    const bool useDurableOpTime = _rsConfig.getWriteConcernMajorityShouldJournal();

    // Collect the relevant optime of every voting member.
    std::vector<OpTime> votingNodesOpTimes;
    for (const auto& memberData : _memberData) {
        int memberIndex = memberData.getConfigIndex();
        invariant(memberIndex >= 0);
        const auto& memberConfig = _rsConfig.getMemberAt(memberIndex);
        if (memberConfig.isVoter()) {
            const auto opTime = useDurableOpTime ? memberData.getLastDurableOpTime()
                                                 : memberData.getLastAppliedOpTime();
            votingNodesOpTimes.push_back(opTime);
        }
    }

    invariant(votingNodesOpTimes.size() > 0);
    // Without reports from a write-majority of voters we cannot compute a commit point.
    if (votingNodesOpTimes.size() < static_cast<unsigned long>(_rsConfig.getWriteMajority())) {
        return false;
    }
    std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end());

    // need the majority to have this OpTime
    // (the k-th largest optime, where k = write majority, after sorting ascending)
    OpTime committedOpTime =
        votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()];
    return advanceLastCommittedOpTime(committedOpTime);
}
3041 
// Advances _lastCommittedOpTime to 'committedOpTime' if doing so is safe and
// actually moves it forward. Returns true only when the commit point changed.
bool TopologyCoordinator::advanceLastCommittedOpTime(OpTime committedOpTime) {
    if (_selfIndex == -1) {
        // The config hasn't been installed or we are not in the config. This could happen
        // on heartbeats before installing a config.
        return false;
    }

    // This check is performed to ensure primaries do not commit an OpTime from a previous term.
    if (_iAmPrimary() && committedOpTime < _firstOpTimeOfMyTerm) {
        LOG(1) << "Ignoring older committed snapshot from before I became primary, optime: "
               << committedOpTime << ", firstOpTimeOfMyTerm: " << _firstOpTimeOfMyTerm;
        return false;
    }

    // Arbiters don't have data so they always advance their commit point via heartbeats.
    // For data-bearing nodes, never advance past our own last applied optime when the
    // candidate commit point is from a different term.
    if (!_selfConfig().isArbiter() &&
        getMyLastAppliedOpTime().getTerm() != committedOpTime.getTerm()) {
        committedOpTime = std::min(committedOpTime, getMyLastAppliedOpTime());
    }

    if (committedOpTime == _lastCommittedOpTime) {
        return false;  // Hasn't changed, so ignore it.
    }

    // The commit point must never move backwards.
    if (committedOpTime < _lastCommittedOpTime) {
        LOG(1) << "Ignoring older committed snapshot optime: " << committedOpTime
               << ", currentCommittedOpTime: " << _lastCommittedOpTime;
        return false;
    }

    LOG(2) << "Updating _lastCommittedOpTime to " << committedOpTime;
    _lastCommittedOpTime = committedOpTime;
    return true;
}
3076 
resetLastCommittedOpTime()3077 void TopologyCoordinator::resetLastCommittedOpTime() {
3078     _lastCommittedOpTime = OpTime();
3079 }
3080 
getLastCommittedOpTime() const3081 OpTime TopologyCoordinator::getLastCommittedOpTime() const {
3082     return _lastCommittedOpTime;
3083 }
3084 
canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const3085 bool TopologyCoordinator::canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const {
3086 
3087     if (termWhenDrainCompleted != _term) {
3088         return false;
3089     }
3090     // Allow completing the transition to primary even when in the middle of a stepdown attempt,
3091     // in case the stepdown attempt fails.
3092     // Allow calling this function even if the node is already primary on PV upgrade.
3093     if (_leaderMode != LeaderMode::kLeaderElect && _leaderMode != LeaderMode::kAttemptingStepDown &&
3094         _leaderMode != LeaderMode::kMaster) {
3095         return false;
3096     }
3097 
3098     return true;
3099 }
3100 
completeTransitionToPrimary(const OpTime & firstOpTimeOfTerm)3101 Status TopologyCoordinator::completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) {
3102     if (!canCompleteTransitionToPrimary(firstOpTimeOfTerm.getTerm())) {
3103         return Status(ErrorCodes::PrimarySteppedDown,
3104                       "By the time this node was ready to complete its transition to PRIMARY it "
3105                       "was no longer eligible to do so");
3106     }
3107     if (_leaderMode == LeaderMode::kLeaderElect) {
3108         _setLeaderMode(LeaderMode::kMaster);
3109     }
3110     _firstOpTimeOfMyTerm = firstOpTimeOfTerm;
3111     return Status::OK();
3112 }
3113 
adjustMaintenanceCountBy(int inc)3114 void TopologyCoordinator::adjustMaintenanceCountBy(int inc) {
3115     invariant(_role == Role::kFollower);
3116     _maintenanceModeCalls += inc;
3117     invariant(_maintenanceModeCalls >= 0);
3118 }
3119 
getMaintenanceCount() const3120 int TopologyCoordinator::getMaintenanceCount() const {
3121     return _maintenanceModeCalls;
3122 }
3123 
updateTerm(long long term,Date_t now)3124 TopologyCoordinator::UpdateTermResult TopologyCoordinator::updateTerm(long long term, Date_t now) {
3125     if (term <= _term) {
3126         return TopologyCoordinator::UpdateTermResult::kAlreadyUpToDate;
3127     }
3128     // Don't run election if we just stood up or learned about a new term.
3129     _electionSleepUntil = now + _rsConfig.getElectionTimeoutPeriod();
3130 
3131     // Don't update the term just yet if we are going to step down, as we don't want to report
3132     // that we are primary in the new term.
3133     if (_iAmPrimary()) {
3134         return TopologyCoordinator::UpdateTermResult::kTriggerStepDown;
3135     }
3136     LOG(1) << "Updating term from " << _term << " to " << term;
3137     _term = term;
3138     return TopologyCoordinator::UpdateTermResult::kUpdatedTerm;
3139 }
3140 
3141 
getTerm() const3142 long long TopologyCoordinator::getTerm() const {
3143     return _term;
3144 }
3145 
3146 // TODO(siyuan): Merge _hddata into _slaveInfo, so that we have a single view of the
3147 // replset. Passing metadata is unnecessary.
// Decides whether this node should abandon 'currentSource' as its sync source,
// based on the metadata the source returned and our heartbeat view of the set.
// Returns true when a resync-source selection should be triggered.
bool TopologyCoordinator::shouldChangeSyncSource(
    const HostAndPort& currentSource,
    const rpc::ReplSetMetadata& replMetadata,
    boost::optional<rpc::OplogQueryMetadata> oqMetadata,
    Date_t now) const {
    // Methodology:
    // If there exists a viable sync source member other than currentSource, whose oplog has
    // reached an optime greater than _options.maxSyncSourceLagSecs later than currentSource's,
    // return true.
    // If the currentSource has the same replication progress as we do and has no source for further
    // progress, return true.

    if (_selfIndex == -1) {
        log() << "Not choosing new sync source because we are not in the config.";
        return false;
    }

    // If the user requested a sync source change, return true.
    if (_forceSyncSourceIndex != -1) {
        log() << "Choosing new sync source because the user has requested to use "
              << _rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort()
              << " as a sync source";
        return true;
    }

    // A config-version mismatch with the source means its view of the set is stale
    // (or ours is); either way, re-pick.
    if (_rsConfig.getProtocolVersion() == 1 &&
        replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) {
        log() << "Choosing new sync source because the config version supplied by " << currentSource
              << ", " << replMetadata.getConfigVersion() << ", does not match ours, "
              << _rsConfig.getConfigVersion();
        return true;
    }

    const int currentSourceIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource);
    // PV0 doesn't use metadata, we have to consult _rsConfig.
    if (currentSourceIndex == -1) {
        log() << "Choosing new sync source because " << currentSource.toString()
              << " is not in our config";
        return true;
    }

    invariant(currentSourceIndex != _selfIndex);

    // If OplogQueryMetadata was provided, use its values, otherwise use the ones in
    // ReplSetMetadata. In both cases, prefer the fresher of the metadata optime and
    // the optime from our most recent heartbeat from the source.
    OpTime currentSourceOpTime;
    int syncSourceIndex = -1;
    int primaryIndex = -1;
    if (oqMetadata) {
        currentSourceOpTime =
            std::max(oqMetadata->getLastOpApplied(),
                     _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
        syncSourceIndex = oqMetadata->getSyncSourceIndex();
        primaryIndex = oqMetadata->getPrimaryIndex();
    } else {
        currentSourceOpTime =
            std::max(replMetadata.getLastOpVisible(),
                     _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime());
        syncSourceIndex = replMetadata.getSyncSourceIndex();
        primaryIndex = replMetadata.getPrimaryIndex();
    }

    if (currentSourceOpTime.isNull()) {
        // Haven't received a heartbeat from the sync source yet, so can't tell if we should
        // change.
        return false;
    }

    // Change sync source if they are not ahead of us, and don't have a sync source,
    // unless they are primary.
    const OpTime myLastOpTime = getMyLastAppliedOpTime();
    if (_rsConfig.getProtocolVersion() == 1 && syncSourceIndex == -1 &&
        currentSourceOpTime <= myLastOpTime && primaryIndex != currentSourceIndex) {
        std::stringstream logMessage;
        logMessage << "Choosing new sync source because our current sync source, "
                   << currentSource.toString() << ", has an OpTime (" << currentSourceOpTime
                   << ") which is not ahead of ours (" << myLastOpTime
                   << "), it does not have a sync source, and it's not the primary";
        if (primaryIndex >= 0) {
            logMessage << " (" << _rsConfig.getMemberAt(primaryIndex).getHostAndPort() << " is)";
        } else {
            logMessage << " (sync source does not know the primary)";
        }
        log() << logMessage.str();
        return true;
    }

    if (MONGO_FAIL_POINT(disableMaxSyncSourceLagSecs)) {
        log() << "disableMaxSyncSourceLagSecs fail point enabled - not checking the most recent "
                 "OpTime, "
              << currentSourceOpTime.toString() << ", of our current sync source, " << currentSource
              << ", against the OpTimes of the other nodes in this replica set.";
    } else {
        // Look for any viable candidate whose optime is more than
        // maxSyncSourceLagSecs ahead of our current source's.
        unsigned int currentSecs = currentSourceOpTime.getSecs();
        unsigned int goalSecs = currentSecs + durationCount<Seconds>(_options.maxSyncSourceLagSecs);

        for (std::vector<MemberData>::const_iterator it = _memberData.begin();
             it != _memberData.end();
             ++it) {
            const int itIndex = indexOfIterator(_memberData, it);
            const MemberConfig& candidateConfig = _rsConfig.getMemberAt(itIndex);
            // A candidate must be up, readable, not blacklisted, and at least as
            // "capable" as we are (voter/index-building status).
            if (it->up() && (candidateConfig.isVoter() || !_selfConfig().isVoter()) &&
                (candidateConfig.shouldBuildIndexes() || !_selfConfig().shouldBuildIndexes()) &&
                it->getState().readable() && !_memberIsBlacklisted(candidateConfig, now) &&
                goalSecs < it->getHeartbeatAppliedOpTime().getSecs()) {
                log() << "Choosing new sync source because the most recent OpTime of our sync "
                         "source, "
                      << currentSource << ", is " << currentSourceOpTime.toString()
                      << " which is more than " << _options.maxSyncSourceLagSecs
                      << " behind member " << candidateConfig.getHostAndPort().toString()
                      << " whose most recent OpTime is "
                      << it->getHeartbeatAppliedOpTime().toString();
                invariant(itIndex != _selfIndex);
                return true;
            }
        }
    }

    return false;
}
3268 
prepareReplSetMetadata(const OpTime & lastVisibleOpTime) const3269 rpc::ReplSetMetadata TopologyCoordinator::prepareReplSetMetadata(
3270     const OpTime& lastVisibleOpTime) const {
3271     return rpc::ReplSetMetadata(_term,
3272                                 _lastCommittedOpTime,
3273                                 lastVisibleOpTime,
3274                                 _rsConfig.getConfigVersion(),
3275                                 _rsConfig.getReplicaSetId(),
3276                                 _currentPrimaryIndex,
3277                                 _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
3278 }
3279 
prepareOplogQueryMetadata(int rbid) const3280 rpc::OplogQueryMetadata TopologyCoordinator::prepareOplogQueryMetadata(int rbid) const {
3281     return rpc::OplogQueryMetadata(_lastCommittedOpTime,
3282                                    getMyLastAppliedOpTime(),
3283                                    rbid,
3284                                    _currentPrimaryIndex,
3285                                    _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
3286 }
3287 
// Populates 'output' with a snapshot of this node's view of the replica set
// (config, heartbeat data, indexes, state, heartbeat message).
void TopologyCoordinator::summarizeAsHtml(ReplSetHtmlSummary* output) {
    output->setConfig(_rsConfig);
    output->setHBData(_memberData);
    output->setSelfIndex(_selfIndex);
    output->setPrimaryIndex(_currentPrimaryIndex);
    output->setSelfState(getMemberState());
    output->setSelfHeartbeatMessage(_hbmsg);
}
3296 
// Handles a replSetRequestVotes command (PV1 election vote request). Evaluates
// the denial conditions in order and either refuses with a reason or grants the
// vote, recording it in _lastVote unless this is a dry run.
void TopologyCoordinator::processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
                                                     ReplSetRequestVotesResponse* response) {
    response->setTerm(_term);

    // Deny: the candidate's term is behind ours.
    if (args.getTerm() < _term) {
        response->setVoteGranted(false);
        response->setReason(str::stream() << "candidate's term (" << args.getTerm()
                                          << ") is lower than mine ("
                                          << _term
                                          << ")");
    // Deny: the candidate is operating on a different config version.
    } else if (args.getConfigVersion() != _rsConfig.getConfigVersion()) {
        response->setVoteGranted(false);
        response->setReason(str::stream() << "candidate's config version ("
                                          << args.getConfigVersion()
                                          << ") differs from mine ("
                                          << _rsConfig.getConfigVersion()
                                          << ")");
    // Deny: the candidate belongs to a different replica set.
    } else if (args.getSetName() != _rsConfig.getReplSetName()) {
        response->setVoteGranted(false);
        response->setReason(str::stream() << "candidate's set name (" << args.getSetName()
                                          << ") differs from mine ("
                                          << _rsConfig.getReplSetName()
                                          << ")");
    // Deny: the candidate's oplog is behind ours.
    // NOTE(review): this compares the candidate's getLastDurableOpTime() against
    // our last *applied* optime while the message says "last applied" — presumably
    // the args field carries the candidate's applied optime in PV1; confirm
    // against ReplSetRequestVotesArgs before renaming anything.
    } else if (args.getLastDurableOpTime() < getMyLastAppliedOpTime()) {
        response->setVoteGranted(false);
        response
            ->setReason(str::stream()
                        << "candidate's data is staler than mine. candidate's last applied OpTime: "
                        << args.getLastDurableOpTime().toString()
                        << ", my last applied OpTime: "
                        << getMyLastAppliedOpTime().toString());
    // Deny: we already spent our vote for this term (real elections only).
    } else if (!args.isADryRun() && _lastVote.getTerm() == args.getTerm()) {
        response->setVoteGranted(false);
        response->setReason(str::stream()
                            << "already voted for another candidate ("
                            << _rsConfig.getMemberAt(_lastVote.getCandidateIndex()).getHostAndPort()
                            << ") this term ("
                            << _lastVote.getTerm()
                            << ")");
    } else {
        // Arbiters additionally refuse when they can already see a healthy
        // primary of equal or greater priority than the candidate.
        int betterPrimary = _findHealthyPrimaryOfEqualOrGreaterPriority(args.getCandidateIndex());
        if (_selfConfig().isArbiter() && betterPrimary >= 0) {
            response->setVoteGranted(false);
            response->setReason(str::stream()
                                << "can see a healthy primary ("
                                << _rsConfig.getMemberAt(betterPrimary).getHostAndPort()
                                << ") of equal or greater priority");
        } else {
            // Grant the vote; persist it unless this is a dry run.
            if (!args.isADryRun()) {
                _lastVote.setTerm(args.getTerm());
                _lastVote.setCandidateIndex(args.getCandidateIndex());
            }
            response->setVoteGranted(true);
        }
    }
}
3353 
// Installs a previously-recorded vote (presumably recovered from persistent
// storage at startup) so this node will not grant a second vote in the same
// term after a restart. NOTE(review): assumes this runs before any new vote
// requests are processed — confirm against the caller.
void TopologyCoordinator::loadLastVote(const LastVote& lastVote) {
    _lastVote = lastVote;
}
3357 
voteForMyselfV1()3358 void TopologyCoordinator::voteForMyselfV1() {
3359     _lastVote.setTerm(_term);
3360     _lastVote.setCandidateIndex(_selfIndex);
3361 }
3362 
// Sets this coordinator's notion of which member index is currently primary.
// NOTE(review): the parameter is a long long while _currentPrimaryIndex may be
// a narrower integer type — confirm callers never pass values outside its range.
void TopologyCoordinator::setPrimaryIndex(long long primaryIndex) {
    _currentPrimaryIndex = primaryIndex;
}
3366 
becomeCandidateIfElectable(const Date_t now,StartElectionReason reason)3367 Status TopologyCoordinator::becomeCandidateIfElectable(const Date_t now,
3368                                                        StartElectionReason reason) {
3369     if (_role == Role::kLeader) {
3370         return {ErrorCodes::NodeNotElectable, "Not standing for election again; already primary"};
3371     }
3372 
3373     if (_role == Role::kCandidate) {
3374         return {ErrorCodes::NodeNotElectable, "Not standing for election again; already candidate"};
3375     }
3376 
3377     const UnelectableReasonMask unelectableReason = _getMyUnelectableReason(now, reason);
3378     if (unelectableReason) {
3379         return {ErrorCodes::NodeNotElectable,
3380                 str::stream() << "Not standing for election because "
3381                               << _getUnelectableReasonString(unelectableReason)};
3382     }
3383 
3384     // All checks passed, become a candidate and start election proceedings.
3385     _role = Role::kCandidate;
3386 
3387     return Status::OK();
3388 }
3389 
setStorageEngineSupportsReadCommitted(bool supported)3390 void TopologyCoordinator::setStorageEngineSupportsReadCommitted(bool supported) {
3391     _storageEngineSupportsReadCommitted =
3392         supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo;
3393 }
3394 
restartHeartbeats()3395 void TopologyCoordinator::restartHeartbeats() {
3396     for (auto& hb : _memberData) {
3397         hb.restart();
3398     }
3399 }
3400 
latestKnownOpTimeSinceHeartbeatRestart() const3401 boost::optional<OpTime> TopologyCoordinator::latestKnownOpTimeSinceHeartbeatRestart() const {
3402     // The smallest OpTime in PV1.
3403     OpTime latest(Timestamp(0, 0), 0);
3404     for (size_t i = 0; i < _memberData.size(); i++) {
3405         auto& peer = _memberData[i];
3406 
3407         if (static_cast<int>(i) == _selfIndex) {
3408             continue;
3409         }
3410         // If any heartbeat is not fresh enough, return none.
3411         if (!peer.isUpdatedSinceRestart()) {
3412             return boost::none;
3413         }
3414         // Ignore down members
3415         if (!peer.up()) {
3416             continue;
3417         }
3418         if (peer.getHeartbeatAppliedOpTime() > latest) {
3419             latest = peer.getHeartbeatAppliedOpTime();
3420         }
3421     }
3422     return latest;
3423 }
3424 
3425 }  // namespace repl
3426 }  // namespace mongo
3427