1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/client/replica_set_monitor.h"
36
37 #include <algorithm>
38 #include <limits>
39 #include <random>
40
41 #include "mongo/bson/simple_bsonelement_comparator.h"
42 #include "mongo/client/connpool.h"
43 #include "mongo/client/global_conn_pool.h"
44 #include "mongo/client/read_preference.h"
45 #include "mongo/client/replica_set_monitor_internal.h"
46 #include "mongo/db/operation_context.h"
47 #include "mongo/db/repl/bson_extract_optime.h"
48 #include "mongo/db/server_options.h"
49 #include "mongo/stdx/condition_variable.h"
50 #include "mongo/stdx/mutex.h"
51 #include "mongo/stdx/thread.h"
52 #include "mongo/util/background.h"
53 #include "mongo/util/debug_util.h"
54 #include "mongo/util/exit.h"
55 #include "mongo/util/fail_point_service.h"
56 #include "mongo/util/log.h"
57 #include "mongo/util/string_map.h"
58 #include "mongo/util/timer.h"
59
60 namespace mongo {
61
// Standard-library names used unqualified throughout this file.
using std::numeric_limits;
using std::set;
using std::shared_ptr;
using std::string;
using std::vector;
67
// Failpoint for disabling AsyncConfigChangeHook calls on updated RS nodes.
// While enabled, Refresher::receivedIsMasterFromMaster skips launching the asynchronous
// config-change hook even when a primary reports a changed member list; the synchronous hook
// is unaffected.
MONGO_FP_DECLARE(failAsyncConfigChangeHook);
70
71 namespace {
72
73 // Pull nested types to top-level scope
74 typedef ReplicaSetMonitor::IsMasterReply IsMasterReply;
75 typedef ReplicaSetMonitor::ScanState ScanState;
76 typedef ReplicaSetMonitor::ScanStatePtr ScanStatePtr;
77 typedef ReplicaSetMonitor::SetState SetState;
78 typedef ReplicaSetMonitor::SetStatePtr SetStatePtr;
79 typedef ReplicaSetMonitor::Refresher Refresher;
80 typedef ScanState::UnconfirmedReplies UnconfirmedReplies;
81 typedef SetState::Node Node;
82 typedef SetState::Nodes Nodes;
83 using executor::TaskExecutor;
84 using CallbackArgs = TaskExecutor::CallbackArgs;
85 using CallbackHandle = TaskExecutor::CallbackHandle;
86
87 const double socketTimeoutSecs = 5;
88
89 // Intentionally chosen to compare worse than all known latencies.
90 const int64_t unknownLatency = numeric_limits<int64_t>::max();
91
92 const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet());
93 const Milliseconds kFindHostMaxBackOffTime(500);
94 AtomicBool areRefreshRetriesDisabledForTest{false}; // Only true in tests.
95
96 // TODO: Move to ReplicaSetMonitorManager
97 ReplicaSetMonitor::ConfigChangeHook asyncConfigChangeHook;
98 ReplicaSetMonitor::ConfigChangeHook syncConfigChangeHook;
99
100 //
101 // Helpers for stl algorithms
102 //
103
isMaster(const Node & node)104 bool isMaster(const Node& node) {
105 return node.isMaster;
106 }
107
opTimeGreater(const Node * lhs,const Node * rhs)108 bool opTimeGreater(const Node* lhs, const Node* rhs) {
109 return lhs->opTime > rhs->opTime;
110 }
111
compareLatencies(const Node * lhs,const Node * rhs)112 bool compareLatencies(const Node* lhs, const Node* rhs) {
113 // NOTE: this automatically compares Node::unknownLatency worse than all others.
114 return lhs->latencyMicros < rhs->latencyMicros;
115 }
116
hostsEqual(const Node & lhs,const HostAndPort & rhs)117 bool hostsEqual(const Node& lhs, const HostAndPort& rhs) {
118 return lhs.host == rhs;
119 }
120
121 // Allows comparing two Nodes, or a HostAndPort and a Node.
122 // NOTE: the two HostAndPort overload is only needed to support extra checks in some STL
123 // implementations. For simplicity, no comparator should be used with collections of just
124 // HostAndPort.
125 struct CompareHosts {
operator ()mongo::__anon4c2208890111::CompareHosts126 bool operator()(const Node& lhs, const Node& rhs) {
127 return lhs.host < rhs.host;
128 }
operator ()mongo::__anon4c2208890111::CompareHosts129 bool operator()(const Node& lhs, const HostAndPort& rhs) {
130 return lhs.host < rhs;
131 }
operator ()mongo::__anon4c2208890111::CompareHosts132 bool operator()(const HostAndPort& lhs, const Node& rhs) {
133 return lhs < rhs.host;
134 }
operator ()mongo::__anon4c2208890111::CompareHosts135 bool operator()(const HostAndPort& lhs, const HostAndPort& rhs) {
136 return lhs < rhs;
137 }
138 } compareHosts; // like an overloaded function, but able to pass to stl algorithms
139
140 // The following structs should be treated as functions returning a UnaryPredicate.
141 // Usage example: std::find_if(nodes.begin(), nodes.end(), HostIs(someHost));
142 // They all hold their constructor argument by reference.
143
144 struct HostIs {
HostIsmongo::__anon4c2208890111::HostIs145 explicit HostIs(const HostAndPort& host) : _host(host) {}
operator ()mongo::__anon4c2208890111::HostIs146 bool operator()(const HostAndPort& host) {
147 return host == _host;
148 }
operator ()mongo::__anon4c2208890111::HostIs149 bool operator()(const Node& node) {
150 return node.host == _host;
151 }
152 const HostAndPort& _host;
153 };
154
155 struct HostNotIn {
HostNotInmongo::__anon4c2208890111::HostNotIn156 explicit HostNotIn(const std::set<HostAndPort>& hosts) : _hosts(hosts) {}
operator ()mongo::__anon4c2208890111::HostNotIn157 bool operator()(const HostAndPort& host) {
158 return !_hosts.count(host);
159 }
operator ()mongo::__anon4c2208890111::HostNotIn160 bool operator()(const Node& node) {
161 return !_hosts.count(node.host);
162 }
163 const std::set<HostAndPort>& _hosts;
164 };
165
pingTimeMillis(const Node & node)166 int32_t pingTimeMillis(const Node& node) {
167 auto latencyMillis = node.latencyMicros / 1000;
168 if (latencyMillis > numeric_limits<int32_t>::max()) {
169 // In particular, Node::unknownLatency does not fit in an int32.
170 return numeric_limits<int32_t>::max();
171 }
172 return latencyMillis;
173 }
174
175 /**
176 * Replica set refresh period on the task executor.
177 */
178 const Seconds kRefreshPeriod(30);
179 } // namespace
180
181 // If we cannot find a host after 15 seconds of refreshing, give up
182 const Seconds ReplicaSetMonitor::kDefaultFindHostTimeout(15);
183
184 // Defaults to random selection as required by the spec
185 bool ReplicaSetMonitor::useDeterministicHostSelection = false;
186
ReplicaSetMonitor(StringData name,const std::set<HostAndPort> & seeds)187 ReplicaSetMonitor::ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds)
188 : _state(std::make_shared<SetState>(name, seeds)),
189 _executor(globalRSMonitorManager.getExecutor()) {}
190
ReplicaSetMonitor(const MongoURI & uri)191 ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri)
192 : _state(std::make_shared<SetState>(uri)), _executor(globalRSMonitorManager.getExecutor()) {}
193
init()194 void ReplicaSetMonitor::init() {
195 stdx::lock_guard<stdx::mutex> lk(_mutex);
196 invariant(_executor);
197 std::weak_ptr<ReplicaSetMonitor> that(shared_from_this());
198 auto status = _executor->scheduleWork([=](const CallbackArgs& cbArgs) {
199 if (auto ptr = that.lock()) {
200 ptr->_refresh(cbArgs);
201 }
202 });
203
204 if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
205 LOG(1) << "Couldn't schedule refresh for " << getName()
206 << ". Executor shutdown in progress";
207 return;
208 }
209
210 if (!status.isOK()) {
211 severe() << "Can't start refresh for replica set " << getName()
212 << causedBy(redact(status.getStatus()));
213 fassertFailed(40139);
214 }
215
216 _refresherHandle = status.getValue();
217 }
218
~ReplicaSetMonitor()219 ReplicaSetMonitor::~ReplicaSetMonitor() {
220 // need this lock because otherwise can get race with scheduling in _refresh
221 stdx::lock_guard<stdx::mutex> lk(_mutex);
222 if (!_refresherHandle || !_executor) {
223 return;
224 }
225
226 _executor->cancel(_refresherHandle);
227 // Note: calling _executor->wait(_refresherHandle); from the dispatcher thread will cause hang
228 // Its ok not to call it because the d-tor is called only when the last owning pointer goes out
229 // of scope, so as taskExecutor queue holds a weak pointer to RSM it will not be able to get a
230 // task to execute eliminating the need to call method "wait".
231 //
232 _refresherHandle = {};
233 }
234
_refresh(const CallbackArgs & cbArgs)235 void ReplicaSetMonitor::_refresh(const CallbackArgs& cbArgs) {
236 if (!cbArgs.status.isOK()) {
237 return;
238 }
239
240 Timer t;
241 startOrContinueRefresh().refreshAll();
242 LOG(1) << "Refreshing replica set " << getName() << " took " << t.millis() << " msec";
243
244 // Reschedule the refresh
245 invariant(_executor);
246
247 if (_isRemovedFromManager.load()) { // already removed so no need to refresh
248 LOG(1) << "Stopping refresh for replica set " << getName() << " because its removed";
249 return;
250 }
251
252 stdx::lock_guard<stdx::mutex> lk(_mutex);
253
254 std::weak_ptr<ReplicaSetMonitor> that(shared_from_this());
255 auto status = _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod,
256 [=](const CallbackArgs& cbArgs) {
257 if (auto ptr = that.lock()) {
258 ptr->_refresh(cbArgs);
259 }
260 });
261
262 if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
263 LOG(1) << "Cant schedule refresh for " << getName() << ". Executor shutdown in progress";
264 return;
265 }
266
267 if (!status.isOK()) {
268 severe() << "Can't continue refresh for replica set " << getName() << " due to "
269 << redact(status.getStatus());
270 fassertFailed(40140);
271 }
272
273 _refresherHandle = status.getValue();
274 }
275
getHostOrRefresh(const ReadPreferenceSetting & criteria,Milliseconds maxWait)276 StatusWith<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria,
277 Milliseconds maxWait) {
278 if (_isRemovedFromManager.load()) {
279 return {ErrorCodes::ReplicaSetMonitorRemoved,
280 str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed"};
281 }
282
283 {
284 // Fast path, for the failure-free case
285 stdx::lock_guard<stdx::mutex> lk(_state->mutex);
286 HostAndPort out = _state->getMatchingHost(criteria);
287 if (!out.empty())
288 return {std::move(out)};
289 }
290
291 const auto startTimeMs = Date_t::now();
292
293 while (true) {
294 // We might not have found any matching hosts due to the scan, which just completed may have
295 // seen stale data from before we joined. Therefore we should participate in a new scan to
296 // make sure all hosts are contacted at least once (possibly by other threads) before this
297 // function gives up.
298 Refresher refresher(startOrContinueRefresh());
299
300 HostAndPort out = refresher.refreshUntilMatches(criteria);
301 if (!out.empty())
302 return {std::move(out)};
303
304 if (globalInShutdownDeprecated()) {
305 return {ErrorCodes::ShutdownInProgress, str::stream() << "Server is shutting down"};
306 }
307
308 const Milliseconds remaining = maxWait - (Date_t::now() - startTimeMs);
309
310 if (remaining < kFindHostMaxBackOffTime || areRefreshRetriesDisabledForTest.load()) {
311 break;
312 }
313
314 // Back-off so we don't spam the replica set hosts too much
315 sleepFor(kFindHostMaxBackOffTime);
316 }
317
318 return {ErrorCodes::FailedToSatisfyReadPreference,
319 str::stream() << "Could not find host matching read preference " << criteria.toString()
320 << " for set "
321 << getName()};
322 }
323
getMasterOrUassert()324 HostAndPort ReplicaSetMonitor::getMasterOrUassert() {
325 return uassertStatusOK(getHostOrRefresh(kPrimaryOnlyReadPreference));
326 }
327
startOrContinueRefresh()328 Refresher ReplicaSetMonitor::startOrContinueRefresh() {
329 stdx::lock_guard<stdx::mutex> lk(_state->mutex);
330
331 Refresher out(_state);
332 DEV _state->checkInvariants();
333 return out;
334 }
335
failedHost(const HostAndPort & host,const Status & status)336 void ReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) {
337 stdx::lock_guard<stdx::mutex> lk(_state->mutex);
338 Node* node = _state->findNode(host);
339 if (node)
340 node->markFailed(status);
341 DEV _state->checkInvariants();
342 }
343
isPrimary(const HostAndPort & host) const344 bool ReplicaSetMonitor::isPrimary(const HostAndPort& host) const {
345 stdx::lock_guard<stdx::mutex> lk(_state->mutex);
346 Node* node = _state->findNode(host);
347 return node ? node->isMaster : false;
348 }
349
isHostUp(const HostAndPort & host) const350 bool ReplicaSetMonitor::isHostUp(const HostAndPort& host) const {
351 stdx::lock_guard<stdx::mutex> lk(_state->mutex);
352 Node* node = _state->findNode(host);
353 return node ? node->isUp : false;
354 }
355
getMinWireVersion() const356 int ReplicaSetMonitor::getMinWireVersion() const {
357 stdx::lock_guard<stdx::mutex> lk(_state->mutex);
358 int minVersion = 0;
359 for (const auto& host : _state->nodes) {
360 if (host.isUp) {
361 minVersion = std::max(minVersion, host.minWireVersion);
362 }
363 }
364
365 return minVersion;
366 }
367
getMaxWireVersion() const368 int ReplicaSetMonitor::getMaxWireVersion() const {
369 stdx::lock_guard<stdx::mutex> lk(_state->mutex);
370 int maxVersion = std::numeric_limits<int>::max();
371 for (const auto& host : _state->nodes) {
372 if (host.isUp) {
373 maxVersion = std::min(maxVersion, host.maxWireVersion);
374 }
375 }
376
377 return maxVersion;
378 }
379
getName() const380 std::string ReplicaSetMonitor::getName() const {
381 // name is const so don't need to lock
382 return _state->name;
383 }
384
getServerAddress() const385 std::string ReplicaSetMonitor::getServerAddress() const {
386 stdx::lock_guard<stdx::mutex> lk(_state->mutex);
387 return _state->getConfirmedServerAddress();
388 }
389
contains(const HostAndPort & host) const390 bool ReplicaSetMonitor::contains(const HostAndPort& host) const {
391 stdx::lock_guard<stdx::mutex> lk(_state->mutex);
392 return _state->seedNodes.count(host);
393 }
394
createIfNeeded(const string & name,const set<HostAndPort> & servers)395 shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::createIfNeeded(const string& name,
396 const set<HostAndPort>& servers) {
397 return globalRSMonitorManager.getOrCreateMonitor(
398 ConnectionString::forReplicaSet(name, vector<HostAndPort>(servers.begin(), servers.end())));
399 }
400
createIfNeeded(const MongoURI & uri)401 shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::createIfNeeded(const MongoURI& uri) {
402 return globalRSMonitorManager.getOrCreateMonitor(uri);
403 }
404
get(const std::string & name)405 shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::get(const std::string& name) {
406 return globalRSMonitorManager.getMonitor(name);
407 }
408
remove(const string & name)409 void ReplicaSetMonitor::remove(const string& name) {
410 globalRSMonitorManager.removeMonitor(name);
411
412 // Kill all pooled ReplicaSetConnections for this set. They will not function correctly
413 // after we kill the ReplicaSetMonitor.
414 globalConnPool.removeHost(name);
415 }
416
setAsynchronousConfigChangeHook(ConfigChangeHook hook)417 void ReplicaSetMonitor::setAsynchronousConfigChangeHook(ConfigChangeHook hook) {
418 invariant(!asyncConfigChangeHook);
419 asyncConfigChangeHook = hook;
420 }
421
setSynchronousConfigChangeHook(ConfigChangeHook hook)422 void ReplicaSetMonitor::setSynchronousConfigChangeHook(ConfigChangeHook hook) {
423 invariant(!syncConfigChangeHook);
424 syncConfigChangeHook = hook;
425 }
426
427 // TODO move to correct order with non-statics before pushing
appendInfo(BSONObjBuilder & bsonObjBuilder,bool forFTDC) const428 void ReplicaSetMonitor::appendInfo(BSONObjBuilder& bsonObjBuilder, bool forFTDC) const {
429 stdx::lock_guard<stdx::mutex> lk(_state->mutex);
430
431 BSONObjBuilder monitorInfo(bsonObjBuilder.subobjStart(getName()));
432 if (forFTDC) {
433 for (size_t i = 0; i < _state->nodes.size(); i++) {
434 const Node& node = _state->nodes[i];
435 monitorInfo.appendNumber(node.host.toString(), pingTimeMillis(node));
436 }
437 return;
438 }
439
440 // NOTE: the format here must be consistent for backwards compatibility
441 BSONArrayBuilder hosts(monitorInfo.subarrayStart("hosts"));
442 for (size_t i = 0; i < _state->nodes.size(); i++) {
443 const Node& node = _state->nodes[i];
444
445 BSONObjBuilder builder;
446 builder.append("addr", node.host.toString());
447 builder.append("ok", node.isUp);
448 builder.append("ismaster", node.isMaster); // intentionally not camelCase
449 builder.append("hidden", false); // we don't keep hidden nodes in the set
450 builder.append("secondary", node.isUp && !node.isMaster);
451 builder.append("pingTimeMillis", pingTimeMillis(node));
452
453 if (!node.tags.isEmpty()) {
454 builder.append("tags", node.tags);
455 }
456
457 hosts.append(builder.obj());
458 }
459 }
460
shutdown()461 void ReplicaSetMonitor::shutdown() {
462 globalRSMonitorManager.shutdown();
463 }
464
cleanup()465 void ReplicaSetMonitor::cleanup() {
466 globalRSMonitorManager.removeAllMonitors();
467 asyncConfigChangeHook = ReplicaSetMonitor::ConfigChangeHook();
468 syncConfigChangeHook = ReplicaSetMonitor::ConfigChangeHook();
469 }
470
disableRefreshRetries_forTest()471 void ReplicaSetMonitor::disableRefreshRetries_forTest() {
472 areRefreshRetriesDisabledForTest.store(true);
473 }
474
isKnownToHaveGoodPrimary() const475 bool ReplicaSetMonitor::isKnownToHaveGoodPrimary() const {
476 stdx::lock_guard<stdx::mutex> lk(_state->mutex);
477
478 for (const auto& node : _state->nodes) {
479 if (node.isMaster) {
480 return true;
481 }
482 }
483
484 return false;
485 }
486
markAsRemoved()487 void ReplicaSetMonitor::markAsRemoved() {
488 _isRemovedFromManager.store(true);
489 }
490
Refresher(const SetStatePtr & setState)491 Refresher::Refresher(const SetStatePtr& setState) : _set(setState), _scan(setState->currentScan) {
492 if (_scan)
493 return; // participate in in-progress scan
494
495 LOG(2) << "Starting new refresh of replica set " << _set->name;
496 _scan = startNewScan(_set.get());
497 _set->currentScan = _scan;
498 }
499
refreshUntilMatches(const ReadPreferenceSetting & criteria)500 HostAndPort Refresher::refreshUntilMatches(const ReadPreferenceSetting& criteria) {
501 return _refreshUntilMatches(&criteria);
502 };
503
refreshAll()504 void Refresher::refreshAll() {
505 _refreshUntilMatches(nullptr);
506 }
507
getNextStep()508 Refresher::NextStep Refresher::getNextStep() {
509 // No longer the current scan
510 if (_scan != _set->currentScan) {
511 return NextStep(NextStep::DONE);
512 }
513
514 // Wait for all dispatched hosts to return before trying any fallback hosts.
515 if (_scan->hostsToScan.empty() && !_scan->waitingFor.empty()) {
516 return NextStep(NextStep::WAIT);
517 }
518
519 // If we haven't yet found a master, try contacting unconfirmed hosts
520 if (_scan->hostsToScan.empty() && !_scan->foundUpMaster) {
521 _scan->enqueAllUntriedHosts(_scan->possibleNodes, _set->rand);
522 _scan->possibleNodes.clear();
523 }
524
525 if (_scan->hostsToScan.empty()) {
526 // We've tried all hosts we can, so nothing more to do in this round.
527 if (!_scan->foundUpMaster) {
528 warning() << "Unable to reach primary for set " << _set->name;
529
530 // Since we've talked to everyone we could but still didn't find a primary, we
531 // do the best we can, and assume all unconfirmedReplies are actually from nodes
532 // in the set (we've already confirmed that they think they are). This is
533 // important since it allows us to bootstrap to a usable state even if we are
534 // unable to talk to a master while starting up. As soon as we are able to
535 // contact a master, we will remove any nodes that it doesn't think are part of
536 // the set, undoing the damage we cause here.
537
538 // NOTE: we don't modify seedNodes or notify about set membership change in this
539 // case since it hasn't been confirmed by a master.
540 const string oldAddr = _set->getUnconfirmedServerAddress();
541 for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin();
542 it != _scan->unconfirmedReplies.end();
543 ++it) {
544 _set->findOrCreateNode(it->host)->update(*it);
545 }
546
547 const string newAddr = _set->getUnconfirmedServerAddress();
548 if (oldAddr != newAddr && syncConfigChangeHook) {
549 // Run the syncConfigChangeHook because the ShardRegistry needs to know about any
550 // node we might talk to. Don't run the asyncConfigChangeHook because we don't
551 // want to update the seed list stored on the config servers with unconfirmed hosts.
552 syncConfigChangeHook(_set->name, _set->getUnconfirmedServerAddress());
553 }
554 }
555
556 if (_scan->foundAnyUpNodes) {
557 _set->consecutiveFailedScans = 0;
558 } else {
559 auto nScans = _set->consecutiveFailedScans++;
560 if (nScans <= 10 || nScans % 10 == 0) {
561 log() << "Cannot reach any nodes for set " << _set->name
562 << ". Please check network connectivity and the status of the set. "
563 << "This has happened for " << _set->consecutiveFailedScans
564 << " checks in a row.";
565 }
566 }
567
568 // Makes sure all other Refreshers in this round return DONE
569 _set->currentScan.reset();
570
571 return NextStep(NextStep::DONE);
572 }
573
574 // Pop and return the next hostToScan.
575 HostAndPort host = _scan->hostsToScan.front();
576 _scan->hostsToScan.pop_front();
577 _scan->waitingFor.insert(host);
578 _scan->triedHosts.insert(host);
579
580 return NextStep(NextStep::CONTACT_HOST, host);
581 }
582
/**
 * Processes an isMaster reply 'replyObj' received from host 'from'; 'latencyMicros' is stored in
 * the parsed IsMasterReply.
 *
 * Replies that are not ok, or whose set name differs from the monitored set, are routed to
 * failedHost(). A reply from a self-declared primary is validated by
 * receivedIsMasterFromMaster(); if that rejects it, the host is also treated as failed.
 * Otherwise the reply either updates the matching node (once this scan has found an up master)
 * or is queued in _scan->unconfirmedReplies, and waiters on the set's condition variable are
 * woken.
 */
void Refresher::receivedIsMaster(const HostAndPort& from,
                                 int64_t latencyMicros,
                                 const BSONObj& replyObj) {
    // Be careful: all return paths must call either failedHost or cv.notify_all!
    _scan->waitingFor.erase(from);

    const IsMasterReply reply(from, latencyMicros, replyObj);

    // Handle various failure cases
    if (!reply.ok) {
        failedHost(from, {ErrorCodes::CommandFailed, "Failed to execute 'ismaster' command"});
        return;
    }

    if (reply.setName != _set->name) {
        if (reply.raw["isreplicaset"].trueValue()) {
            // The reply came from a node in the state referred to as RSGhost in the SDAM
            // spec. RSGhost corresponds to either REMOVED or STARTUP member states. In any event,
            // if a reply from a ghost offers a list of possible other members of the replica set,
            // and if this refresher has yet to find the replica set master, we add hosts listed in
            // the reply to the list of possible replica set members.
            if (!_scan->foundUpMaster) {
                _scan->possibleNodes.insert(reply.normalHosts.begin(), reply.normalHosts.end());
            }
        } else {
            warning() << "node: " << from << " isn't a part of set: " << _set->name
                      << " ismaster: " << replyObj;
        }

        failedHost(from,
                   {ErrorCodes::InconsistentReplicaSetNames,
                    str::stream() << "Target replica set name " << reply.setName
                                  << " does not match the monitored set name "
                                  << _set->name});
        return;
    }

    if (reply.isMaster) {
        // On success this sets _scan->foundUpMaster, so the branch below then applies this
        // reply to the master's own node.
        Status status = receivedIsMasterFromMaster(from, reply);
        if (!status.isOK()) {
            failedHost(from, status);
            return;
        }
    }

    if (_scan->foundUpMaster) {
        // We only update a Node if a master has confirmed it is in the set.
        _set->updateNodeIfInNodes(reply);
    } else {
        receivedIsMasterBeforeFoundMaster(reply);
        _scan->unconfirmedReplies.push_back(reply);
    }

    // _set->nodes may still not have any nodes with isUp==true, but we have at least found a
    // connectible host that claims to be in the set.
    _scan->foundAnyUpNodes = true;

    // TODO consider only notifying if we've updated a node or we've emptied waitingFor.
    _set->cv.notify_all();

    DEV _set->checkInvariants();
}
645
failedHost(const HostAndPort & host,const Status & status)646 void Refresher::failedHost(const HostAndPort& host, const Status& status) {
647 _scan->waitingFor.erase(host);
648
649 // Failed hosts can't pass criteria, so the only way they'd effect the _refreshUntilMatches
650 // loop is if it was the last host we were waitingFor.
651 if (_scan->waitingFor.empty())
652 _set->cv.notify_all();
653
654 Node* node = _set->findNode(host);
655 if (node)
656 node->markFailed(status);
657 }
658
startNewScan(const SetState * set)659 ScanStatePtr Refresher::startNewScan(const SetState* set) {
660 const ScanStatePtr scan = std::make_shared<ScanState>();
661
662 // The heuristics we use in deciding the order to contact hosts are designed to find a
663 // master as quickly as possible. This is because we can't use any hosts we find until
664 // we either get the latest set of members from a master or talk to all possible hosts
665 // without finding a master.
666
667 // TODO It might make sense to check down nodes first if the last seen master is still
668 // marked as up.
669
670 int upNodes = 0;
671 for (Nodes::const_iterator it(set->nodes.begin()), end(set->nodes.end()); it != end; ++it) {
672 if (it->isUp) {
673 // scan the nodes we think are up first
674 scan->hostsToScan.push_front(it->host);
675 upNodes++;
676 } else {
677 scan->hostsToScan.push_back(it->host);
678 }
679 }
680
681 // shuffle the queue, but keep "up" nodes at the front
682 std::random_shuffle(scan->hostsToScan.begin(), scan->hostsToScan.begin() + upNodes, set->rand);
683 std::random_shuffle(scan->hostsToScan.begin() + upNodes, scan->hostsToScan.end(), set->rand);
684
685 if (!set->lastSeenMaster.empty()) {
686 // move lastSeenMaster to front of queue
687 std::stable_partition(
688 scan->hostsToScan.begin(), scan->hostsToScan.end(), HostIs(set->lastSeenMaster));
689 }
690
691 return scan;
692 }
693
/**
 * Validates and applies an isMaster reply from 'from', which claims to be primary.
 *
 * Returns NotMaster, before modifying _set, if the reply's configVersion is older than the
 * newest one seen, or if at the same configVersion its electionId is older than
 * _set->maxElectionId. Otherwise it:
 *   - records the config version (and election id),
 *   - clears isMaster on every node,
 *   - reconciles _set->nodes, the scan queue and waitingFor with reply.normalHosts,
 *   - replaces seedNodes and runs the config-change hooks when the member list changed,
 *   - applies the unconfirmed replies gathered so far,
 *   - marks the scan as having found an up master.
 * The caller (receivedIsMaster) then applies the reply itself, marking this host as master.
 */
Status Refresher::receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply) {
    invariant(reply.isMaster);

    // Reject if config version is older. This is for backwards compatibility with nodes in pv0
    // since they don't have the same ordering with pv1 electionId.
    if (reply.configVersion < _set->configVersion) {
        return {ErrorCodes::NotMaster,
                str::stream() << "Node " << from
                              << " believes it is primary, but its config version "
                              << reply.configVersion
                              << " is older than the most recent config version "
                              << _set->configVersion};
    }

    if (reply.electionId.isSet()) {
        // ElectionIds are only comparable if they are of the same protocol version. However, since
        // isMaster has no protocol version field, we use the configVersion instead. This works
        // because configVersion needs to be incremented whenever the protocol version is changed.
        if (reply.configVersion == _set->configVersion && _set->maxElectionId.isSet() &&
            _set->maxElectionId.compare(reply.electionId) > 0) {
            return {ErrorCodes::NotMaster,
                    str::stream() << "Node " << from
                                  << " believes it is primary, but its election id "
                                  << reply.electionId
                                  << " is older than the most recent election id "
                                  << _set->maxElectionId};
        }

        _set->maxElectionId = reply.electionId;
    }

    _set->configVersion = reply.configVersion;

    // Mark all nodes as not master. We will mark ourself as master before releasing the lock.
    // NOTE: we use a "last-wins" policy if multiple hosts claim to be master.
    for (size_t i = 0; i < _set->nodes.size(); i++) {
        _set->nodes[i].isMaster = false;
    }

    // Check if the master agrees with our current list of nodes.
    // REMINDER: both _set->nodes and reply.normalHosts are sorted, which is what makes the
    // element-wise std::equal comparison below valid.
    if (_set->nodes.size() != reply.normalHosts.size() ||
        !std::equal(
            _set->nodes.begin(), _set->nodes.end(), reply.normalHosts.begin(), hostsEqual)) {
        LOG(2) << "Adjusting nodes in our view of replica set " << _set->name
               << " based on master reply: " << redact(reply.raw);

        // remove non-members from _set->nodes
        _set->nodes.erase(
            std::remove_if(_set->nodes.begin(), _set->nodes.end(), HostNotIn(reply.normalHosts)),
            _set->nodes.end());

        // add new members to _set->nodes
        for (std::set<HostAndPort>::const_iterator it = reply.normalHosts.begin();
             it != reply.normalHosts.end();
             ++it) {
            _set->findOrCreateNode(*it);
        }

        // Replace the hostsToScan queue with the untried normal hosts. This can both add and
        // remove hosts from the queue.
        _scan->hostsToScan.clear();
        _scan->enqueAllUntriedHosts(reply.normalHosts, _set->rand);

        if (!_scan->waitingFor.empty()) {
            // make sure we don't wait for any hosts that aren't considered members
            std::set<HostAndPort> newWaitingFor;
            std::set_intersection(reply.normalHosts.begin(),
                                  reply.normalHosts.end(),
                                  _scan->waitingFor.begin(),
                                  _scan->waitingFor.end(),
                                  std::inserter(newWaitingFor, newWaitingFor.end()));
            _scan->waitingFor.swap(newWaitingFor);
        }
    }

    if (reply.normalHosts != _set->seedNodes) {
        const string oldAddr = _set->getConfirmedServerAddress();
        _set->seedNodes = reply.normalHosts;

        // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare
        // and we want to record our changes
        log() << "changing hosts to " << _set->getConfirmedServerAddress() << " from " << oldAddr;

        if (syncConfigChangeHook) {
            syncConfigChangeHook(_set->name, _set->getConfirmedServerAddress());
        }

        if (asyncConfigChangeHook && !MONGO_FAIL_POINT(failAsyncConfigChangeHook)) {
            // call from a separate thread to avoid blocking and holding lock while potentially
            // going over the network
            stdx::thread bg(asyncConfigChangeHook, _set->name, _set->getConfirmedServerAddress());
            bg.detach();
        }
    }

    // Update other nodes' information based on replies we've already seen
    for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin();
         it != _scan->unconfirmedReplies.end();
         ++it) {
        // this ignores replies from hosts not in _set->nodes (as modified above)
        _set->updateNodeIfInNodes(*it);
    }
    _scan->unconfirmedReplies.clear();

    _scan->foundUpMaster = true;
    _set->lastSeenMaster = reply.host;

    return Status::OK();
}
804
receivedIsMasterBeforeFoundMaster(const IsMasterReply & reply)805 void Refresher::receivedIsMasterBeforeFoundMaster(const IsMasterReply& reply) {
806 invariant(!reply.isMaster);
807 // This function doesn't alter _set at all. It only modifies the work queue in _scan.
808
809 // Add everyone this host claims is in the set to possibleNodes.
810 _scan->possibleNodes.insert(reply.normalHosts.begin(), reply.normalHosts.end());
811
812 // If this node thinks the primary is someone we haven't tried, make that the next
813 // hostToScan.
814 if (!reply.primary.empty() && !_scan->triedHosts.count(reply.primary)) {
815 std::deque<HostAndPort>::iterator it = std::stable_partition(
816 _scan->hostsToScan.begin(), _scan->hostsToScan.end(), HostIs(reply.primary));
817
818 if (it == _scan->hostsToScan.begin()) {
819 // reply.primary wasn't in hostsToScan
820 _scan->hostsToScan.push_front(reply.primary);
821 }
822 }
823 }
824
/**
 * Drives the current scan, under _set->mutex, until either a host matching 'criteria' is known
 * or getNextStep() reports the scan is DONE.
 *
 * 'criteria' may be null. In that case no early match is attempted and the function returns an
 * empty HostAndPort once the scan ends. Returns an empty HostAndPort if no matching host was
 * found.
 *
 * The mutex is released while contacting a host, so other threads may observe or advance the
 * set state during the network call.
 */
HostAndPort Refresher::_refreshUntilMatches(const ReadPreferenceSetting* criteria) {
    stdx::unique_lock<stdx::mutex> lk(_set->mutex);
    while (true) {
        // Return as soon as the current view of the set already satisfies the criteria.
        if (criteria) {
            HostAndPort out = _set->getMatchingHost(*criteria);
            if (!out.empty())
                return out;
        }

        const NextStep ns = getNextStep();
        DEV _set->checkInvariants();

        switch (ns.step) {
            case NextStep::DONE:
                // getNextStep may have updated nodes if no master was found
                return criteria ? _set->getMatchingHost(*criteria) : HostAndPort();

            case NextStep::WAIT:  // TODO consider treating as DONE for refreshAll
                // Another thread is presumably contacting a host; block on the set's condition
                // variable until it is signalled, then re-evaluate.
                _set->cv.wait(lk);
                continue;

            case NextStep::CONTACT_HOST: {
                StatusWith<BSONObj> isMasterReplyStatus{ErrorCodes::InternalError,
                                                        "Uninitialized variable"};
                int64_t pingMicros = 0;
                MongoURI targetURI;

                // Prefer a per-host clone of the set URI (with credentials cleared) so other URI
                // options carry over; otherwise connect with a plain host connection string.
                if (_set->setUri.isValid()) {
                    targetURI = _set->setUri.cloneURIForServer(ns.host);
                    targetURI.setUser("");
                    targetURI.setPassword("");
                } else {
                    targetURI = MongoURI(ConnectionString(ns.host));
                }

                // Do not do network calls while holding a mutex
                lk.unlock();
                try {
                    ScopedDbConnection conn(targetURI, socketTimeoutSecs);
                    bool ignoredOutParam = false;
                    // The timer starts after the connection is obtained, so pingMicros covers
                    // the isMaster call rather than connection acquisition.
                    Timer timer;
                    BSONObj reply;
                    conn->isMaster(ignoredOutParam, &reply);
                    isMasterReplyStatus = reply;
                    pingMicros = timer.micros();
                    conn.done();  // return to pool on success.
                } catch (const DBException& ex) {
                    // Network/command failures are recorded and reported via failedHost below.
                    isMasterReplyStatus = ex.toStatus();
                }
                lk.lock();

                // Ignore the reply and return if we are no longer the current scan. This might
                // happen if it was decided that the host we were contacting isn't part of the set.
                if (_scan != _set->currentScan)
                    return criteria ? _set->getMatchingHost(*criteria) : HostAndPort();

                if (isMasterReplyStatus.isOK())
                    receivedIsMaster(ns.host, pingMicros, isMasterReplyStatus.getValue());
                else
                    failedHost(ns.host, isMasterReplyStatus.getStatus());
            }
        }
    }
}
889
/**
 * Populates this reply from the raw isMaster response 'obj'.
 *
 * An owned copy of 'obj' is kept in 'raw'. If "ok" is not truthy, parsing stops right after
 * setting 'ok'. Any std::exception raised while extracting fields (for example a wrongly-typed
 * field) is caught: 'ok' is set to false and the problem is logged; nothing is rethrown.
 *
 * NOTE(review): normalHosts is cleared before being refilled, but electionId, lastWriteDate and
 * opTime are only assigned when their fields are present. If an IsMasterReply is ever parsed
 * twice, stale values could survive; confirm callers parse each instance only once.
 */
void IsMasterReply::parse(const BSONObj& obj) {
    try {
        raw = obj.getOwned();  // don't use obj again after this line

        ok = raw["ok"].trueValue();
        if (!ok)
            return;

        setName = raw["setName"].str();
        hidden = raw["hidden"].trueValue();
        secondary = raw["secondary"].trueValue();

        minWireVersion = raw["minWireVersion"].numberInt();
        maxWireVersion = raw["maxWireVersion"].numberInt();

        // hidden nodes can't be master, even if they claim to be.
        isMaster = !hidden && raw["ismaster"].trueValue();

        // electionId is only read from nodes we accept as master.
        if (isMaster && raw.hasField("electionId")) {
            electionId = raw["electionId"].OID();
        }

        configVersion = raw["setVersion"].numberInt();

        const string primaryString = raw["primary"].str();
        primary = primaryString.empty() ? HostAndPort() : HostAndPort(primaryString);

        // both hosts and passives, but not arbiters, are considered "normal hosts"
        normalHosts.clear();
        BSONForEach(host, raw.getObjectField("hosts")) {
            normalHosts.insert(HostAndPort(host.String()));
        }
        BSONForEach(host, raw.getObjectField("passives")) {
            normalHosts.insert(HostAndPort(host.String()));
        }

        tags = raw.getObjectField("tags");
        // When a "lastWrite" sub-document is present, lastWriteDate is optional but a valid
        // "opTime" is required (uassertStatusOK raises otherwise, landing in the catch below).
        BSONObj lastWriteField = raw.getObjectField("lastWrite");
        if (!lastWriteField.isEmpty()) {
            if (auto lastWrite = lastWriteField["lastWriteDate"]) {
                lastWriteDate = lastWrite.date();
            }

            uassertStatusOK(bsonExtractOpTimeField(lastWriteField, "opTime", &opTime));
        }
    } catch (const std::exception& e) {
        ok = false;
        // Logs the caller's 'obj' rather than 'raw', presumably because 'raw' may not have been
        // assigned if the failure happened inside getOwned().
        log() << "exception while parsing isMaster reply: " << e.what() << " " << obj;
    }
}
940
/**
 * Creates a node entry for 'host' whose latency starts as unknownLatency. The first
 * non-negative latency sample passed to Node::update replaces it directly; later samples are
 * smoothed.
 */
Node::Node(const HostAndPort& host) : host(host), latencyMicros(unknownLatency) {}
942
markFailed(const Status & status)943 void Node::markFailed(const Status& status) {
944 if (isUp) {
945 log() << "Marking host " << host << " as failed" << causedBy(redact(status));
946
947 isUp = false;
948 }
949
950 isMaster = false;
951 }
952
matches(const ReadPreference pref) const953 bool Node::matches(const ReadPreference pref) const {
954 if (!isUp)
955 return false;
956
957 if (pref == ReadPreference::PrimaryOnly) {
958 return isMaster;
959 }
960
961 if (pref == ReadPreference::SecondaryOnly) {
962 if (isMaster)
963 return false;
964 }
965
966 return true;
967 }
968
matches(const BSONObj & tag) const969 bool Node::matches(const BSONObj& tag) const {
970 BSONForEach(tagCriteria, tag) {
971 if (SimpleBSONElementComparator::kInstance.evaluate(
972 this->tags[tagCriteria.fieldNameStringData()] != tagCriteria)) {
973 return false;
974 }
975 }
976
977 return true;
978 }
979
/**
 * Refreshes this node's cached state from a successful isMaster reply ('reply.ok' must be true)
 * that came from this same host.
 *
 * Updates up/master status, wire versions, tags, smoothed latency, last-write date and opTime,
 * and stamps lastWriteDateUpdateTime with the current time.
 */
void Node::update(const IsMasterReply& reply) {
    invariant(host == reply.host);
    invariant(reply.ok);

    LOG(3) << "Updating host " << host << " based on ismaster reply: " << reply.raw;

    // Nodes that are hidden or neither master nor secondary are considered down since we can't
    // send any operations to them.
    isUp = !reply.hidden && (reply.isMaster || reply.secondary);
    isMaster = reply.isMaster;

    minWireVersion = reply.minWireVersion;
    maxWireVersion = reply.maxWireVersion;

    // Store an owned copy of the reply's tags only when they differ from what we already have.
    if (!tags.binaryEqual(reply.tags))
        tags = reply.tags.getOwned();

    // Negative latency samples are ignored.
    if (reply.latencyMicros >= 0) {  // TODO upper bound?
        if (latencyMicros == unknownLatency) {
            // First sample for this node: take it as-is.
            latencyMicros = reply.latencyMicros;
        } else {
            // update latency with smoothed moving average (1/4th the delta)
            latencyMicros += (reply.latencyMicros - latencyMicros) / 4;
        }
    }

    LOG(3) << "Updating " << host << " lastWriteDate to " << reply.lastWriteDate;
    lastWriteDate = reply.lastWriteDate;

    LOG(3) << "Updating " << host << " opTime to " << reply.opTime;
    opTime = reply.opTime;
    // getMatchingHost uses (lastWriteDateUpdateTime - lastWriteDate) to estimate staleness.
    lastWriteDateUpdateTime = Date_t::now();
}
1014
SetState(StringData name,const std::set<HostAndPort> & seedNodes)1015 SetState::SetState(StringData name, const std::set<HostAndPort>& seedNodes)
1016 : name(name.toString()),
1017 consecutiveFailedScans(0),
1018 seedNodes(seedNodes),
1019 latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * 1000),
1020 rand(std::random_device()()),
1021 roundRobin(0) {
1022 uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty());
1023
1024 if (name.empty())
1025 warning() << "Replica set name empty, first node: " << *(seedNodes.begin());
1026
1027 // This adds the seed hosts to nodes, but they aren't usable for anything except seeding a
1028 // scan until we start a scan and either find a master or contact all hosts without finding
1029 // one.
1030 // WARNING: if seedNodes is ever changed to not imply sorted iteration, you will need to
1031 // sort nodes after this loop.
1032 for (std::set<HostAndPort>::const_iterator it = seedNodes.begin(); it != seedNodes.end();
1033 ++it) {
1034 nodes.push_back(Node(*it));
1035 }
1036
1037 DEV checkInvariants();
1038 }
1039
/**
 * Creates the set state from a replica-set URI. The set name and seed hosts come from 'uri',
 * and 'uri' itself is kept in setUri. Refresher::_refreshUntilMatches clones setUri per host
 * when it is valid.
 *
 * NOTE(review): begin() and end() below come from two separate getServers() calls. This is only
 * valid if getServers() returns a reference to a stable container rather than a fresh copy;
 * confirm against MongoURI.
 */
SetState::SetState(const MongoURI& uri)
    : SetState(uri.getSetName(),
               std::set<HostAndPort>(uri.getServers().begin(), uri.getServers().end())) {
    setUri = uri;
}
1045
/**
 * Picks a host from the current view of the set that satisfies 'criteria', or returns an empty
 * HostAndPort when none does.
 *
 * PrimaryPreferred and SecondaryPreferred are expressed in terms of PrimaryOnly and
 * SecondaryOnly. For SecondaryOnly and Nearest, tag sets are tried in order and the first tag
 * set with any eligible node wins. Within that tag set, candidates are filtered by
 * maxStalenessSeconds, then by minOpTime (ignored if nobody satisfies it), then by the latency
 * window (latencyThresholdMicros from the fastest). One of the survivors is then picked,
 * round-robin in deterministic test mode and randomly otherwise.
 *
 * Fails via uassert 16337 for an unrecognized read preference.
 */
HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) const {
    switch (criteria.pref) {
        // "Preferred" read preferences are defined in terms of other preferences
        case ReadPreference::PrimaryPreferred: {
            HostAndPort out =
                getMatchingHost(ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags));
            // NOTE: the spec says we should use the primary even if tags don't match
            if (!out.empty())
                return out;
            return getMatchingHost(ReadPreferenceSetting(
                ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds));
        }

        case ReadPreference::SecondaryPreferred: {
            HostAndPort out = getMatchingHost(ReadPreferenceSetting(
                ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds));
            if (!out.empty())
                return out;
            // NOTE: the spec says we should use the primary even if tags don't match
            return getMatchingHost(
                ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags));
        }

        case ReadPreference::PrimaryOnly: {
            // NOTE: isMaster implies isUp
            Nodes::const_iterator it = std::find_if(nodes.begin(), nodes.end(), isMaster);
            if (it == nodes.end())
                return HostAndPort();
            return it->host;
        }

        // The difference between these is handled by Node::matches
        case ReadPreference::SecondaryOnly:
        case ReadPreference::Nearest: {
            // Staleness filter; accepts every node unless maxStalenessSeconds is set.
            stdx::function<bool(const Node&)> matchNode = [](const Node& node) -> bool {
                return true;
            };
            // build comparator
            if (criteria.maxStalenessSeconds.count()) {
                auto masterIt = std::find_if(nodes.begin(), nodes.end(), isMaster);
                if (masterIt == nodes.end() || !masterIt->lastWriteDate.toMillisSinceEpoch()) {
                    // No usable primary: measure staleness against the up node with the most
                    // recent non-zero lastWriteDate:
                    //   (maxWriteTime - node.lastWriteDate) + kRefreshPeriod <= maxStaleness
                    auto writeDateCmp = [](const Node* a, const Node* b) -> bool {
                        return a->lastWriteDate < b->lastWriteDate;
                    };
                    // use only non failed nodes
                    std::vector<const Node*> upNodes;
                    for (auto nodeIt = nodes.begin(); nodeIt != nodes.end(); ++nodeIt) {
                        if (nodeIt->isUp && nodeIt->lastWriteDate.toMillisSinceEpoch()) {
                            upNodes.push_back(&(*nodeIt));
                        }
                    }
                    auto latestSecNode =
                        std::max_element(upNodes.begin(), upNodes.end(), writeDateCmp);
                    if (latestSecNode == upNodes.end()) {
                        // No node has a usable write date, so nothing can satisfy staleness.
                        matchNode = [](const Node& node) -> bool { return false; };
                    } else {
                        Date_t maxWriteTime = (*latestSecNode)->lastWriteDate;
                        matchNode = [=](const Node& node) -> bool {
                            return duration_cast<Seconds>(maxWriteTime - node.lastWriteDate) +
                                kRefreshPeriod <=
                                criteria.maxStalenessSeconds;
                        };
                    }
                } else {
                    // Primary known: compare each node's (update time - write date) lag against
                    // the primary's own lag:
                    //   nodeLag - primaryLag + kRefreshPeriod <= maxStaleness
                    Seconds primaryStaleness = duration_cast<Seconds>(
                        masterIt->lastWriteDateUpdateTime - masterIt->lastWriteDate);
                    matchNode = [=](const Node& node) -> bool {
                        return duration_cast<Seconds>(node.lastWriteDateUpdateTime -
                                                      node.lastWriteDate) -
                            primaryStaleness + kRefreshPeriod <=
                            criteria.maxStalenessSeconds;
                    };
                }
            }

            // Tag sets are tried in order; the first one with any eligible node decides.
            BSONForEach(tagElem, criteria.tags.getTagBSON()) {
                uassert(16358, "Tags should be a BSON object", tagElem.isABSONObj());
                BSONObj tag = tagElem.Obj();

                std::vector<const Node*> matchingNodes;
                for (size_t i = 0; i < nodes.size(); i++) {
                    if (nodes[i].matches(criteria.pref) && nodes[i].matches(tag) &&
                        matchNode(nodes[i])) {
                        matchingNodes.push_back(&nodes[i]);
                    }
                }

                // don't do more complicated selection if not needed
                if (matchingNodes.empty()) {
                    continue;
                }
                if (matchingNodes.size() == 1) {
                    return matchingNodes.front()->host;
                }

                // Only consider nodes that satisfy the minOpTime
                if (!criteria.minOpTime.isNull()) {
                    // Sorted newest opTime first, so the first node below minOpTime marks where
                    // the non-qualifying tail begins.
                    std::sort(matchingNodes.begin(), matchingNodes.end(), opTimeGreater);
                    for (size_t i = 0; i < matchingNodes.size(); i++) {
                        if (matchingNodes[i]->opTime < criteria.minOpTime) {
                            if (i == 0) {
                                // If no nodes satisfy the minOpTime criteria, we ignore the
                                // minOpTime requirement.
                                break;
                            }
                            matchingNodes.erase(matchingNodes.begin() + i, matchingNodes.end());
                            break;
                        }
                    }

                    if (matchingNodes.size() == 1) {
                        return matchingNodes.front()->host;
                    }
                }

                // If there are multiple nodes satisfying the minOpTime, next order by latency
                // and don't consider hosts further than a threshold from the closest.
                std::sort(matchingNodes.begin(), matchingNodes.end(), compareLatencies);
                for (size_t i = 1; i < matchingNodes.size(); i++) {
                    int64_t distance =
                        matchingNodes[i]->latencyMicros - matchingNodes[0]->latencyMicros;
                    if (distance >= latencyThresholdMicros) {
                        // this node and all remaining ones are too far away
                        matchingNodes.erase(matchingNodes.begin() + i, matchingNodes.end());
                        break;
                    }
                }

                // of the remaining nodes, pick one at random (or use round-robin)
                if (ReplicaSetMonitor::useDeterministicHostSelection) {
                    // only in tests
                    return matchingNodes[roundRobin++ % matchingNodes.size()]->host;
                } else {
                    // normal case
                    return matchingNodes[rand.nextInt32(matchingNodes.size())]->host;
                };
            }

            // No tag set had an eligible node.
            return HostAndPort();
        }

        default:
            // uassert with a false condition is expected to throw, so control does not fall
            // off the end of this non-void function.
            uassert(16337, "Unknown read preference", false);
            break;
    }
}
1192
findNode(const HostAndPort & host)1193 Node* SetState::findNode(const HostAndPort& host) {
1194 const Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts);
1195 if (it == nodes.end() || it->host != host)
1196 return NULL;
1197
1198 return &(*it);
1199 }
1200
findOrCreateNode(const HostAndPort & host)1201 Node* SetState::findOrCreateNode(const HostAndPort& host) {
1202 // This is insertion sort, but N is currently guaranteed to be <= 12 (although this class
1203 // must function correctly even with more nodes). If we lift that restriction, we may need
1204 // to consider alternate algorithms.
1205 Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts);
1206 if (it == nodes.end() || it->host != host) {
1207 LOG(2) << "Adding node " << host << " to our view of replica set " << name;
1208 it = nodes.insert(it, Node(host));
1209 }
1210 return &(*it);
1211 }
1212
updateNodeIfInNodes(const IsMasterReply & reply)1213 void SetState::updateNodeIfInNodes(const IsMasterReply& reply) {
1214 Node* node = findNode(reply.host);
1215 if (!node) {
1216 LOG(2) << "Skipping application of ismaster reply from " << reply.host
1217 << " since it isn't a confirmed member of set " << name;
1218 return;
1219 }
1220
1221 node->update(reply);
1222 }
1223
getConfirmedServerAddress() const1224 std::string SetState::getConfirmedServerAddress() const {
1225 StringBuilder ss;
1226 if (!name.empty())
1227 ss << name << "/";
1228
1229 for (std::set<HostAndPort>::const_iterator it = seedNodes.begin(); it != seedNodes.end();
1230 ++it) {
1231 if (it != seedNodes.begin())
1232 ss << ",";
1233 it->append(ss);
1234 }
1235
1236 return ss.str();
1237 }
1238
getUnconfirmedServerAddress() const1239 std::string SetState::getUnconfirmedServerAddress() const {
1240 StringBuilder ss;
1241 if (!name.empty())
1242 ss << name << "/";
1243
1244 for (std::vector<Node>::const_iterator it = nodes.begin(); it != nodes.end(); ++it) {
1245 if (it != nodes.begin())
1246 ss << ",";
1247 it->host.append(ss);
1248 }
1249
1250 return ss.str();
1251 }
1252
/**
 * Asserts (via invariant) the structural consistency of this SetState. It is called under DEV
 * from the SetState constructor and from Refresher::_refreshUntilMatches.
 *
 * Checked: every node has a non-empty host; at most one node is master, and it is up and equal
 * to lastSeenMaster; latencies are non-negative; 'nodes' is strictly sorted by host and contains
 * every seed node. For an active scan, hostsToScan has no duplicates and no already-tried hosts,
 * waitingFor is a subset of triedHosts, and unconfirmedReplies is empty once an up master has
 * been found.
 */
void SetState::checkInvariants() const {
    bool foundMaster = false;
    for (size_t i = 0; i < nodes.size(); i++) {
        // no empty hosts
        invariant(!nodes[i].host.empty());

        if (nodes[i].isMaster) {
            // masters must be up
            invariant(nodes[i].isUp);

            // at most one master
            invariant(!foundMaster);
            foundMaster = true;

            // if we have a master it should be the same as lastSeenMaster
            invariant(nodes[i].host == lastSeenMaster);
        }

        // should never end up with negative latencies
        invariant(nodes[i].latencyMicros >= 0);

        // nodes must be sorted by host with no-dupes
        invariant(i == 0 || (nodes[i - 1].host < nodes[i].host));
    }

    // nodes should be a (non-strict) superset of the seedNodes
    invariant(std::includes(
        nodes.begin(), nodes.end(), seedNodes.begin(), seedNodes.end(), compareHosts));

    if (currentScan) {
        // hostsToScan can't have dups or hosts already in triedHosts.
        std::set<HostAndPort> cantSee = currentScan->triedHosts;
        for (std::deque<HostAndPort>::const_iterator it = currentScan->hostsToScan.begin();
             it != currentScan->hostsToScan.end();
             ++it) {
            invariant(!cantSee.count(*it));
            cantSee.insert(*it);  // make sure we don't see this again
        }

        // We should only be waitingFor hosts that are in triedHosts
        invariant(std::includes(currentScan->triedHosts.begin(),
                                currentScan->triedHosts.end(),
                                currentScan->waitingFor.begin(),
                                currentScan->waitingFor.end()));

        // We should only have unconfirmedReplies if we haven't found a master yet
        invariant(!currentScan->foundUpMaster || currentScan->unconfirmedReplies.empty());
    }
}
1302
1303 template <typename Container>
enqueAllUntriedHosts(const Container & container,PseudoRandom & rand)1304 void ScanState::enqueAllUntriedHosts(const Container& container, PseudoRandom& rand) {
1305 invariant(hostsToScan.empty()); // because we don't try to dedup hosts already in the queue.
1306
1307 // no std::copy_if before c++11
1308 for (typename Container::const_iterator it(container.begin()), end(container.end()); it != end;
1309 ++it) {
1310 if (!triedHosts.count(*it)) {
1311 hostsToScan.push_back(*it);
1312 }
1313 }
1314 std::random_shuffle(hostsToScan.begin(), hostsToScan.end(), rand);
1315 }
1316 }
1317