1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/client/replica_set_monitor.h"
36 
37 #include <algorithm>
38 #include <limits>
39 #include <random>
40 
41 #include "mongo/bson/simple_bsonelement_comparator.h"
42 #include "mongo/client/connpool.h"
43 #include "mongo/client/global_conn_pool.h"
44 #include "mongo/client/read_preference.h"
45 #include "mongo/client/replica_set_monitor_internal.h"
46 #include "mongo/db/operation_context.h"
47 #include "mongo/db/repl/bson_extract_optime.h"
48 #include "mongo/db/server_options.h"
49 #include "mongo/stdx/condition_variable.h"
50 #include "mongo/stdx/mutex.h"
51 #include "mongo/stdx/thread.h"
52 #include "mongo/util/background.h"
53 #include "mongo/util/debug_util.h"
54 #include "mongo/util/exit.h"
55 #include "mongo/util/fail_point_service.h"
56 #include "mongo/util/log.h"
57 #include "mongo/util/string_map.h"
58 #include "mongo/util/timer.h"
59 
60 namespace mongo {
61 
62 using std::shared_ptr;
63 using std::numeric_limits;
64 using std::set;
65 using std::string;
66 using std::vector;
67 
68 // Failpoint for disabling AsyncConfigChangeHook calls on updated RS nodes.
69 MONGO_FP_DECLARE(failAsyncConfigChangeHook);
70 
71 namespace {
72 
73 // Pull nested types to top-level scope
74 typedef ReplicaSetMonitor::IsMasterReply IsMasterReply;
75 typedef ReplicaSetMonitor::ScanState ScanState;
76 typedef ReplicaSetMonitor::ScanStatePtr ScanStatePtr;
77 typedef ReplicaSetMonitor::SetState SetState;
78 typedef ReplicaSetMonitor::SetStatePtr SetStatePtr;
79 typedef ReplicaSetMonitor::Refresher Refresher;
80 typedef ScanState::UnconfirmedReplies UnconfirmedReplies;
81 typedef SetState::Node Node;
82 typedef SetState::Nodes Nodes;
83 using executor::TaskExecutor;
84 using CallbackArgs = TaskExecutor::CallbackArgs;
85 using CallbackHandle = TaskExecutor::CallbackHandle;
86 
87 const double socketTimeoutSecs = 5;
88 
89 // Intentionally chosen to compare worse than all known latencies.
90 const int64_t unknownLatency = numeric_limits<int64_t>::max();
91 
92 const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet());
93 const Milliseconds kFindHostMaxBackOffTime(500);
94 AtomicBool areRefreshRetriesDisabledForTest{false};  // Only true in tests.
95 
96 // TODO: Move to ReplicaSetMonitorManager
97 ReplicaSetMonitor::ConfigChangeHook asyncConfigChangeHook;
98 ReplicaSetMonitor::ConfigChangeHook syncConfigChangeHook;
99 
100 //
101 // Helpers for stl algorithms
102 //
103 
isMaster(const Node & node)104 bool isMaster(const Node& node) {
105     return node.isMaster;
106 }
107 
opTimeGreater(const Node * lhs,const Node * rhs)108 bool opTimeGreater(const Node* lhs, const Node* rhs) {
109     return lhs->opTime > rhs->opTime;
110 }
111 
compareLatencies(const Node * lhs,const Node * rhs)112 bool compareLatencies(const Node* lhs, const Node* rhs) {
113     // NOTE: this automatically compares Node::unknownLatency worse than all others.
114     return lhs->latencyMicros < rhs->latencyMicros;
115 }
116 
hostsEqual(const Node & lhs,const HostAndPort & rhs)117 bool hostsEqual(const Node& lhs, const HostAndPort& rhs) {
118     return lhs.host == rhs;
119 }
120 
121 // Allows comparing two Nodes, or a HostAndPort and a Node.
122 // NOTE: the two HostAndPort overload is only needed to support extra checks in some STL
123 // implementations. For simplicity, no comparator should be used with collections of just
124 // HostAndPort.
125 struct CompareHosts {
operator ()mongo::__anon4c2208890111::CompareHosts126     bool operator()(const Node& lhs, const Node& rhs) {
127         return lhs.host < rhs.host;
128     }
operator ()mongo::__anon4c2208890111::CompareHosts129     bool operator()(const Node& lhs, const HostAndPort& rhs) {
130         return lhs.host < rhs;
131     }
operator ()mongo::__anon4c2208890111::CompareHosts132     bool operator()(const HostAndPort& lhs, const Node& rhs) {
133         return lhs < rhs.host;
134     }
operator ()mongo::__anon4c2208890111::CompareHosts135     bool operator()(const HostAndPort& lhs, const HostAndPort& rhs) {
136         return lhs < rhs;
137     }
138 } compareHosts;  // like an overloaded function, but able to pass to stl algorithms
139 
140 // The following structs should be treated as functions returning a UnaryPredicate.
141 // Usage example: std::find_if(nodes.begin(), nodes.end(), HostIs(someHost));
142 // They all hold their constructor argument by reference.
143 
144 struct HostIs {
HostIsmongo::__anon4c2208890111::HostIs145     explicit HostIs(const HostAndPort& host) : _host(host) {}
operator ()mongo::__anon4c2208890111::HostIs146     bool operator()(const HostAndPort& host) {
147         return host == _host;
148     }
operator ()mongo::__anon4c2208890111::HostIs149     bool operator()(const Node& node) {
150         return node.host == _host;
151     }
152     const HostAndPort& _host;
153 };
154 
155 struct HostNotIn {
HostNotInmongo::__anon4c2208890111::HostNotIn156     explicit HostNotIn(const std::set<HostAndPort>& hosts) : _hosts(hosts) {}
operator ()mongo::__anon4c2208890111::HostNotIn157     bool operator()(const HostAndPort& host) {
158         return !_hosts.count(host);
159     }
operator ()mongo::__anon4c2208890111::HostNotIn160     bool operator()(const Node& node) {
161         return !_hosts.count(node.host);
162     }
163     const std::set<HostAndPort>& _hosts;
164 };
165 
pingTimeMillis(const Node & node)166 int32_t pingTimeMillis(const Node& node) {
167     auto latencyMillis = node.latencyMicros / 1000;
168     if (latencyMillis > numeric_limits<int32_t>::max()) {
169         // In particular, Node::unknownLatency does not fit in an int32.
170         return numeric_limits<int32_t>::max();
171     }
172     return latencyMillis;
173 }
174 
175 /**
176  * Replica set refresh period on the task executor.
177  */
178 const Seconds kRefreshPeriod(30);
179 }  // namespace
180 
181 // If we cannot find a host after 15 seconds of refreshing, give up
182 const Seconds ReplicaSetMonitor::kDefaultFindHostTimeout(15);
183 
184 // Defaults to random selection as required by the spec
185 bool ReplicaSetMonitor::useDeterministicHostSelection = false;
186 
ReplicaSetMonitor(StringData name,const std::set<HostAndPort> & seeds)187 ReplicaSetMonitor::ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds)
188     : _state(std::make_shared<SetState>(name, seeds)),
189       _executor(globalRSMonitorManager.getExecutor()) {}
190 
ReplicaSetMonitor(const MongoURI & uri)191 ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri)
192     : _state(std::make_shared<SetState>(uri)), _executor(globalRSMonitorManager.getExecutor()) {}
193 
init()194 void ReplicaSetMonitor::init() {
195     stdx::lock_guard<stdx::mutex> lk(_mutex);
196     invariant(_executor);
197     std::weak_ptr<ReplicaSetMonitor> that(shared_from_this());
198     auto status = _executor->scheduleWork([=](const CallbackArgs& cbArgs) {
199         if (auto ptr = that.lock()) {
200             ptr->_refresh(cbArgs);
201         }
202     });
203 
204     if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
205         LOG(1) << "Couldn't schedule refresh for " << getName()
206                << ". Executor shutdown in progress";
207         return;
208     }
209 
210     if (!status.isOK()) {
211         severe() << "Can't start refresh for replica set " << getName()
212                  << causedBy(redact(status.getStatus()));
213         fassertFailed(40139);
214     }
215 
216     _refresherHandle = status.getValue();
217 }
218 
~ReplicaSetMonitor()219 ReplicaSetMonitor::~ReplicaSetMonitor() {
220     // need this lock because otherwise can get race with scheduling in _refresh
221     stdx::lock_guard<stdx::mutex> lk(_mutex);
222     if (!_refresherHandle || !_executor) {
223         return;
224     }
225 
226     _executor->cancel(_refresherHandle);
227     // Note: calling _executor->wait(_refresherHandle); from the dispatcher thread will cause hang
228     // Its ok not to call it because the d-tor is called only when the last owning pointer goes out
229     // of scope, so as taskExecutor queue holds a weak pointer to RSM it will not be able to get a
230     // task to execute eliminating the need to call method "wait".
231     //
232     _refresherHandle = {};
233 }
234 
_refresh(const CallbackArgs & cbArgs)235 void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) {
236     if (!cbArgs.status.isOK()) {
237         return;
238     }
239 
240     Timer t;
241     startOrContinueRefresh().refreshAll();
242     LOG(1) << "Refreshing replica set " << getName() << " took " << t.millis() << " msec";
243 
244     // Reschedule the refresh
245     invariant(_executor);
246 
247     if (_isRemovedFromManager.load()) {  // already removed so no need to refresh
248         LOG(1) << "Stopping refresh for replica set " << getName() << " because its removed";
249         return;
250     }
251 
252     stdx::lock_guard<stdx::mutex> lk(_mutex);
253 
254     std::weak_ptr<ReplicaSetMonitor> that(shared_from_this());
255     auto status = _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod,
256                                             [=](const CallbackArgs& cbArgs) {
257                                                 if (auto ptr = that.lock()) {
258                                                     ptr->_refresh(cbArgs);
259                                                 }
260                                             });
261 
262     if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
263         LOG(1) << "Cant schedule refresh for " << getName() << ". Executor shutdown in progress";
264         return;
265     }
266 
267     if (!status.isOK()) {
268         severe() << "Can't continue refresh for replica set " << getName() << " due to "
269                  << redact(status.getStatus());
270         fassertFailed(40140);
271     }
272 
273     _refresherHandle = status.getValue();
274 }
275 
getHostOrRefresh(const ReadPreferenceSetting & criteria,Milliseconds maxWait)276 StatusWith<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria,
277                                                             Milliseconds maxWait) {
278     if (_isRemovedFromManager.load()) {
279         return {ErrorCodes::ReplicaSetMonitorRemoved,
280                 str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed"};
281     }
282 
283     {
284         // Fast path, for the failure-free case
285         stdx::lock_guard<stdx::mutex> lk(_state->mutex);
286         HostAndPort out = _state->getMatchingHost(criteria);
287         if (!out.empty())
288             return {std::move(out)};
289     }
290 
291     const auto startTimeMs = Date_t::now();
292 
293     while (true) {
294         // We might not have found any matching hosts due to the scan, which just completed may have
295         // seen stale data from before we joined. Therefore we should participate in a new scan to
296         // make sure all hosts are contacted at least once (possibly by other threads) before this
297         // function gives up.
298         Refresher refresher(startOrContinueRefresh());
299 
300         HostAndPort out = refresher.refreshUntilMatches(criteria);
301         if (!out.empty())
302             return {std::move(out)};
303 
304         if (globalInShutdownDeprecated()) {
305             return {ErrorCodes::ShutdownInProgress, str::stream() << "Server is shutting down"};
306         }
307 
308         const Milliseconds remaining = maxWait - (Date_t::now() - startTimeMs);
309 
310         if (remaining < kFindHostMaxBackOffTime || areRefreshRetriesDisabledForTest.load()) {
311             break;
312         }
313 
314         // Back-off so we don't spam the replica set hosts too much
315         sleepFor(kFindHostMaxBackOffTime);
316     }
317 
318     return {ErrorCodes::FailedToSatisfyReadPreference,
319             str::stream() << "Could not find host matching read preference " << criteria.toString()
320                           << " for set "
321                           << getName()};
322 }
323 
getMasterOrUassert()324 HostAndPort ReplicaSetMonitor::getMasterOrUassert() {
325     return uassertStatusOK(getHostOrRefresh(kPrimaryOnlyReadPreference));
326 }
327 
startOrContinueRefresh()328 Refresher ReplicaSetMonitor::startOrContinueRefresh() {
329     stdx::lock_guard<stdx::mutex> lk(_state->mutex);
330 
331     Refresher out(_state);
332     DEV _state->checkInvariants();
333     return out;
334 }
335 
failedHost(const HostAndPort & host,const Status & status)336 void ReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) {
337     stdx::lock_guard<stdx::mutex> lk(_state->mutex);
338     Node* node = _state->findNode(host);
339     if (node)
340         node->markFailed(status);
341     DEV _state->checkInvariants();
342 }
343 
isPrimary(const HostAndPort & host) const344 bool ReplicaSetMonitor::isPrimary(const HostAndPort& host) const {
345     stdx::lock_guard<stdx::mutex> lk(_state->mutex);
346     Node* node = _state->findNode(host);
347     return node ? node->isMaster : false;
348 }
349 
isHostUp(const HostAndPort & host) const350 bool ReplicaSetMonitor::isHostUp(const HostAndPort& host) const {
351     stdx::lock_guard<stdx::mutex> lk(_state->mutex);
352     Node* node = _state->findNode(host);
353     return node ? node->isUp : false;
354 }
355 
getMinWireVersion() const356 int ReplicaSetMonitor::getMinWireVersion() const {
357     stdx::lock_guard<stdx::mutex> lk(_state->mutex);
358     int minVersion = 0;
359     for (const auto& host : _state->nodes) {
360         if (host.isUp) {
361             minVersion = std::max(minVersion, host.minWireVersion);
362         }
363     }
364 
365     return minVersion;
366 }
367 
getMaxWireVersion() const368 int ReplicaSetMonitor::getMaxWireVersion() const {
369     stdx::lock_guard<stdx::mutex> lk(_state->mutex);
370     int maxVersion = std::numeric_limits<int>::max();
371     for (const auto& host : _state->nodes) {
372         if (host.isUp) {
373             maxVersion = std::min(maxVersion, host.maxWireVersion);
374         }
375     }
376 
377     return maxVersion;
378 }
379 
getName() const380 std::string ReplicaSetMonitor::getName() const {
381     // name is const so don't need to lock
382     return _state->name;
383 }
384 
getServerAddress() const385 std::string ReplicaSetMonitor::getServerAddress() const {
386     stdx::lock_guard<stdx::mutex> lk(_state->mutex);
387     return _state->getConfirmedServerAddress();
388 }
389 
contains(const HostAndPort & host) const390 bool ReplicaSetMonitor::contains(const HostAndPort& host) const {
391     stdx::lock_guard<stdx::mutex> lk(_state->mutex);
392     return _state->seedNodes.count(host);
393 }
394 
createIfNeeded(const string & name,const set<HostAndPort> & servers)395 shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::createIfNeeded(const string& name,
396                                                                 const set<HostAndPort>& servers) {
397     return globalRSMonitorManager.getOrCreateMonitor(
398         ConnectionString::forReplicaSet(name, vector<HostAndPort>(servers.begin(), servers.end())));
399 }
400 
createIfNeeded(const MongoURI & uri)401 shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::createIfNeeded(const MongoURI& uri) {
402     return globalRSMonitorManager.getOrCreateMonitor(uri);
403 }
404 
get(const std::string & name)405 shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::get(const std::string& name) {
406     return globalRSMonitorManager.getMonitor(name);
407 }
408 
remove(const string & name)409 void ReplicaSetMonitor::remove(const string& name) {
410     globalRSMonitorManager.removeMonitor(name);
411 
412     // Kill all pooled ReplicaSetConnections for this set. They will not function correctly
413     // after we kill the ReplicaSetMonitor.
414     globalConnPool.removeHost(name);
415 }
416 
setAsynchronousConfigChangeHook(ConfigChangeHook hook)417 void ReplicaSetMonitor::setAsynchronousConfigChangeHook(ConfigChangeHook hook) {
418     invariant(!asyncConfigChangeHook);
419     asyncConfigChangeHook = hook;
420 }
421 
setSynchronousConfigChangeHook(ConfigChangeHook hook)422 void ReplicaSetMonitor::setSynchronousConfigChangeHook(ConfigChangeHook hook) {
423     invariant(!syncConfigChangeHook);
424     syncConfigChangeHook = hook;
425 }
426 
427 // TODO move to correct order with non-statics before pushing
appendInfo(BSONObjBuilder & bsonObjBuilder,bool forFTDC) const428 void ReplicaSetMonitor::appendInfo(BSONObjBuilder& bsonObjBuilder, bool forFTDC) const {
429     stdx::lock_guard<stdx::mutex> lk(_state->mutex);
430 
431     BSONObjBuilder monitorInfo(bsonObjBuilder.subobjStart(getName()));
432     if (forFTDC) {
433         for (size_t i = 0; i < _state->nodes.size(); i++) {
434             const Node& node = _state->nodes[i];
435             monitorInfo.appendNumber(node.host.toString(), pingTimeMillis(node));
436         }
437         return;
438     }
439 
440     // NOTE: the format here must be consistent for backwards compatibility
441     BSONArrayBuilder hosts(monitorInfo.subarrayStart("hosts"));
442     for (size_t i = 0; i < _state->nodes.size(); i++) {
443         const Node& node = _state->nodes[i];
444 
445         BSONObjBuilder builder;
446         builder.append("addr", node.host.toString());
447         builder.append("ok", node.isUp);
448         builder.append("ismaster", node.isMaster);  // intentionally not camelCase
449         builder.append("hidden", false);            // we don't keep hidden nodes in the set
450         builder.append("secondary", node.isUp && !node.isMaster);
451         builder.append("pingTimeMillis", pingTimeMillis(node));
452 
453         if (!node.tags.isEmpty()) {
454             builder.append("tags", node.tags);
455         }
456 
457         hosts.append(builder.obj());
458     }
459 }
460 
shutdown()461 void ReplicaSetMonitor::shutdown() {
462     globalRSMonitorManager.shutdown();
463 }
464 
cleanup()465 void ReplicaSetMonitor::cleanup() {
466     globalRSMonitorManager.removeAllMonitors();
467     asyncConfigChangeHook = ReplicaSetMonitor::ConfigChangeHook();
468     syncConfigChangeHook = ReplicaSetMonitor::ConfigChangeHook();
469 }
470 
disableRefreshRetries_forTest()471 void ReplicaSetMonitor::disableRefreshRetries_forTest() {
472     areRefreshRetriesDisabledForTest.store(true);
473 }
474 
isKnownToHaveGoodPrimary() const475 bool ReplicaSetMonitor::isKnownToHaveGoodPrimary() const {
476     stdx::lock_guard<stdx::mutex> lk(_state->mutex);
477 
478     for (const auto& node : _state->nodes) {
479         if (node.isMaster) {
480             return true;
481         }
482     }
483 
484     return false;
485 }
486 
markAsRemoved()487 void ReplicaSetMonitor::markAsRemoved() {
488     _isRemovedFromManager.store(true);
489 }
490 
Refresher(const SetStatePtr & setState)491 Refresher::Refresher(const SetStatePtr& setState) : _set(setState), _scan(setState->currentScan) {
492     if (_scan)
493         return;  // participate in in-progress scan
494 
495     LOG(2) << "Starting new refresh of replica set " << _set->name;
496     _scan = startNewScan(_set.get());
497     _set->currentScan = _scan;
498 }
499 
refreshUntilMatches(const ReadPreferenceSetting & criteria)500 HostAndPort Refresher::refreshUntilMatches(const ReadPreferenceSetting& criteria) {
501     return _refreshUntilMatches(&criteria);
502 };
503 
refreshAll()504 void Refresher::refreshAll() {
505     _refreshUntilMatches(nullptr);
506 }
507 
getNextStep()508 Refresher::NextStep Refresher::getNextStep() {
509     // No longer the current scan
510     if (_scan != _set->currentScan) {
511         return NextStep(NextStep::DONE);
512     }
513 
514     // Wait for all dispatched hosts to return before trying any fallback hosts.
515     if (_scan->hostsToScan.empty() && !_scan->waitingFor.empty()) {
516         return NextStep(NextStep::WAIT);
517     }
518 
519     // If we haven't yet found a master, try contacting unconfirmed hosts
520     if (_scan->hostsToScan.empty() && !_scan->foundUpMaster) {
521         _scan->enqueAllUntriedHosts(_scan->possibleNodes, _set->rand);
522         _scan->possibleNodes.clear();
523     }
524 
525     if (_scan->hostsToScan.empty()) {
526         // We've tried all hosts we can, so nothing more to do in this round.
527         if (!_scan->foundUpMaster) {
528             warning() << "Unable to reach primary for set " << _set->name;
529 
530             // Since we've talked to everyone we could but still didn't find a primary, we
531             // do the best we can, and assume all unconfirmedReplies are actually from nodes
532             // in the set (we've already confirmed that they think they are). This is
533             // important since it allows us to bootstrap to a usable state even if we are
534             // unable to talk to a master while starting up. As soon as we are able to
535             // contact a master, we will remove any nodes that it doesn't think are part of
536             // the set, undoing the damage we cause here.
537 
538             // NOTE: we don't modify seedNodes or notify about set membership change in this
539             // case since it hasn't been confirmed by a master.
540             const string oldAddr = _set->getUnconfirmedServerAddress();
541             for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin();
542                  it != _scan->unconfirmedReplies.end();
543                  ++it) {
544                 _set->findOrCreateNode(it->host)->update(*it);
545             }
546 
547             const string newAddr = _set->getUnconfirmedServerAddress();
548             if (oldAddr != newAddr && syncConfigChangeHook) {
549                 // Run the syncConfigChangeHook because the ShardRegistry needs to know about any
550                 // node we might talk to.  Don't run the asyncConfigChangeHook because we don't
551                 // want to update the seed list stored on the config servers with unconfirmed hosts.
552                 syncConfigChangeHook(_set->name, _set->getUnconfirmedServerAddress());
553             }
554         }
555 
556         if (_scan->foundAnyUpNodes) {
557             _set->consecutiveFailedScans = 0;
558         } else {
559             auto nScans = _set->consecutiveFailedScans++;
560             if (nScans <= 10 || nScans % 10 == 0) {
561                 log() << "Cannot reach any nodes for set " << _set->name
562                       << ". Please check network connectivity and the status of the set. "
563                       << "This has happened for " << _set->consecutiveFailedScans
564                       << " checks in a row.";
565             }
566         }
567 
568         // Makes sure all other Refreshers in this round return DONE
569         _set->currentScan.reset();
570 
571         return NextStep(NextStep::DONE);
572     }
573 
574     // Pop and return the next hostToScan.
575     HostAndPort host = _scan->hostsToScan.front();
576     _scan->hostsToScan.pop_front();
577     _scan->waitingFor.insert(host);
578     _scan->triedHosts.insert(host);
579 
580     return NextStep(NextStep::CONTACT_HOST, host);
581 }
582 
/**
 * Processes an isMaster reply from `from` that took `latencyMicros` to arrive.
 *
 * `from` is always removed from the scan's waitingFor set.
 *
 * The reply is rejected via failedHost() in three cases:
 * - the command failed;
 * - the reported set name differs from the monitored set;
 * - the host claims to be master but receivedIsMasterFromMaster() refuses it.
 *
 * Otherwise:
 * - If a master has already been found, the reply updates the matching node (only if that node
 *   is already in the set).
 * - If not, the reply is recorded as unconfirmed.
 * In both cases the scan is marked as having found an up node and waiters on the set's
 * condition variable are notified.
 */
void Refresher::receivedIsMaster(const HostAndPort& from,
                                 int64_t latencyMicros,
                                 const BSONObj& replyObj) {
    // Be careful: all return paths must call either failedHost or cv.notify_all!
    // (Waiters may be blocked until waitingFor changes; failedHost notifies when it empties.)
    _scan->waitingFor.erase(from);

    const IsMasterReply reply(from, latencyMicros, replyObj);

    // Handle various failure cases
    if (!reply.ok) {
        failedHost(from, {ErrorCodes::CommandFailed, "Failed to execute 'ismaster' command"});
        return;
    }

    if (reply.setName != _set->name) {
        if (reply.raw["isreplicaset"].trueValue()) {
            // The reply came from a node in the state referred to as RSGhost in the SDAM
            // spec. RSGhost corresponds to either REMOVED or STARTUP member states. In any event,
            // if a reply from a ghost offers a list of possible other members of the replica set,
            // and if this refresher has yet to find the replica set master, we add hosts listed in
            // the reply to the list of possible replica set members.
            if (!_scan->foundUpMaster) {
                _scan->possibleNodes.insert(reply.normalHosts.begin(), reply.normalHosts.end());
            }
        } else {
            warning() << "node: " << from << " isn't a part of set: " << _set->name
                      << " ismaster: " << replyObj;
        }

        failedHost(from,
                   {ErrorCodes::InconsistentReplicaSetNames,
                    str::stream() << "Target replica set name " << reply.setName
                                  << " does not match the monitored set name "
                                  << _set->name});
        return;
    }

    if (reply.isMaster) {
        // May reject a stale primary (older config version or election id).
        Status status = receivedIsMasterFromMaster(from, reply);
        if (!status.isOK()) {
            failedHost(from, status);
            return;
        }
    }

    if (_scan->foundUpMaster) {
        // We only update a Node if a master has confirmed it is in the set.
        _set->updateNodeIfInNodes(reply);
    } else {
        // No master yet: keep the reply so getNextStep() can fall back on it if no master is
        // ever found in this scan.
        receivedIsMasterBeforeFoundMaster(reply);
        _scan->unconfirmedReplies.push_back(reply);
    }

    // _set->nodes may still not have any nodes with isUp==true, but we have at least found a
    // connectible host that claims to be in the set.
    _scan->foundAnyUpNodes = true;

    // TODO consider only notifying if we've updated a node or we've emptied waitingFor.
    _set->cv.notify_all();

    DEV _set->checkInvariants();
}
645 
failedHost(const HostAndPort & host,const Status & status)646 void Refresher::failedHost(const HostAndPort& host, const Status& status) {
647     _scan->waitingFor.erase(host);
648 
649     // Failed hosts can't pass criteria, so the only way they'd effect the _refreshUntilMatches
650     // loop is if it was the last host we were waitingFor.
651     if (_scan->waitingFor.empty())
652         _set->cv.notify_all();
653 
654     Node* node = _set->findNode(host);
655     if (node)
656         node->markFailed(status);
657 }
658 
startNewScan(const SetState * set)659 ScanStatePtr Refresher::startNewScan(const SetState* set) {
660     const ScanStatePtr scan = std::make_shared<ScanState>();
661 
662     // The heuristics we use in deciding the order to contact hosts are designed to find a
663     // master as quickly as possible. This is because we can't use any hosts we find until
664     // we either get the latest set of members from a master or talk to all possible hosts
665     // without finding a master.
666 
667     // TODO It might make sense to check down nodes first if the last seen master is still
668     // marked as up.
669 
670     int upNodes = 0;
671     for (Nodes::const_iterator it(set->nodes.begin()), end(set->nodes.end()); it != end; ++it) {
672         if (it->isUp) {
673             // scan the nodes we think are up first
674             scan->hostsToScan.push_front(it->host);
675             upNodes++;
676         } else {
677             scan->hostsToScan.push_back(it->host);
678         }
679     }
680 
681     // shuffle the queue, but keep "up" nodes at the front
682     std::random_shuffle(scan->hostsToScan.begin(), scan->hostsToScan.begin() + upNodes, set->rand);
683     std::random_shuffle(scan->hostsToScan.begin() + upNodes, scan->hostsToScan.end(), set->rand);
684 
685     if (!set->lastSeenMaster.empty()) {
686         // move lastSeenMaster to front of queue
687         std::stable_partition(
688             scan->hostsToScan.begin(), scan->hostsToScan.end(), HostIs(set->lastSeenMaster));
689     }
690 
691     return scan;
692 }
693 
receivedIsMasterFromMaster(const HostAndPort & from,const IsMasterReply & reply)694 Status Refresher::receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply) {
695     invariant(reply.isMaster);
696 
697     // Reject if config version is older. This is for backwards compatibility with nodes in pv0
698     // since they don't have the same ordering with pv1 electionId.
699     if (reply.configVersion < _set->configVersion) {
700         return {ErrorCodes::NotMaster,
701                 str::stream() << "Node " << from
702                               << " believes it is primary, but its config version "
703                               << reply.configVersion
704                               << " is older than the most recent config version "
705                               << _set->configVersion};
706     }
707 
708     if (reply.electionId.isSet()) {
709         // ElectionIds are only comparable if they are of the same protocol version. However, since
710         // isMaster has no protocol version field, we use the configVersion instead. This works
711         // because configVersion needs to be incremented whenever the protocol version is changed.
712         if (reply.configVersion == _set->configVersion && _set->maxElectionId.isSet() &&
713             _set->maxElectionId.compare(reply.electionId) > 0) {
714             return {ErrorCodes::NotMaster,
715                     str::stream() << "Node " << from
716                                   << " believes it is primary, but its election id "
717                                   << reply.electionId
718                                   << " is older than the most recent election id "
719                                   << _set->maxElectionId};
720         }
721 
722         _set->maxElectionId = reply.electionId;
723     }
724 
725     _set->configVersion = reply.configVersion;
726 
727     // Mark all nodes as not master. We will mark ourself as master before releasing the lock.
728     // NOTE: we use a "last-wins" policy if multiple hosts claim to be master.
729     for (size_t i = 0; i < _set->nodes.size(); i++) {
730         _set->nodes[i].isMaster = false;
731     }
732 
733     // Check if the master agrees with our current list of nodes.
734     // REMINDER: both _set->nodes and reply.normalHosts are sorted.
735     if (_set->nodes.size() != reply.normalHosts.size() ||
736         !std::equal(
737             _set->nodes.begin(), _set->nodes.end(), reply.normalHosts.begin(), hostsEqual)) {
738         LOG(2) << "Adjusting nodes in our view of replica set " << _set->name
739                << " based on master reply: " << redact(reply.raw);
740 
741         // remove non-members from _set->nodes
742         _set->nodes.erase(
743             std::remove_if(_set->nodes.begin(), _set->nodes.end(), HostNotIn(reply.normalHosts)),
744             _set->nodes.end());
745 
746         // add new members to _set->nodes
747         for (std::set<HostAndPort>::const_iterator it = reply.normalHosts.begin();
748              it != reply.normalHosts.end();
749              ++it) {
750             _set->findOrCreateNode(*it);
751         }
752 
753         // replace hostToScan queue with untried normal hosts. can both add and remove
754         // hosts from the queue.
755         _scan->hostsToScan.clear();
756         _scan->enqueAllUntriedHosts(reply.normalHosts, _set->rand);
757 
758         if (!_scan->waitingFor.empty()) {
759             // make sure we don't wait for any hosts that aren't considered members
760             std::set<HostAndPort> newWaitingFor;
761             std::set_intersection(reply.normalHosts.begin(),
762                                   reply.normalHosts.end(),
763                                   _scan->waitingFor.begin(),
764                                   _scan->waitingFor.end(),
765                                   std::inserter(newWaitingFor, newWaitingFor.end()));
766             _scan->waitingFor.swap(newWaitingFor);
767         }
768     }
769 
770     if (reply.normalHosts != _set->seedNodes) {
771         const string oldAddr = _set->getConfirmedServerAddress();
772         _set->seedNodes = reply.normalHosts;
773 
774         // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare
775         // and we want to record our changes
776         log() << "changing hosts to " << _set->getConfirmedServerAddress() << " from " << oldAddr;
777 
778         if (syncConfigChangeHook) {
779             syncConfigChangeHook(_set->name, _set->getConfirmedServerAddress());
780         }
781 
782         if (asyncConfigChangeHook && !MONGO_FAIL_POINT(failAsyncConfigChangeHook)) {
783             // call from a separate thread to avoid blocking and holding lock while potentially
784             // going over the network
785             stdx::thread bg(asyncConfigChangeHook, _set->name, _set->getConfirmedServerAddress());
786             bg.detach();
787         }
788     }
789 
790     // Update other nodes's information based on replies we've already seen
791     for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin();
792          it != _scan->unconfirmedReplies.end();
793          ++it) {
794         // this ignores replies from hosts not in _set->nodes (as modified above)
795         _set->updateNodeIfInNodes(*it);
796     }
797     _scan->unconfirmedReplies.clear();
798 
799     _scan->foundUpMaster = true;
800     _set->lastSeenMaster = reply.host;
801 
802     return Status::OK();
803 }
804 
receivedIsMasterBeforeFoundMaster(const IsMasterReply & reply)805 void Refresher::receivedIsMasterBeforeFoundMaster(const IsMasterReply& reply) {
806     invariant(!reply.isMaster);
807     // This function doesn't alter _set at all. It only modifies the work queue in _scan.
808 
809     // Add everyone this host claims is in the set to possibleNodes.
810     _scan->possibleNodes.insert(reply.normalHosts.begin(), reply.normalHosts.end());
811 
812     // If this node thinks the primary is someone we haven't tried, make that the next
813     // hostToScan.
814     if (!reply.primary.empty() && !_scan->triedHosts.count(reply.primary)) {
815         std::deque<HostAndPort>::iterator it = std::stable_partition(
816             _scan->hostsToScan.begin(), _scan->hostsToScan.end(), HostIs(reply.primary));
817 
818         if (it == _scan->hostsToScan.begin()) {
819             // reply.primary wasn't in hostsToScan
820             _scan->hostsToScan.push_front(reply.primary);
821         }
822     }
823 }
824 
_refreshUntilMatches(const ReadPreferenceSetting * criteria)825 HostAndPort Refresher::_refreshUntilMatches(const ReadPreferenceSetting* criteria) {
826     stdx::unique_lock<stdx::mutex> lk(_set->mutex);
827     while (true) {
828         if (criteria) {
829             HostAndPort out = _set->getMatchingHost(*criteria);
830             if (!out.empty())
831                 return out;
832         }
833 
834         const NextStep ns = getNextStep();
835         DEV _set->checkInvariants();
836 
837         switch (ns.step) {
838             case NextStep::DONE:
839                 // getNextStep may have updated nodes if no master was found
840                 return criteria ? _set->getMatchingHost(*criteria) : HostAndPort();
841 
842             case NextStep::WAIT:  // TODO consider treating as DONE for refreshAll
843                 _set->cv.wait(lk);
844                 continue;
845 
846             case NextStep::CONTACT_HOST: {
847                 StatusWith<BSONObj> isMasterReplyStatus{ErrorCodes::InternalError,
848                                                         "Uninitialized variable"};
849                 int64_t pingMicros = 0;
850                 MongoURI targetURI;
851 
852                 if (_set->setUri.isValid()) {
853                     targetURI = _set->setUri.cloneURIForServer(ns.host);
854                     targetURI.setUser("");
855                     targetURI.setPassword("");
856                 } else {
857                     targetURI = MongoURI(ConnectionString(ns.host));
858                 }
859 
860                 // Do not do network calls while holding a mutex
861                 lk.unlock();
862                 try {
863                     ScopedDbConnection conn(targetURI, socketTimeoutSecs);
864                     bool ignoredOutParam = false;
865                     Timer timer;
866                     BSONObj reply;
867                     conn->isMaster(ignoredOutParam, &reply);
868                     isMasterReplyStatus = reply;
869                     pingMicros = timer.micros();
870                     conn.done();  // return to pool on success.
871                 } catch (const DBException& ex) {
872                     isMasterReplyStatus = ex.toStatus();
873                 }
874                 lk.lock();
875 
876                 // Ignore the reply and return if we are no longer the current scan. This might
877                 // happen if it was decided that the host we were contacting isn't part of the set.
878                 if (_scan != _set->currentScan)
879                     return criteria ? _set->getMatchingHost(*criteria) : HostAndPort();
880 
881                 if (isMasterReplyStatus.isOK())
882                     receivedIsMaster(ns.host, pingMicros, isMasterReplyStatus.getValue());
883                 else
884                     failedHost(ns.host, isMasterReplyStatus.getStatus());
885             }
886         }
887     }
888 }
889 
parse(const BSONObj & obj)890 void IsMasterReply::parse(const BSONObj& obj) {
891     try {
892         raw = obj.getOwned();  // don't use obj again after this line
893 
894         ok = raw["ok"].trueValue();
895         if (!ok)
896             return;
897 
898         setName = raw["setName"].str();
899         hidden = raw["hidden"].trueValue();
900         secondary = raw["secondary"].trueValue();
901 
902         minWireVersion = raw["minWireVersion"].numberInt();
903         maxWireVersion = raw["maxWireVersion"].numberInt();
904 
905         // hidden nodes can't be master, even if they claim to be.
906         isMaster = !hidden && raw["ismaster"].trueValue();
907 
908         if (isMaster && raw.hasField("electionId")) {
909             electionId = raw["electionId"].OID();
910         }
911 
912         configVersion = raw["setVersion"].numberInt();
913 
914         const string primaryString = raw["primary"].str();
915         primary = primaryString.empty() ? HostAndPort() : HostAndPort(primaryString);
916 
917         // both hosts and passives, but not arbiters, are considered "normal hosts"
918         normalHosts.clear();
919         BSONForEach(host, raw.getObjectField("hosts")) {
920             normalHosts.insert(HostAndPort(host.String()));
921         }
922         BSONForEach(host, raw.getObjectField("passives")) {
923             normalHosts.insert(HostAndPort(host.String()));
924         }
925 
926         tags = raw.getObjectField("tags");
927         BSONObj lastWriteField = raw.getObjectField("lastWrite");
928         if (!lastWriteField.isEmpty()) {
929             if (auto lastWrite = lastWriteField["lastWriteDate"]) {
930                 lastWriteDate = lastWrite.date();
931             }
932 
933             uassertStatusOK(bsonExtractOpTimeField(lastWriteField, "opTime", &opTime));
934         }
935     } catch (const std::exception& e) {
936         ok = false;
937         log() << "exception while parsing isMaster reply: " << e.what() << " " << obj;
938     }
939 }
940 
Node(const HostAndPort & host)941 Node::Node(const HostAndPort& host) : host(host), latencyMicros(unknownLatency) {}
942 
markFailed(const Status & status)943 void Node::markFailed(const Status& status) {
944     if (isUp) {
945         log() << "Marking host " << host << " as failed" << causedBy(redact(status));
946 
947         isUp = false;
948     }
949 
950     isMaster = false;
951 }
952 
matches(const ReadPreference pref) const953 bool Node::matches(const ReadPreference pref) const {
954     if (!isUp)
955         return false;
956 
957     if (pref == ReadPreference::PrimaryOnly) {
958         return isMaster;
959     }
960 
961     if (pref == ReadPreference::SecondaryOnly) {
962         if (isMaster)
963             return false;
964     }
965 
966     return true;
967 }
968 
matches(const BSONObj & tag) const969 bool Node::matches(const BSONObj& tag) const {
970     BSONForEach(tagCriteria, tag) {
971         if (SimpleBSONElementComparator::kInstance.evaluate(
972                 this->tags[tagCriteria.fieldNameStringData()] != tagCriteria)) {
973             return false;
974         }
975     }
976 
977     return true;
978 }
979 
update(const IsMasterReply & reply)980 void Node::update(const IsMasterReply& reply) {
981     invariant(host == reply.host);
982     invariant(reply.ok);
983 
984     LOG(3) << "Updating host " << host << " based on ismaster reply: " << reply.raw;
985 
986     // Nodes that are hidden or neither master or secondary are considered down since we can't
987     // send any operations to them.
988     isUp = !reply.hidden && (reply.isMaster || reply.secondary);
989     isMaster = reply.isMaster;
990 
991     minWireVersion = reply.minWireVersion;
992     maxWireVersion = reply.maxWireVersion;
993 
994     // save a copy if unchanged
995     if (!tags.binaryEqual(reply.tags))
996         tags = reply.tags.getOwned();
997 
998     if (reply.latencyMicros >= 0) {  // TODO upper bound?
999         if (latencyMicros == unknownLatency) {
1000             latencyMicros = reply.latencyMicros;
1001         } else {
1002             // update latency with smoothed moving average (1/4th the delta)
1003             latencyMicros += (reply.latencyMicros - latencyMicros) / 4;
1004         }
1005     }
1006 
1007     LOG(3) << "Updating " << host << " lastWriteDate to " << reply.lastWriteDate;
1008     lastWriteDate = reply.lastWriteDate;
1009 
1010     LOG(3) << "Updating " << host << " opTime to " << reply.opTime;
1011     opTime = reply.opTime;
1012     lastWriteDateUpdateTime = Date_t::now();
1013 }
1014 
SetState(StringData name,const std::set<HostAndPort> & seedNodes)1015 SetState::SetState(StringData name, const std::set<HostAndPort>& seedNodes)
1016     : name(name.toString()),
1017       consecutiveFailedScans(0),
1018       seedNodes(seedNodes),
1019       latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * 1000),
1020       rand(std::random_device()()),
1021       roundRobin(0) {
1022     uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty());
1023 
1024     if (name.empty())
1025         warning() << "Replica set name empty, first node: " << *(seedNodes.begin());
1026 
1027     // This adds the seed hosts to nodes, but they aren't usable for anything except seeding a
1028     // scan until we start a scan and either find a master or contact all hosts without finding
1029     // one.
1030     // WARNING: if seedNodes is ever changed to not imply sorted iteration, you will need to
1031     // sort nodes after this loop.
1032     for (std::set<HostAndPort>::const_iterator it = seedNodes.begin(); it != seedNodes.end();
1033          ++it) {
1034         nodes.push_back(Node(*it));
1035     }
1036 
1037     DEV checkInvariants();
1038 }
1039 
SetState(const MongoURI & uri)1040 SetState::SetState(const MongoURI& uri)
1041     : SetState(uri.getSetName(),
1042                std::set<HostAndPort>(uri.getServers().begin(), uri.getServers().end())) {
1043     setUri = uri;
1044 }
1045 
getMatchingHost(const ReadPreferenceSetting & criteria) const1046 HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) const {
1047     switch (criteria.pref) {
1048         // "Prefered" read preferences are defined in terms of other preferences
1049         case ReadPreference::PrimaryPreferred: {
1050             HostAndPort out =
1051                 getMatchingHost(ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags));
1052             // NOTE: the spec says we should use the primary even if tags don't match
1053             if (!out.empty())
1054                 return out;
1055             return getMatchingHost(ReadPreferenceSetting(
1056                 ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds));
1057         }
1058 
1059         case ReadPreference::SecondaryPreferred: {
1060             HostAndPort out = getMatchingHost(ReadPreferenceSetting(
1061                 ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds));
1062             if (!out.empty())
1063                 return out;
1064             // NOTE: the spec says we should use the primary even if tags don't match
1065             return getMatchingHost(
1066                 ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags));
1067         }
1068 
1069         case ReadPreference::PrimaryOnly: {
1070             // NOTE: isMaster implies isUp
1071             Nodes::const_iterator it = std::find_if(nodes.begin(), nodes.end(), isMaster);
1072             if (it == nodes.end())
1073                 return HostAndPort();
1074             return it->host;
1075         }
1076 
1077         // The difference between these is handled by Node::matches
1078         case ReadPreference::SecondaryOnly:
1079         case ReadPreference::Nearest: {
1080             stdx::function<bool(const Node&)> matchNode = [](const Node& node) -> bool {
1081                 return true;
1082             };
1083             // build comparator
1084             if (criteria.maxStalenessSeconds.count()) {
1085                 auto masterIt = std::find_if(nodes.begin(), nodes.end(), isMaster);
1086                 if (masterIt == nodes.end() || !masterIt->lastWriteDate.toMillisSinceEpoch()) {
1087                     auto writeDateCmp = [](const Node* a, const Node* b) -> bool {
1088                         return a->lastWriteDate < b->lastWriteDate;
1089                     };
1090                     // use only non failed nodes
1091                     std::vector<const Node*> upNodes;
1092                     for (auto nodeIt = nodes.begin(); nodeIt != nodes.end(); ++nodeIt) {
1093                         if (nodeIt->isUp && nodeIt->lastWriteDate.toMillisSinceEpoch()) {
1094                             upNodes.push_back(&(*nodeIt));
1095                         }
1096                     }
1097                     auto latestSecNode =
1098                         std::max_element(upNodes.begin(), upNodes.end(), writeDateCmp);
1099                     if (latestSecNode == upNodes.end()) {
1100                         matchNode = [](const Node& node) -> bool { return false; };
1101                     } else {
1102                         Date_t maxWriteTime = (*latestSecNode)->lastWriteDate;
1103                         matchNode = [=](const Node& node) -> bool {
1104                             return duration_cast<Seconds>(maxWriteTime - node.lastWriteDate) +
1105                                 kRefreshPeriod <=
1106                                 criteria.maxStalenessSeconds;
1107                         };
1108                     }
1109                 } else {
1110                     Seconds primaryStaleness = duration_cast<Seconds>(
1111                         masterIt->lastWriteDateUpdateTime - masterIt->lastWriteDate);
1112                     matchNode = [=](const Node& node) -> bool {
1113                         return duration_cast<Seconds>(node.lastWriteDateUpdateTime -
1114                                                       node.lastWriteDate) -
1115                             primaryStaleness + kRefreshPeriod <=
1116                             criteria.maxStalenessSeconds;
1117                     };
1118                 }
1119             }
1120 
1121             BSONForEach(tagElem, criteria.tags.getTagBSON()) {
1122                 uassert(16358, "Tags should be a BSON object", tagElem.isABSONObj());
1123                 BSONObj tag = tagElem.Obj();
1124 
1125                 std::vector<const Node*> matchingNodes;
1126                 for (size_t i = 0; i < nodes.size(); i++) {
1127                     if (nodes[i].matches(criteria.pref) && nodes[i].matches(tag) &&
1128                         matchNode(nodes[i])) {
1129                         matchingNodes.push_back(&nodes[i]);
1130                     }
1131                 }
1132 
1133                 // don't do more complicated selection if not needed
1134                 if (matchingNodes.empty()) {
1135                     continue;
1136                 }
1137                 if (matchingNodes.size() == 1) {
1138                     return matchingNodes.front()->host;
1139                 }
1140 
1141                 // Only consider nodes that satisfy the minOpTime
1142                 if (!criteria.minOpTime.isNull()) {
1143                     std::sort(matchingNodes.begin(), matchingNodes.end(), opTimeGreater);
1144                     for (size_t i = 0; i < matchingNodes.size(); i++) {
1145                         if (matchingNodes[i]->opTime < criteria.minOpTime) {
1146                             if (i == 0) {
1147                                 // If no nodes satisfy the minOpTime criteria, we ignore the
1148                                 // minOpTime requirement.
1149                                 break;
1150                             }
1151                             matchingNodes.erase(matchingNodes.begin() + i, matchingNodes.end());
1152                             break;
1153                         }
1154                     }
1155 
1156                     if (matchingNodes.size() == 1) {
1157                         return matchingNodes.front()->host;
1158                     }
1159                 }
1160 
1161                 // If there are multiple nodes satisfying the minOpTime, next order by latency
1162                 // and don't consider hosts further than a threshold from the closest.
1163                 std::sort(matchingNodes.begin(), matchingNodes.end(), compareLatencies);
1164                 for (size_t i = 1; i < matchingNodes.size(); i++) {
1165                     int64_t distance =
1166                         matchingNodes[i]->latencyMicros - matchingNodes[0]->latencyMicros;
1167                     if (distance >= latencyThresholdMicros) {
1168                         // this node and all remaining ones are too far away
1169                         matchingNodes.erase(matchingNodes.begin() + i, matchingNodes.end());
1170                         break;
1171                     }
1172                 }
1173 
1174                 // of the remaining nodes, pick one at random (or use round-robin)
1175                 if (ReplicaSetMonitor::useDeterministicHostSelection) {
1176                     // only in tests
1177                     return matchingNodes[roundRobin++ % matchingNodes.size()]->host;
1178                 } else {
1179                     // normal case
1180                     return matchingNodes[rand.nextInt32(matchingNodes.size())]->host;
1181                 };
1182             }
1183 
1184             return HostAndPort();
1185         }
1186 
1187         default:
1188             uassert(16337, "Unknown read preference", false);
1189             break;
1190     }
1191 }
1192 
findNode(const HostAndPort & host)1193 Node* SetState::findNode(const HostAndPort& host) {
1194     const Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts);
1195     if (it == nodes.end() || it->host != host)
1196         return NULL;
1197 
1198     return &(*it);
1199 }
1200 
findOrCreateNode(const HostAndPort & host)1201 Node* SetState::findOrCreateNode(const HostAndPort& host) {
1202     // This is insertion sort, but N is currently guaranteed to be <= 12 (although this class
1203     // must function correctly even with more nodes). If we lift that restriction, we may need
1204     // to consider alternate algorithms.
1205     Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts);
1206     if (it == nodes.end() || it->host != host) {
1207         LOG(2) << "Adding node " << host << " to our view of replica set " << name;
1208         it = nodes.insert(it, Node(host));
1209     }
1210     return &(*it);
1211 }
1212 
updateNodeIfInNodes(const IsMasterReply & reply)1213 void SetState::updateNodeIfInNodes(const IsMasterReply& reply) {
1214     Node* node = findNode(reply.host);
1215     if (!node) {
1216         LOG(2) << "Skipping application of ismaster reply from " << reply.host
1217                << " since it isn't a confirmed member of set " << name;
1218         return;
1219     }
1220 
1221     node->update(reply);
1222 }
1223 
getConfirmedServerAddress() const1224 std::string SetState::getConfirmedServerAddress() const {
1225     StringBuilder ss;
1226     if (!name.empty())
1227         ss << name << "/";
1228 
1229     for (std::set<HostAndPort>::const_iterator it = seedNodes.begin(); it != seedNodes.end();
1230          ++it) {
1231         if (it != seedNodes.begin())
1232             ss << ",";
1233         it->append(ss);
1234     }
1235 
1236     return ss.str();
1237 }
1238 
getUnconfirmedServerAddress() const1239 std::string SetState::getUnconfirmedServerAddress() const {
1240     StringBuilder ss;
1241     if (!name.empty())
1242         ss << name << "/";
1243 
1244     for (std::vector<Node>::const_iterator it = nodes.begin(); it != nodes.end(); ++it) {
1245         if (it != nodes.begin())
1246             ss << ",";
1247         it->host.append(ss);
1248     }
1249 
1250     return ss.str();
1251 }
1252 
checkInvariants() const1253 void SetState::checkInvariants() const {
1254     bool foundMaster = false;
1255     for (size_t i = 0; i < nodes.size(); i++) {
1256         // no empty hosts
1257         invariant(!nodes[i].host.empty());
1258 
1259         if (nodes[i].isMaster) {
1260             // masters must be up
1261             invariant(nodes[i].isUp);
1262 
1263             // at most one master
1264             invariant(!foundMaster);
1265             foundMaster = true;
1266 
1267             // if we have a master it should be the same as lastSeenMaster
1268             invariant(nodes[i].host == lastSeenMaster);
1269         }
1270 
1271         // should never end up with negative latencies
1272         invariant(nodes[i].latencyMicros >= 0);
1273 
1274         // nodes must be sorted by host with no-dupes
1275         invariant(i == 0 || (nodes[i - 1].host < nodes[i].host));
1276     }
1277 
1278     // nodes should be a (non-strict) superset of the seedNodes
1279     invariant(std::includes(
1280         nodes.begin(), nodes.end(), seedNodes.begin(), seedNodes.end(), compareHosts));
1281 
1282     if (currentScan) {
1283         // hostsToScan can't have dups or hosts already in triedHosts.
1284         std::set<HostAndPort> cantSee = currentScan->triedHosts;
1285         for (std::deque<HostAndPort>::const_iterator it = currentScan->hostsToScan.begin();
1286              it != currentScan->hostsToScan.end();
1287              ++it) {
1288             invariant(!cantSee.count(*it));
1289             cantSee.insert(*it);  // make sure we don't see this again
1290         }
1291 
1292         // We should only be waitingFor hosts that are in triedHosts
1293         invariant(std::includes(currentScan->triedHosts.begin(),
1294                                 currentScan->triedHosts.end(),
1295                                 currentScan->waitingFor.begin(),
1296                                 currentScan->waitingFor.end()));
1297 
1298         // We should only have unconfirmedReplies if we haven't found a master yet
1299         invariant(!currentScan->foundUpMaster || currentScan->unconfirmedReplies.empty());
1300     }
1301 }
1302 
1303 template <typename Container>
enqueAllUntriedHosts(const Container & container,PseudoRandom & rand)1304 void ScanState::enqueAllUntriedHosts(const Container& container, PseudoRandom& rand) {
1305     invariant(hostsToScan.empty());  // because we don't try to dedup hosts already in the queue.
1306 
1307     // no std::copy_if before c++11
1308     for (typename Container::const_iterator it(container.begin()), end(container.end()); it != end;
1309          ++it) {
1310         if (!triedHosts.count(*it)) {
1311             hostsToScan.push_back(*it);
1312         }
1313     }
1314     std::random_shuffle(hostsToScan.begin(), hostsToScan.end(), rand);
1315 }
1316 }
1317