1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/db/repl/freshness_checker.h"
36 
37 #include "mongo/base/status.h"
38 #include "mongo/bson/timestamp.h"
39 #include "mongo/db/repl/repl_set_config.h"
40 #include "mongo/db/repl/scatter_gather_runner.h"
41 #include "mongo/rpc/get_status_from_command_result.h"
42 #include "mongo/stdx/memory.h"
43 #include "mongo/util/log.h"
44 #include "mongo/util/scopeguard.h"
45 #include "mongo/util/time_support.h"
46 
47 namespace mongo {
48 namespace repl {
49 
50 using executor::RemoteCommandRequest;
51 using executor::RemoteCommandResponse;
52 
Algorithm(Timestamp lastOpTimeApplied,const ReplSetConfig & rsConfig,int selfIndex,const std::vector<HostAndPort> & targets)53 FreshnessChecker::Algorithm::Algorithm(Timestamp lastOpTimeApplied,
54                                        const ReplSetConfig& rsConfig,
55                                        int selfIndex,
56                                        const std::vector<HostAndPort>& targets)
57     : _responsesProcessed(0),
58       _failedVoterResponses(0),
59       _lastOpTimeApplied(lastOpTimeApplied),
60       _rsConfig(rsConfig),
61       _selfIndex(selfIndex),
62       _targets(targets),
63       _votingTargets(0),
64       _losableVoters(0),
65       _myVote(0),
66       _abortReason(None) {
67     // Count voting targets (since the targets could be a subset of members).
68     for (std::vector<HostAndPort>::const_iterator it = _targets.begin(); it != _targets.end();
69          ++it) {
70         const MemberConfig* member = _rsConfig.findMemberByHostAndPort(*it);
71         if (member && member->isVoter())
72             ++_votingTargets;
73     }
74 
75     _myVote = _rsConfig.getMemberAt(_selfIndex).isVoter() ? 1 : 0;
76     _losableVoters = std::max(0, ((_votingTargets + _myVote) - _rsConfig.getMajorityVoteCount()));
77 }
78 
~Algorithm()79 FreshnessChecker::Algorithm::~Algorithm() {}
80 
getRequests() const81 std::vector<RemoteCommandRequest> FreshnessChecker::Algorithm::getRequests() const {
82     const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex);
83 
84     // gather all not-down nodes, get their fullnames(or hostandport's)
85     // schedule fresh command for each node
86     BSONObjBuilder freshCmdBuilder;
87     freshCmdBuilder.append("replSetFresh", 1);
88     freshCmdBuilder.append("set", _rsConfig.getReplSetName());
89     freshCmdBuilder.append("opTime", Date_t::fromMillisSinceEpoch(_lastOpTimeApplied.asLL()));
90     freshCmdBuilder.append("who", selfConfig.getHostAndPort().toString());
91     freshCmdBuilder.appendIntOrLL("cfgver", _rsConfig.getConfigVersion());
92     freshCmdBuilder.append("id", selfConfig.getId());
93     const BSONObj replSetFreshCmd = freshCmdBuilder.obj();
94 
95     std::vector<RemoteCommandRequest> requests;
96     for (std::vector<HostAndPort>::const_iterator it = _targets.begin(); it != _targets.end();
97          ++it) {
98         invariant(*it != selfConfig.getHostAndPort());
99         requests.push_back(RemoteCommandRequest(
100             *it,
101             "admin",
102             replSetFreshCmd,
103             nullptr,
104             Milliseconds(30 * 1000)));  // trying to match current Socket timeout
105     }
106 
107     return requests;
108 }
109 
hadTooManyFailedVoterResponses() const110 bool FreshnessChecker::Algorithm::hadTooManyFailedVoterResponses() const {
111     const bool tooManyLostVoters = (_failedVoterResponses > _losableVoters);
112 
113     LOG(3) << "hadTooManyFailedVoterResponses(" << tooManyLostVoters
114            << ") = " << _failedVoterResponses << " failed responses <"
115            << " (" << _votingTargets << " total voters - " << _rsConfig.getMajorityVoteCount()
116            << " majority voters - me (" << _myVote << ")) -- losableVotes: " << _losableVoters;
117     return tooManyLostVoters;
118 }
119 
_isVotingMember(const HostAndPort hap) const120 bool FreshnessChecker::Algorithm::_isVotingMember(const HostAndPort hap) const {
121     const MemberConfig* member = _rsConfig.findMemberByHostAndPort(hap);
122     invariant(member);
123     return member->isVoter();
124 }
125 
processResponse(const RemoteCommandRequest & request,const RemoteCommandResponse & response)126 void FreshnessChecker::Algorithm::processResponse(const RemoteCommandRequest& request,
127                                                   const RemoteCommandResponse& response) {
128     ++_responsesProcessed;
129     bool votingMember = _isVotingMember(request.target);
130 
131     Status status = Status::OK();
132 
133     if (!response.isOK() || !((status = getStatusFromCommandResult(response.data)).isOK())) {
134         if (votingMember) {
135             ++_failedVoterResponses;
136             if (hadTooManyFailedVoterResponses()) {
137                 _abortReason = QuorumUnreachable;
138             }
139         }
140         if (!response.isOK()) {  // network/executor error
141             LOG(2) << "FreshnessChecker: Got failed response from " << request.target;
142         } else {  // command error, like unauth
143             LOG(2) << "FreshnessChecker: Got error response from " << request.target << " :"
144                    << status;
145         }
146         return;
147     }
148 
149     const BSONObj res = response.data;
150 
151     LOG(2) << "FreshnessChecker: Got response from " << request.target << " of " << res;
152 
153     if (res["fresher"].trueValue()) {
154         log() << "not electing self, " << request.target.toString()
155               << " knows a node is fresher than us";
156         _abortReason = FresherNodeFound;
157         return;
158     }
159 
160     if (res["opTime"].type() != mongo::Date) {
161         error() << "wrong type for opTime argument in replSetFresh response: "
162                 << typeName(res["opTime"].type());
163         _abortReason = FresherNodeFound;
164         return;
165     }
166     Timestamp remoteTime(res["opTime"].date());
167     if (remoteTime == _lastOpTimeApplied) {
168         log() << "not electing self, " << request.target.toString()
169               << " has same OpTime as us: " << remoteTime.toBSON();
170         _abortReason = FreshnessTie;
171     }
172     if (remoteTime > _lastOpTimeApplied) {
173         // something really wrong (rogue command?)
174         log() << "not electing self, " << request.target.toString()
175               << " has newer OpTime than us. Our OpTime: " << _lastOpTimeApplied.toBSON()
176               << ", their OpTime: " << remoteTime.toBSON();
177         _abortReason = FresherNodeFound;
178         return;
179     }
180 
181     if (res["veto"].trueValue()) {
182         BSONElement msg = res["errmsg"];
183         if (msg.type() == String) {
184             log() << "not electing self, " << request.target.toString() << " would veto with '"
185                   << msg.String() << "'";
186         } else {
187             log() << "not electing self, " << request.target.toString() << " would veto";
188         }
189         _abortReason = FresherNodeFound;
190         return;
191     }
192 }
193 
hasReceivedSufficientResponses() const194 bool FreshnessChecker::Algorithm::hasReceivedSufficientResponses() const {
195     return (_abortReason != None && _abortReason != FreshnessTie) ||
196         (_responsesProcessed == static_cast<int>(_targets.size()));
197 }
198 
shouldAbortElection() const199 FreshnessChecker::ElectionAbortReason FreshnessChecker::Algorithm::shouldAbortElection() const {
200     return _abortReason;
201 }
202 
shouldAbortElection() const203 FreshnessChecker::ElectionAbortReason FreshnessChecker::shouldAbortElection() const {
204     return _algorithm->shouldAbortElection();
205 }
206 
getOriginalConfigVersion() const207 long long FreshnessChecker::getOriginalConfigVersion() const {
208     return _originalConfigVersion;
209 }
210 
FreshnessChecker()211 FreshnessChecker::FreshnessChecker() : _isCanceled(false) {}
~FreshnessChecker()212 FreshnessChecker::~FreshnessChecker() {}
213 
start(executor::TaskExecutor * executor,const Timestamp & lastOpTimeApplied,const ReplSetConfig & currentConfig,int selfIndex,const std::vector<HostAndPort> & targets)214 StatusWith<executor::TaskExecutor::EventHandle> FreshnessChecker::start(
215     executor::TaskExecutor* executor,
216     const Timestamp& lastOpTimeApplied,
217     const ReplSetConfig& currentConfig,
218     int selfIndex,
219     const std::vector<HostAndPort>& targets) {
220     _originalConfigVersion = currentConfig.getConfigVersion();
221     _algorithm = std::make_shared<Algorithm>(lastOpTimeApplied, currentConfig, selfIndex, targets);
222     _runner = stdx::make_unique<ScatterGatherRunner>(_algorithm, executor);
223     return _runner->start();
224 }
225 
cancel()226 void FreshnessChecker::cancel() {
227     _isCanceled = true;
228     _runner->cancel();
229 }
230 
231 }  // namespace repl
232 }  // namespace mongo
233