1 2 /** 3 * Copyright (C) 2018-present MongoDB, Inc. 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the Server Side Public License, version 1, 7 * as published by MongoDB, Inc. 8 * 9 * This program is distributed in the hope that it will be useful, 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 * Server Side Public License for more details. 13 * 14 * You should have received a copy of the Server Side Public License 15 * along with this program. If not, see 16 * <http://www.mongodb.com/licensing/server-side-public-license>. 17 * 18 * As a special exception, the copyright holders give permission to link the 19 * code of portions of this program with the OpenSSL library under certain 20 * conditions as described in each individual source file and distribute 21 * linked combinations including the program with the OpenSSL library. You 22 * must comply with the Server Side Public License in all respects for 23 * all of the code used other than as permitted herein. If you modify file(s) 24 * with this exception, you may extend this exception to your version of the 25 * file(s), but you are not obligated to do so. If you do not wish to do so, 26 * delete this exception statement from your version. If you delete this 27 * exception statement from all source files in the program, then also delete 28 * it in the license file. 
29 */ 30 31 #pragma once 32 33 #include <memory> 34 #include <utility> 35 #include <vector> 36 37 #include "mongo/base/status.h" 38 #include "mongo/bson/timestamp.h" 39 #include "mongo/db/concurrency/d_concurrency.h" 40 #include "mongo/db/repl/initial_syncer.h" 41 #include "mongo/db/repl/member_state.h" 42 #include "mongo/db/repl/optime.h" 43 #include "mongo/db/repl/repl_set_config.h" 44 #include "mongo/db/repl/replication_coordinator.h" 45 #include "mongo/db/repl/replication_coordinator_external_state.h" 46 #include "mongo/db/repl/sync_source_resolver.h" 47 #include "mongo/db/repl/topology_coordinator.h" 48 #include "mongo/db/repl/update_position_args.h" 49 #include "mongo/executor/task_executor.h" 50 #include "mongo/platform/atomic_word.h" 51 #include "mongo/platform/random.h" 52 #include "mongo/platform/unordered_map.h" 53 #include "mongo/platform/unordered_set.h" 54 #include "mongo/stdx/condition_variable.h" 55 #include "mongo/stdx/mutex.h" 56 #include "mongo/util/concurrency/with_lock.h" 57 #include "mongo/util/net/hostandport.h" 58 59 namespace mongo { 60 61 class Timer; 62 template <typename T> 63 class StatusWith; 64 65 namespace executor { 66 struct ConnectionPoolStats; 67 } // namespace executor 68 69 namespace rpc { 70 class OplogQueryMetadata; 71 class ReplSetMetadata; 72 } // namespace rpc 73 74 namespace repl { 75 76 class ElectCmdRunner; 77 class FreshnessChecker; 78 class HandshakeArgs; 79 class HeartbeatResponseAction; 80 class LastVote; 81 class OplogReader; 82 class ReplicationProcess; 83 class ReplSetRequestVotesArgs; 84 class ReplSetConfig; 85 class SyncSourceFeedback; 86 class StorageInterface; 87 class TopologyCoordinator; 88 class VoteRequester; 89 90 class ReplicationCoordinatorImpl : public ReplicationCoordinator { 91 MONGO_DISALLOW_COPYING(ReplicationCoordinatorImpl); 92 93 public: 94 ReplicationCoordinatorImpl(ServiceContext* serviceContext, 95 const ReplSettings& settings, 96 std::unique_ptr<ReplicationCoordinatorExternalState> 
externalState, 97 std::unique_ptr<executor::TaskExecutor> executor, 98 std::unique_ptr<TopologyCoordinator> topoCoord, 99 ReplicationProcess* replicationProcess, 100 StorageInterface* storage, 101 int64_t prngSeed); 102 103 virtual ~ReplicationCoordinatorImpl(); 104 105 // ================== Members of public ReplicationCoordinator API =================== 106 107 virtual void startup(OperationContext* opCtx) override; 108 109 virtual void enterTerminalShutdown() override; 110 111 virtual void shutdown(OperationContext* opCtx) override; 112 113 virtual const ReplSettings& getSettings() const override; 114 115 virtual Mode getReplicationMode() const override; 116 117 virtual MemberState getMemberState() const override; 118 119 virtual Status waitForMemberState(MemberState expectedState, Milliseconds timeout) override; 120 121 virtual bool isInPrimaryOrSecondaryState() const override; 122 123 virtual Seconds getSlaveDelaySecs() const override; 124 125 virtual void clearSyncSourceBlacklist() override; 126 127 virtual ReplicationCoordinator::StatusAndDuration awaitReplication( 128 OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern); 129 130 virtual ReplicationCoordinator::StatusAndDuration awaitReplicationOfLastOpForClient( 131 OperationContext* opCtx, const WriteConcernOptions& writeConcern); 132 133 virtual Status stepDown(OperationContext* opCtx, 134 bool force, 135 const Milliseconds& waitTime, 136 const Milliseconds& stepdownTime); 137 138 virtual bool isMasterForReportingPurposes(); 139 140 virtual bool canAcceptWritesForDatabase(OperationContext* opCtx, StringData dbName); 141 virtual bool canAcceptWritesForDatabase_UNSAFE(OperationContext* opCtx, StringData dbName); 142 143 bool canAcceptWritesFor(OperationContext* opCtx, const NamespaceString& ns) override; 144 bool canAcceptWritesFor_UNSAFE(OperationContext* opCtx, const NamespaceString& ns) override; 145 146 virtual Status checkIfWriteConcernCanBeSatisfied(const 
WriteConcernOptions& writeConcern) const; 147 148 virtual Status checkCanServeReadsFor(OperationContext* opCtx, 149 const NamespaceString& ns, 150 bool slaveOk); 151 virtual Status checkCanServeReadsFor_UNSAFE(OperationContext* opCtx, 152 const NamespaceString& ns, 153 bool slaveOk); 154 155 virtual bool shouldRelaxIndexConstraints(OperationContext* opCtx, const NamespaceString& ns); 156 157 virtual Status setLastOptimeForSlave(const OID& rid, const Timestamp& ts); 158 159 virtual void setMyLastAppliedOpTime(const OpTime& opTime); 160 virtual void setMyLastDurableOpTime(const OpTime& opTime); 161 162 virtual void setMyLastAppliedOpTimeForward(const OpTime& opTime, DataConsistency consistency); 163 virtual void setMyLastDurableOpTimeForward(const OpTime& opTime); 164 165 virtual void resetMyLastOpTimes(); 166 167 virtual void setMyHeartbeatMessage(const std::string& msg); 168 169 virtual OpTime getMyLastAppliedOpTime() const override; 170 virtual OpTime getMyLastDurableOpTime() const override; 171 172 virtual Status waitUntilOpTimeForReadUntil(OperationContext* opCtx, 173 const ReadConcernArgs& readConcern, 174 boost::optional<Date_t> deadline) override; 175 176 virtual Status waitUntilOpTimeForRead(OperationContext* opCtx, 177 const ReadConcernArgs& readConcern) override; 178 179 virtual OID getElectionId() override; 180 181 virtual OID getMyRID() const override; 182 183 virtual int getMyId() const override; 184 185 virtual Status setFollowerMode(const MemberState& newState) override; 186 187 virtual ApplierState getApplierState() override; 188 189 virtual void signalDrainComplete(OperationContext* opCtx, 190 long long termWhenBufferIsEmpty) override; 191 192 virtual Status waitForDrainFinish(Milliseconds timeout) override; 193 194 virtual void signalUpstreamUpdater() override; 195 196 virtual Status resyncData(OperationContext* opCtx, bool waitUntilCompleted) override; 197 198 virtual StatusWith<BSONObj> prepareReplSetUpdatePositionCommand() const override; 199 
200 virtual Status processReplSetGetStatus(BSONObjBuilder* result, 201 ReplSetGetStatusResponseStyle responseStyle) override; 202 203 virtual void fillIsMasterForReplSet(IsMasterResponse* result, 204 const SplitHorizon::Parameters& horizonParams) override; 205 206 virtual void appendSlaveInfoData(BSONObjBuilder* result) override; 207 208 virtual ReplSetConfig getConfig() const override; 209 210 virtual void processReplSetGetConfig(BSONObjBuilder* result) override; 211 212 virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override; 213 214 virtual void advanceCommitPoint(const OpTime& committedOpTime) override; 215 216 virtual void cancelAndRescheduleElectionTimeout() override; 217 218 virtual Status setMaintenanceMode(bool activate) override; 219 220 virtual bool getMaintenanceMode() override; 221 222 virtual Status processReplSetSyncFrom(OperationContext* opCtx, 223 const HostAndPort& target, 224 BSONObjBuilder* resultObj) override; 225 226 virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj) override; 227 228 virtual Status processHeartbeat(const ReplSetHeartbeatArgs& args, 229 ReplSetHeartbeatResponse* response) override; 230 231 virtual Status processReplSetReconfig(OperationContext* opCtx, 232 const ReplSetReconfigArgs& args, 233 BSONObjBuilder* resultObj) override; 234 235 virtual Status processReplSetInitiate(OperationContext* opCtx, 236 const BSONObj& configObj, 237 BSONObjBuilder* resultObj) override; 238 239 virtual Status processReplSetFresh(const ReplSetFreshArgs& args, 240 BSONObjBuilder* resultObj) override; 241 242 virtual Status processReplSetElect(const ReplSetElectArgs& args, 243 BSONObjBuilder* response) override; 244 245 virtual Status processReplSetUpdatePosition(const UpdatePositionArgs& updates, 246 long long* configVersion) override; 247 248 virtual Status processHandshake(OperationContext* opCtx, 249 const HandshakeArgs& handshake) override; 250 251 virtual bool buildsIndexes() override; 252 
253 virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op, 254 bool durablyWritten) override; 255 256 virtual std::vector<HostAndPort> getOtherNodesInReplSet() const override; 257 258 virtual WriteConcernOptions getGetLastErrorDefault() override; 259 260 virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) override; 261 262 virtual bool isReplEnabled() const override; 263 264 virtual HostAndPort chooseNewSyncSource(const OpTime& lastOpTimeFetched) override; 265 266 virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override; 267 268 virtual void resetLastOpTimesFromOplog(OperationContext* opCtx, 269 DataConsistency consistency) override; 270 271 virtual bool shouldChangeSyncSource( 272 const HostAndPort& currentSource, 273 const rpc::ReplSetMetadata& replMetadata, 274 boost::optional<rpc::OplogQueryMetadata> oqMetadata) override; 275 276 virtual OpTime getLastCommittedOpTime() const override; 277 278 virtual Status processReplSetRequestVotes(OperationContext* opCtx, 279 const ReplSetRequestVotesArgs& args, 280 ReplSetRequestVotesResponse* response) override; 281 282 virtual void prepareReplMetadata(const BSONObj& metadataRequestObj, 283 const OpTime& lastOpTimeFromClient, 284 BSONObjBuilder* builder) const override; 285 286 virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, 287 ReplSetHeartbeatResponse* response) override; 288 289 virtual bool isV1ElectionProtocol() const override; 290 291 virtual bool getWriteConcernMajorityShouldJournal() override; 292 293 virtual void summarizeAsHtml(ReplSetHtmlSummary* s) override; 294 295 virtual void dropAllSnapshots() override; 296 /** 297 * Get current term from topology coordinator 298 */ 299 virtual long long getTerm() override; 300 301 // Returns the ServiceContext where this instance runs. 
getServiceContext()302 virtual ServiceContext* getServiceContext() override { 303 return _service; 304 } 305 306 virtual Status updateTerm(OperationContext* opCtx, long long term) override; 307 308 virtual Timestamp reserveSnapshotName(OperationContext* opCtx) override; 309 310 virtual OpTime getCurrentCommittedSnapshotOpTime() const override; 311 312 virtual void waitUntilSnapshotCommitted(OperationContext* opCtx, 313 const Timestamp& untilSnapshot) override; 314 315 virtual void appendDiagnosticBSON(BSONObjBuilder*) override; 316 317 virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) const override; 318 319 virtual size_t getNumUncommittedSnapshots() override; 320 321 virtual WriteConcernOptions populateUnsetWriteConcernOptionsSyncMode( 322 WriteConcernOptions wc) override; 323 324 virtual ReplSettings::IndexPrefetchConfig getIndexPrefetchConfig() const override; 325 virtual void setIndexPrefetchConfig(const ReplSettings::IndexPrefetchConfig cfg) override; 326 327 virtual Status stepUpIfEligible(bool skipDryRun) override; 328 329 virtual Status abortCatchupIfNeeded() override; 330 331 // ================== Test support API =================== 332 333 /** 334 * If called after startReplication(), blocks until all asynchronous 335 * activities associated with replication start-up complete. 336 */ 337 void waitForStartUpComplete_forTest(); 338 339 /** 340 * Gets the replica set configuration in use by the node. 341 */ 342 ReplSetConfig getReplicaSetConfig_forTest(); 343 344 /** 345 * Returns scheduled time of election timeout callback. 346 * Returns Date_t() if callback is not scheduled. 347 */ 348 Date_t getElectionTimeout_forTest() const; 349 350 /* 351 * Return a randomized offset amount that is scaled in proportion to the size of the 352 * _electionTimeoutPeriod. 353 */ 354 Milliseconds getRandomizedElectionOffset_forTest(); 355 356 /** 357 * Returns the scheduled time of the priority takeover callback. 
If a priority 358 * takeover has not been scheduled, returns boost::none. 359 */ 360 boost::optional<Date_t> getPriorityTakeover_forTest() const; 361 362 /** 363 * Returns the scheduled time of the catchup takeover callback. If a catchup 364 * takeover has not been scheduled, returns boost::none. 365 */ 366 boost::optional<Date_t> getCatchupTakeover_forTest() const; 367 368 /** 369 * Returns the catchup takeover CallbackHandle. 370 */ 371 executor::TaskExecutor::CallbackHandle getCatchupTakeoverCbh_forTest() const; 372 373 /** 374 * Simple wrappers around _setLastOptime_inlock to make it easier to test. 375 */ 376 Status setLastAppliedOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime); 377 Status setLastDurableOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime); 378 379 /** 380 * Simple test wrappers that expose private methods. 381 */ 382 boost::optional<OpTime> calculateStableOpTime_forTest(const std::set<OpTime>& candidates, 383 const OpTime& commitPoint); 384 void cleanupStableOpTimeCandidates_forTest(std::set<OpTime>* candidates, OpTime stableOpTime); 385 std::set<OpTime> getStableOpTimeCandidates_forTest(); 386 boost::optional<OpTime> getStableOpTime_forTest(); 387 388 /** 389 * Non-blocking version of updateTerm. 390 * Returns event handle that we can use to wait for the operation to complete. 391 * When the operation is complete (waitForEvent() returns), 'updateResult' will be set 392 * to a status telling if the term increased or a stepdown was triggered. 393 */ 394 executor::TaskExecutor::EventHandle updateTerm_forTest( 395 long long term, TopologyCoordinator::UpdateTermResult* updateResult); 396 397 /** 398 * If called after _startElectSelfV1(), blocks until all asynchronous 399 * activities associated with election complete. 
400 */ 401 void waitForElectionFinish_forTest(); 402 403 /** 404 * If called after _startElectSelfV1(), blocks until all asynchronous 405 * activities associated with election dry run complete, including writing 406 * last vote and scheduling the real election. 407 */ 408 void waitForElectionDryRunFinish_forTest(); 409 410 /** 411 * Waits until a stepdown command has begun. Callers should ensure that the stepdown attempt 412 * won't fully complete before this method is called, or this method may never return. 413 */ 414 void waitForStepDownAttempt_forTest(); 415 416 private: 417 using CallbackFn = executor::TaskExecutor::CallbackFn; 418 419 using CallbackHandle = executor::TaskExecutor::CallbackHandle; 420 421 using EventHandle = executor::TaskExecutor::EventHandle; 422 423 using ScheduleFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>( 424 const executor::TaskExecutor::CallbackFn& work)>; 425 426 class LoseElectionGuardV1; 427 class LoseElectionDryRunGuardV1; 428 429 /** 430 * Configuration states for a replica set node. 431 * 432 * Transition diagram: 433 * 434 * PreStart ------------------> ReplicationDisabled 435 * | 436 * | 437 * v 438 * StartingUp -------> Uninitialized <------> Initiating 439 * \ ^ | 440 * ------- | | 441 * | | | 442 * v v | 443 * Reconfig <---> Steady <----> HBReconfig | 444 * ^ / 445 * | / 446 * \ / 447 * ----------------------- 448 */ 449 enum ConfigState { 450 kConfigPreStart, 451 kConfigStartingUp, 452 kConfigReplicationDisabled, 453 kConfigUninitialized, 454 kConfigSteady, 455 kConfigInitiating, 456 kConfigReconfiguring, 457 kConfigHBReconfiguring 458 }; 459 460 /** 461 * Type describing actions to take after a change to the MemberState _memberState. 462 */ 463 enum PostMemberStateUpdateAction { 464 kActionNone, 465 kActionCloseAllConnections, // Also indicates that we should clear sharding state. 
466 kActionFollowerModeStateChange, 467 kActionWinElection, 468 kActionStartSingleNodeElection 469 }; 470 471 // Abstract struct that holds information about clients waiting for replication. 472 // Subclasses need to define how to notify them. 473 struct Waiter { 474 Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern); 475 virtual ~Waiter() = default; 476 477 BSONObj toBSON() const; 478 std::string toString() const; 479 // It is invalid to call notify_inlock() unless holding ReplicationCoordinatorImpl::_mutex. 480 virtual void notify_inlock() = 0; 481 482 const OpTime opTime; 483 const WriteConcernOptions* writeConcern = nullptr; 484 }; 485 486 // When ThreadWaiter gets notified, it will signal the conditional variable. 487 // 488 // This is used when a thread wants to block inline until the opTime is reached with the given 489 // writeConcern. 490 struct ThreadWaiter : public Waiter { 491 ThreadWaiter(OpTime _opTime, 492 const WriteConcernOptions* _writeConcern, 493 stdx::condition_variable* _condVar); 494 void notify_inlock() override; 495 496 stdx::condition_variable* condVar = nullptr; 497 }; 498 499 // When the waiter is notified, finishCallback will be called while holding replCoord _mutex 500 // since WaiterLists are protected by _mutex. 501 // 502 // This is used when we want to run a callback when the opTime is reached. 503 struct CallbackWaiter : public Waiter { 504 using FinishFunc = stdx::function<void()>; 505 506 CallbackWaiter(OpTime _opTime, FinishFunc _finishCallback); 507 void notify_inlock() override; 508 509 // The callback that will be called when this waiter is notified. 510 FinishFunc finishCallback = nullptr; 511 }; 512 513 class WaiterGuard; 514 515 class WaiterList { 516 public: 517 using WaiterType = Waiter*; 518 519 // Adds waiter into the list. 520 void add_inlock(WaiterType waiter); 521 // Returns whether waiter is found and removed. 
522 bool remove_inlock(WaiterType waiter); 523 // Signals and removes all waiters that satisfy the condition. 524 void signalAndRemoveIf_inlock(stdx::function<bool(WaiterType)> fun); 525 // Signals and removes all waiters from the list. 526 void signalAndRemoveAll_inlock(); 527 528 private: 529 std::vector<WaiterType> _list; 530 }; 531 532 typedef std::vector<executor::TaskExecutor::CallbackHandle> HeartbeatHandles; 533 534 // The state and logic of primary catchup. 535 // 536 // When start() is called, CatchupState will schedule the timeout callback. When we get 537 // responses of the latest heartbeats from all nodes, the target time (opTime of _waiter) is 538 // set. 539 // The primary exits catchup mode when any of the following happens. 540 // 1) My last applied optime reaches the target optime, if we've received a heartbeat from all 541 // nodes. 542 // 2) Catchup timeout expires. 543 // 3) Primary steps down. 544 // 4) The primary has to roll back to catch up. 545 // 5) The primary is too stale to catch up. 546 // 547 // On abort, the state resets the pointer to itself in ReplCoordImpl. In other words, the 548 // life cycle of the state object aligns with the conceptual state. 549 // In shutdown, the timeout callback will be canceled by the executor and the state is safe to 550 // destroy. 551 // 552 // Any function of the state must be called while holding _mutex. 553 class CatchupState { 554 public: CatchupState(ReplicationCoordinatorImpl * repl)555 CatchupState(ReplicationCoordinatorImpl* repl) : _repl(repl) {} 556 // start() can only be called once. 557 void start_inlock(); 558 // Reset the state itself to destruct the state. 559 void abort_inlock(); 560 // Heartbeat calls this function to update the target optime. 561 void signalHeartbeatUpdate_inlock(); 562 563 private: 564 ReplicationCoordinatorImpl* _repl; // Not owned. 565 // Callback handle used to cancel a scheduled catchup timeout callback. 
566 executor::TaskExecutor::CallbackHandle _timeoutCbh; 567 // Handle to a Waiter that contains the current target optime to reach after which 568 // we can exit catchup mode. 569 std::unique_ptr<CallbackWaiter> _waiter; 570 }; 571 572 void _resetMyLastOpTimes_inlock(); 573 574 /** 575 * Returns the _writeConcernMajorityJournalDefault of our current _rsConfig. 576 */ 577 bool getWriteConcernMajorityShouldJournal_inlock() const; 578 579 /** 580 * Returns the OpTime of the current committed snapshot, if one exists. 581 */ 582 OpTime _getCurrentCommittedSnapshotOpTime_inlock() const; 583 584 /** 585 * Returns the OpTime of the current committed snapshot converted to LogicalTime. 586 */ 587 LogicalTime _getCurrentCommittedLogicalTime_inlock() const; 588 589 /** 590 * Verifies that ReadConcernArgs match node's readConcern. 591 */ 592 Status _validateReadConcern(OperationContext* opCtx, const ReadConcernArgs& readConcern); 593 594 /** 595 * Helper to update our saved config, cancel any pending heartbeats, and kick off sending 596 * new heartbeats based on the new config. 597 * 598 * Returns an action to be performed after unlocking _mutex, via 599 * _performPostMemberStateUpdateAction. 600 */ 601 PostMemberStateUpdateAction _setCurrentRSConfig_inlock(OperationContext* opCtx, 602 const ReplSetConfig& newConfig, 603 int myIndex); 604 605 /** 606 * Helper to wake waiters in _replicationWaiterList that are doneWaitingForReplication. 607 */ 608 void _wakeReadyWaiters_inlock(); 609 610 /** 611 * Scheduled to cause the ReplicationCoordinator to reconsider any state that might 612 * need to change as a result of time passing - for instance becoming PRIMARY when a single 613 * node replica set member's stepDown period ends. 614 */ 615 void _handleTimePassing(const executor::TaskExecutor::CallbackArgs& cbData); 616 617 /** 618 * Chooses a candidate for election handoff and sends a ReplSetStepUp command to it. 
619 */ 620 void _performElectionHandoff(); 621 622 /** 623 * Helper method for _awaitReplication that takes an already locked unique_lock, but leaves 624 * operation timing to the caller. 625 */ 626 Status _awaitReplication_inlock(stdx::unique_lock<stdx::mutex>* lock, 627 OperationContext* opCtx, 628 const OpTime& opTime, 629 Timestamp minSnapshot, 630 const WriteConcernOptions& writeConcern); 631 632 /** 633 * Returns true if the given writeConcern is satisfied up to "optime" or is unsatisfiable. 634 * 635 * If the writeConcern is 'majority', also waits for _currentCommittedSnapshot to be newer than 636 * minSnapshot. 637 */ 638 bool _doneWaitingForReplication_inlock(const OpTime& opTime, 639 Timestamp minSnapshot, 640 const WriteConcernOptions& writeConcern); 641 642 Status _checkIfWriteConcernCanBeSatisfied_inlock(const WriteConcernOptions& writeConcern) const; 643 644 /** 645 * Wakes up threads in the process of handling a stepdown request based on whether the 646 * TopologyCoordinator now believes enough secondaries are caught up for the stepdown request to 647 * complete. 648 */ 649 void _signalStepDownWaiterIfReady_inlock(); 650 651 bool _canAcceptWritesFor_inlock(const NamespaceString& ns); 652 653 OID _getMyRID_inlock() const; 654 655 int _getMyId_inlock() const; 656 657 OpTime _getMyLastAppliedOpTime_inlock() const; 658 OpTime _getMyLastDurableOpTime_inlock() const; 659 660 /** 661 * Helper method for updating our tracking of the last optime applied by a given node. 662 * This is only valid to call on replica sets. 663 * "configVersion" will be populated with our config version if it and the configVersion 664 * of "args" differ. 665 */ 666 Status _setLastOptime_inlock(const UpdatePositionArgs::UpdateInfo& args, 667 long long* configVersion); 668 669 /** 670 * This function will report our position externally (like upstream) if necessary. 671 * 672 * Takes in a unique lock, that must already be locked, on _mutex. 
673 * 674 * Lock will be released after this method finishes. 675 */ 676 void _reportUpstream_inlock(stdx::unique_lock<stdx::mutex> lock); 677 678 /** 679 * Helpers to set the last applied and durable OpTime. 680 */ 681 void _setMyLastAppliedOpTime_inlock(const OpTime& opTime, 682 bool isRollbackAllowed, 683 DataConsistency consistency); 684 void _setMyLastDurableOpTime_inlock(const OpTime& opTime, bool isRollbackAllowed); 685 686 /** 687 * Schedules a heartbeat to be sent to "target" at "when". "targetIndex" is the index 688 * into the replica set config members array that corresponds to the "target", or -1 if 689 * "target" is not in _rsConfig. 690 */ 691 void _scheduleHeartbeatToTarget_inlock(const HostAndPort& target, int targetIndex, Date_t when); 692 693 /** 694 * Processes each heartbeat response. 695 * 696 * Schedules additional heartbeats, triggers elections and step downs, etc. 697 */ 698 void _handleHeartbeatResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, 699 int targetIndex); 700 701 void _trackHeartbeatHandle_inlock( 702 const StatusWith<executor::TaskExecutor::CallbackHandle>& handle); 703 704 void _untrackHeartbeatHandle_inlock(const executor::TaskExecutor::CallbackHandle& handle); 705 706 /* 707 * Return a randomized offset amount that is scaled in proportion to the size of the 708 * _electionTimeoutPeriod. Used to add randomization to an election timeout. 709 */ 710 Milliseconds _getRandomizedElectionOffset_inlock(); 711 712 /** 713 * Helper for _handleHeartbeatResponse. 714 * 715 * Updates the lastDurableOpTime and lastAppliedOpTime associated with the member at 716 * "memberIndex" in our config. 717 */ 718 void _updateOpTimesFromHeartbeat_inlock(int targetIndex, 719 const OpTime& durableOpTime, 720 const OpTime& appliedOpTime); 721 722 /** 723 * Starts a heartbeat for each member in the current config. Called while holding _mutex. 724 */ 725 void _startHeartbeats_inlock(); 726 727 /** 728 * Cancels all heartbeats. 
Called while holding replCoord _mutex. 729 */ 730 void _cancelHeartbeats_inlock(); 731 732 /** 733 * Cancels all heartbeats, then starts a heartbeat for each member in the current config. 734 * Called while holding replCoord _mutex. 735 */ 736 void _restartHeartbeats_inlock(); 737 738 /** 739 * Asynchronously sends a heartbeat to "target". "targetIndex" is the index 740 * into the replica set config members array that corresponds to the "target", or -1 if 741 * we don't have a valid replica set config. 742 * 743 * Scheduled by _scheduleHeartbeatToTarget_inlock. 744 */ 745 void _doMemberHeartbeat(executor::TaskExecutor::CallbackArgs cbData, 746 const HostAndPort& target, 747 int targetIndex); 748 749 750 MemberState _getMemberState_inlock() const; 751 752 /** 753 * Starts loading the replication configuration from local storage, and if it is valid, 754 * schedules a callback (of _finishLoadLocalConfig) to set it as the current replica set 755 * config (sets _rsConfig and _thisMembersConfigIndex). 756 * Returns true if it finishes loading the local config, which most likely means there 757 * was no local config at all or it was invalid in some way, and false if there was a valid 758 * config detected but more work is needed to set it as the local config (which will be 759 * handled by the callback to _finishLoadLocalConfig). 760 */ 761 bool _startLoadLocalConfig(OperationContext* opCtx); 762 763 /** 764 * Callback that finishes the work started in _startLoadLocalConfig and sets _rsConfigState 765 * to kConfigSteady, so that we can begin processing heartbeats and reconfigs. 766 */ 767 void _finishLoadLocalConfig(const executor::TaskExecutor::CallbackArgs& cbData, 768 const ReplSetConfig& localConfig, 769 const StatusWith<OpTime>& lastOpTimeStatus, 770 const StatusWith<LastVote>& lastVoteStatus); 771 772 /** 773 * Start replicating data, and does an initial sync if needed first. 
     */
    void _startDataReplication(OperationContext* opCtx,
                               stdx::function<void()> startCompleted = nullptr);

    /**
     * Stops replicating data by stopping the applier, fetcher and such.
     */
    void _stopDataReplication(OperationContext* opCtx);

    /**
     * Finishes the work of processReplSetInitiate() in the event of a successful quorum check.
     */
    void _finishReplSetInitiate(OperationContext* opCtx,
                                const ReplSetConfig& newConfig,
                                int myIndex);

    /**
     * Finishes the work of processReplSetReconfig, in the event of
     * a successful quorum check.
     */
    void _finishReplSetReconfig(const executor::TaskExecutor::CallbackArgs& cbData,
                                const ReplSetConfig& newConfig,
                                bool isForceReconfig,
                                int myIndex,
                                const executor::TaskExecutor::EventHandle& finishedEvent);

    /**
     * Changes _rsConfigState to newState, and notify any waiters.
     */
    void _setConfigState_inlock(ConfigState newState);

    /**
     * Updates the cached value, _memberState, to match _topCoord's reported
     * member state, from getMemberState().
     *
     * Returns an enum indicating what action to take after releasing _mutex, if any.
     * Call performPostMemberStateUpdateAction on the return value after releasing
     * _mutex.
     *
     * Note: opCtx may be null as currently not all paths thread an OperationContext all the way
     * down, but it must be non-null for any calls that change _canAcceptNonLocalWrites.
     */
    PostMemberStateUpdateAction _updateMemberStateFromTopologyCoordinator_inlock(
        OperationContext* opCtx);

    /**
     * Performs a post member-state update action. Do not call while holding _mutex. "action" must
     * not be kActionWinElection, use _postWonElectionUpdateMemberState_inlock instead.
     */
    void _performPostMemberStateUpdateAction(PostMemberStateUpdateAction action);

    /**
     * Update state after winning an election.
     */
    void _postWonElectionUpdateMemberState_inlock();

    /**
     * Helper to select appropriate sync source after transitioning from a follower state.
     */
    void _onFollowerModeStateChange();

    /**
     * Begins an attempt to elect this node.
     * Called after an incoming heartbeat changes this node's view of the set such that it
     * believes it can be elected PRIMARY.
     * For proper concurrency, start methods must be called while holding _mutex.
     *
     * For old style elections the election path is:
     *      _startElectSelf_inlock()
     *      _onFreshnessCheckComplete()
     *      _onElectCmdRunnerComplete()
     * For V1 (raft) style elections the election path is:
     *      _startElectSelfV1() or _startElectSelfV1_inlock()
     *      _processDryRunResult() (may skip)
     *      _startRealElection_inlock()
     *      _writeLastVoteForMyElection()
     *      _startVoteRequester_inlock()
     *      _onVoteRequestComplete()
     */
    void _startElectSelf_inlock();
    void _startElectSelfV1_inlock(TopologyCoordinator::StartElectionReason reason);
    void _startElectSelfV1(TopologyCoordinator::StartElectionReason reason);

    /**
     * Callback called when the FreshnessChecker has completed; checks the results and
     * decides whether to continue election proceedings.
     **/
    void _onFreshnessCheckComplete();

    /**
     * Callback called when the ElectCmdRunner has completed; checks the results and
     * decides whether to complete the election and change state to primary.
     **/
    void _onElectCmdRunnerComplete();

    /**
     * Callback called when the dryRun VoteRequester has completed; checks the results and
     * decides whether to conduct a proper election.
     * "originalTerm" was the term during which the dry run began, if the term has since
     * changed, do not run for election.
     */
    void _processDryRunResult(long long originalTerm);

    /**
     * Begins executing a real election. This is called either after a successful dry run, or
     * when the dry run was skipped (which may be specified for a ReplSetStepUp).
     */
    void _startRealElection_inlock(long long originalTerm);

    /**
     * Writes the last vote in persistent storage after completing dry run successfully.
     * This job will be scheduled to run in DB worker threads.
     */
    void _writeLastVoteForMyElection(LastVote lastVote,
                                     const executor::TaskExecutor::CallbackArgs& cbData);

    /**
     * Starts VoteRequester to run the real election when last vote write has completed.
     */
    void _startVoteRequester_inlock(long long newTerm);

    /**
     * Callback called when the VoteRequester has completed; checks the results and
     * decides whether to change state to primary and alert other nodes of our primary-ness.
     * "originalTerm" was the term during which the election began, if the term has since
     * changed, do not step up as primary.
     */
    void _onVoteRequestComplete(long long originalTerm);

    /**
     * Callback called after a random delay, to prevent repeated election ties.
     */
    void _recoverFromElectionTie(const executor::TaskExecutor::CallbackArgs& cbData);

    /**
     * Removes 'host' from the sync source blacklist. If 'host' isn't found, it's simply
     * ignored and no error is thrown.
     *
     * Must be scheduled as a callback.
     */
    void _unblacklistSyncSource(const executor::TaskExecutor::CallbackArgs& cbData,
                                const HostAndPort& host);

    /**
     * Schedules a request that the given host step down; logs any errors.
     */
    void _requestRemotePrimaryStepdown(const HostAndPort& target);

    /**
     * Schedules stepdown to run with the global exclusive lock.
     */
    executor::TaskExecutor::EventHandle _stepDownStart();

    /**
     * Completes a step-down of the current node. Must be run with a global
     * shared or global exclusive lock.
     * Signals 'finishedEvent' on successful completion.
     */
    void _stepDownFinish(const executor::TaskExecutor::CallbackArgs& cbData,
                         const executor::TaskExecutor::EventHandle& finishedEvent);

    /**
     * Schedules a replica set config change.
     */
    void _scheduleHeartbeatReconfig_inlock(const ReplSetConfig& newConfig);

    /**
     * Method to write a configuration transmitted via heartbeat message to stable storage.
     */
    void _heartbeatReconfigStore(const executor::TaskExecutor::CallbackArgs& cbd,
                                 const ReplSetConfig& newConfig);

    /**
     * Conclusion actions of a heartbeat-triggered reconfiguration.
     */
    void _heartbeatReconfigFinish(const executor::TaskExecutor::CallbackArgs& cbData,
                                  const ReplSetConfig& newConfig,
                                  StatusWith<int> myIndex);

    /**
     * Utility method that schedules or performs actions specified by a HeartbeatResponseAction
     * returned by a TopologyCoordinator::processHeartbeatResponse(V1) call with the given
     * value of "responseStatus".
     *
     * Requires "lock" to own _mutex, and returns the same unique_lock.
     */
    stdx::unique_lock<stdx::mutex> _handleHeartbeatResponseAction_inlock(
        const HeartbeatResponseAction& action,
        const StatusWith<ReplSetHeartbeatResponse>& responseStatus,
        stdx::unique_lock<stdx::mutex> lock);

    /**
     * Updates the last committed OpTime to be min(committedOpTime, lastApplied) if it is more
     * recent than the current last committed OpTime.
     */
    void _advanceCommitPoint_inlock(const OpTime& committedOpTime);

    /**
     * Scan the memberData and determine the highest last applied or last
     * durable optime present on a majority of servers; set _lastCommittedOpTime to this
     * new entry. Wake any threads waiting for replication that now have their
     * write concern satisfied.
     *
     * Whether the last applied or last durable op time is used depends on whether
     * the config getWriteConcernMajorityShouldJournal is set.
     */
    void _updateLastCommittedOpTime_inlock();

    /**
     * Callback that attempts to set the current term in topology coordinator and
     * relinquishes primary if the term actually changes and we are primary.
     * *updateTermResult will be the result of the update term attempt.
     * Returns the finish event if it does not finish in this function, for example,
     * due to stepdown, otherwise the returned EventHandle is invalid.
     */
    EventHandle _updateTerm_inlock(
        long long term, TopologyCoordinator::UpdateTermResult* updateTermResult = nullptr);

    /**
     * Callback that processes the ReplSetMetadata returned from a command run against another
     * replica set member and so long as the config version in the metadata matches the replica set
     * config version this node currently has, updates the current term.
     *
     * This does NOT update this node's notion of the commit point.
     *
     * Returns the finish event which is invalid if the process has already finished.
     */
    EventHandle _processReplSetMetadata_inlock(const rpc::ReplSetMetadata& replMetadata);

    /**
     * Prepares a metadata object for ReplSetMetadata.
     */
    void _prepareReplSetMetadata_inlock(const OpTime& lastOpTimeFromClient,
                                        BSONObjBuilder* builder) const;

    /**
     * Prepares a metadata object for OplogQueryMetadata.
     */
    void _prepareOplogQueryMetadata_inlock(int rbid, BSONObjBuilder* builder) const;

    /**
     * Blesses a snapshot to be used for new committed reads.
     */
    void _updateCommittedSnapshot_inlock(const OpTime& newCommittedSnapshot);

    /**
     * A helper method that returns the current stable optime based on the current commit point and
     * set of stable optime candidates.
     */
    boost::optional<OpTime> _getStableOpTime_inlock();

    /**
     * Calculates the 'stable' replication optime given a set of optime candidates and the
     * current commit point. The stable optime is the greatest optime in 'candidates' that is
     * also less than or equal to 'commitPoint'.
     */
    boost::optional<OpTime> _calculateStableOpTime_inlock(const std::set<OpTime>& candidates,
                                                          const OpTime& commitPoint);

    /**
     * Removes any optimes from the optime set 'candidates' that are less than
     * 'stableOpTime'.
     */
    void _cleanupStableOpTimeCandidates(std::set<OpTime>* candidates, OpTime stableOpTime);

    /**
     * Calculates and sets the value of the 'stable' replication optime for the storage engine.
     * See ReplicationCoordinatorImpl::_calculateStableOpTime for a definition of 'stable', in
     * this context.
     */
    void _setStableTimestampForStorage_inlock();

    /**
     * Drops all snapshots and clears the "committed" snapshot.
     */
    void _dropAllSnapshots_inlock();

    /**
     * Bottom half of _scheduleNextLivenessUpdate.
     * Must be called with _mutex held.
     */
    void _scheduleNextLivenessUpdate_inlock();

    /**
     * Callback which marks downed nodes as down, triggers a stepdown if a majority of nodes are no
     * longer visible, and reschedules itself.
     */
    void _handleLivenessTimeout(const executor::TaskExecutor::CallbackArgs& cbData);

    /**
     * If "updatedMemberId" is the current _earliestMemberId, cancels the current
     * _handleLivenessTimeout callback and calls _scheduleNextLivenessUpdate to schedule a new one.
     * Returns immediately otherwise.
     */
    void _cancelAndRescheduleLivenessUpdate_inlock(int updatedMemberId);

    /**
     * Cancels all outstanding _priorityTakeover callbacks.
     */
    void _cancelPriorityTakeover_inlock();

    /**
     * Cancels all outstanding _catchupTakeover callbacks.
     */
    void _cancelCatchupTakeover_inlock();

    /**
     * Cancels the current _handleElectionTimeout callback and reschedules a new callback.
     * Returns immediately otherwise.
     */
    void _cancelAndRescheduleElectionTimeout_inlock();

    /**
     * Callback which starts an election if this node is electable and using protocolVersion 1.
     */
    void _startElectSelfIfEligibleV1(TopologyCoordinator::StartElectionReason reason);

    /**
     * Resets the term of last vote to 0 to prevent any node from voting for term 0.
     */
    void _resetElectionInfoOnProtocolVersionUpgrade(OperationContext* opCtx,
                                                    const ReplSetConfig& oldConfig,
                                                    const ReplSetConfig& newConfig);

    /**
     * Schedules work to be run no sooner than 'when' and returns handle to callback.
     * If work cannot be scheduled due to shutdown, returns empty handle.
     * All other non-shutdown scheduling failures will abort the process.
     * Does not run 'work' if callback is canceled.
     */
    CallbackHandle _scheduleWorkAt(Date_t when, const CallbackFn& work);

    /**
     * Creates an event.
     * Returns invalid event handle if the executor is shutting down.
     * Otherwise aborts on non-shutdown error.
     */
    EventHandle _makeEvent();

    /**
     * Wrap a function into executor callback.
     * If the callback is cancelled, the given function won't run.
     */
    executor::TaskExecutor::CallbackFn _wrapAsCallbackFn(const stdx::function<void()>& work);

    /**
     * Finish catch-up mode and start drain mode.
     */
    void _enterDrainMode_inlock();

    /**
     * Waits for the config state to leave kConfigStartingUp, which indicates that start() has
     * finished.
     */
    void _waitForStartUpComplete();

    /**
     * Cancels the running election, if any, and returns an event that will be signaled when the
     * canceled election completes. If there is no running election, returns an invalid event
     * handle.
     */
    executor::TaskExecutor::EventHandle _cancelElectionIfNeeded_inlock();

    /**
     * Waits until the optime of the current node is at least the 'opTime'.
     */
    Status _waitUntilOpTime(OperationContext* opCtx,
                            bool isMajorityReadConcern,
                            OpTime opTime,
                            boost::optional<Date_t> deadline = boost::none);

    /**
     * Waits until the optime of the current node is at least the opTime specified in 'readConcern'.
     * Supports local and majority readConcern.
     */
    // TODO: remove when SERVER-29729 is done
    Status _waitUntilOpTimeForReadDeprecated(OperationContext* opCtx,
                                             const ReadConcernArgs& readConcern);

    /**
     * Waits until the deadline or until the optime of the current node is at least the clusterTime
     * specified in 'readConcern'. Supports local and majority readConcern.
     * If maxTimeMS and deadline are both specified, it waits for min(maxTimeMS, deadline).
     */
    Status _waitUntilClusterTimeForRead(OperationContext* opCtx,
                                        const ReadConcernArgs& readConcern,
                                        boost::optional<Date_t> deadline);

    /**
     * Returns a pseudorandom number no less than 0 and less than limit (which must be positive).
     */
    int64_t _nextRandomInt64_inlock(int64_t limit);

    //
    // All member variables are labeled with one of the following codes indicating the
    // synchronization rules for accessing them.
    //
    // (R)  Read-only in concurrent operation; no synchronization required.
    // (S)  Self-synchronizing; access in any way from any context.
    // (PS) Pointer is read-only in concurrent operation, item pointed to is self-synchronizing;
    //      Access in any context.
    // (M)  Reads and writes guarded by _mutex
    // (GM) Readable under any global intent lock or _mutex. Must hold both the global lock in
    //      exclusive mode (MODE_X) and hold _mutex to write.
    // (I)  Independently synchronized, see member variable comment.

    // Protects member data of this ReplicationCoordinator.
    mutable stdx::mutex _mutex;  // (S)

    // Handles to actively queued heartbeats.
    HeartbeatHandles _heartbeatHandles;  // (M)

    // When this node does not know itself to be a member of a config, it adds
    // every host that sends it a heartbeat request to this set, and also starts
    // sending heartbeat requests to that host. This set is cleared whenever
    // a node discovers that it is a member of a config.
    unordered_set<HostAndPort> _seedList;  // (M)

    // Back pointer to the ServiceContext that has started the instance.
    ServiceContext* const _service;  // (S)

    // Parsed command line arguments related to replication.
    const ReplSettings _settings;  // (R)

    // Mode of replication specified by _settings.
    const Mode _replMode;  // (R)

    // Pointer to the TopologyCoordinator owned by this ReplicationCoordinator.
    std::unique_ptr<TopologyCoordinator> _topCoord;  // (M)

    // Executor that drives the topology coordinator.
    std::unique_ptr<executor::TaskExecutor> _replExecutor;  // (S)

    // Pointer to the ReplicationCoordinatorExternalState owned by this ReplicationCoordinator.
    std::unique_ptr<ReplicationCoordinatorExternalState> _externalState;  // (PS)

    // Our RID, used to identify us to our sync source when sending replication progress
    // updates upstream. Set once in startReplication() and then never modified again.
    OID _myRID;  // (M)

    // list of information about clients waiting on replication. Does *not* own the WaiterInfos.
    WaiterList _replicationWaiterList;  // (M)

    // list of information about clients waiting for a particular opTime.
    // Does *not* own the WaiterInfos.
    WaiterList _opTimeWaiterList;  // (M)

    // Set to true when we are in the process of shutting down replication.
    bool _inShutdown;  // (M)

    // Election ID of the last election that resulted in this node becoming primary.
    OID _electionId;  // (M)

    // Used to signal threads waiting for changes to _memberState.
    stdx::condition_variable _memberStateChange;  // (M)

    // Current ReplicaSet state.
    MemberState _memberState;  // (M)

    // Used to signal threads waiting for the applier to finish draining (i.e. for changes to
    // _applierState).
    stdx::condition_variable _drainFinishedCond;  // (M)

    // Current state of the applier (Running, Draining, or Stopped); see _drainFinishedCond.
    ReplicationCoordinator::ApplierState _applierState = ApplierState::Running;  // (M)

    // Used to signal threads waiting for changes to _rsConfigState.
    stdx::condition_variable _rsConfigStateChange;  // (M)

    // Represents the configuration state of the coordinator, which controls how and when
    // _rsConfig may change. See the state transition diagram in the type definition of
    // ConfigState for details.
    ConfigState _rsConfigState;  // (M)

    // The current ReplicaSet configuration object, including the information about tag groups
    // that is used to satisfy write concern requests with named gle modes.
    ReplSetConfig _rsConfig;  // (M)

    // This member's index position in the current config.
    int _selfIndex;  // (M)

    // Condition to signal when new heartbeat data comes in.
    stdx::condition_variable _stepDownWaiters;  // (M)

    // State for conducting an election of this node.
    // the presence of a non-null _freshnessChecker pointer indicates that an election is
    // currently in progress. When using the V1 protocol, a non-null _voteRequester pointer
    // indicates this instead.
    // Only one election is allowed at a time.
    std::unique_ptr<FreshnessChecker> _freshnessChecker;  // (M)

    std::unique_ptr<ElectCmdRunner> _electCmdRunner;  // (M)

    std::unique_ptr<VoteRequester> _voteRequester;  // (M)

    // Event that the election code will signal when the in-progress election completes.
    // Unspecified value when _freshnessChecker is NULL.
    executor::TaskExecutor::EventHandle _electionFinishedEvent;  // (M)

    // Event that the election code will signal when the in-progress election dry run completes,
    // which includes writing the last vote and scheduling the real election.
    executor::TaskExecutor::EventHandle _electionDryRunFinishedEvent;  // (M)

    // Whether we slept last time we attempted an election but possibly tied with other nodes.
    bool _sleptLastElection;  // (M)

    // Flag that indicates whether writes to databases other than "local" are allowed. Used to
    // answer canAcceptWritesForDatabase() and canAcceptWritesFor() questions.
    // Always true for standalone nodes and masters in master-slave relationships.
    AtomicWord<bool> _canAcceptNonLocalWrites;  // (GM)

    // Flag that indicates whether reads from databases other than "local" are allowed. Unlike
    // _canAcceptNonLocalWrites, above, this question is about admission control on secondaries,
    // and we do not require that its observers be strongly synchronized. Accidentally
    // providing the prior value for a limited period of time is acceptable. Also unlike
    // _canAcceptNonLocalWrites, its value is only meaningful on replica set secondaries.
    AtomicUInt32 _canServeNonLocalReads;  // (S)

    // ReplicationProcess used to hold information related to the replication and application of
    // operations from the sync source.
    ReplicationProcess* const _replicationProcess;  // (PS)

    // Storage interface used by initial syncer.
    StorageInterface* _storage;  // (PS)

    // InitialSyncer used for initial sync.
    std::shared_ptr<InitialSyncer>
        _initialSyncer;  // (I) pointer set under mutex, copied by callers.

    // Hands out the next snapshot name.
    AtomicUInt64 _snapshotNameGenerator;  // (S)

    // The OpTimes and SnapshotNames for all snapshots newer than the current commit point, kept in
    // sorted order. Any time this is changed, you must also update _uncommittedSnapshotsSize.
    std::deque<OpTime> _uncommittedSnapshots;  // (M)

    // A cache of the size of _uncommittedSnapshots that can be read without any locking.
    // May only be written to while holding _mutex.
    AtomicUInt64 _uncommittedSnapshotsSize;  // (I)

    // The non-null OpTime and SnapshotName of the current snapshot used for committed reads, if
    // there is one.
    // When engaged, this must be <= _lastCommittedOpTime and < _uncommittedSnapshots.front().
    boost::optional<OpTime> _currentCommittedSnapshot;  // (M)

    // A set of optimes that are used for computing the replication system's current 'stable'
    // optime. Every time a node's applied optime is updated, it will be added to this set.
    // Optimes that are older than the current stable optime should get removed from this set.
    // This set should also be cleared if a rollback occurs.
    std::set<OpTime> _stableOpTimeCandidates;  // (M)

    // Used to signal threads that are waiting for new committed snapshots.
    stdx::condition_variable _currentCommittedSnapshotCond;  // (M)

    // Callback Handle used to cancel a scheduled LivenessTimeout callback.
    executor::TaskExecutor::CallbackHandle _handleLivenessTimeoutCbh;  // (M)

    // Callback Handle used to cancel a scheduled ElectionTimeout callback.
    executor::TaskExecutor::CallbackHandle _handleElectionTimeoutCbh;  // (M)

    // Election timeout callback will not run before this time.
    // If this date is Date_t(), the callback is either unscheduled or canceled.
    // Used for testing only.
    Date_t _handleElectionTimeoutWhen;  // (M)

    // Callback Handle used to cancel a scheduled PriorityTakeover callback.
    executor::TaskExecutor::CallbackHandle _priorityTakeoverCbh;  // (M)

    // Priority takeover callback will not run before this time.
    // If this date is Date_t(), the callback is either unscheduled or canceled.
    // Used for testing only.
    Date_t _priorityTakeoverWhen;  // (M)

    // Callback Handle used to cancel a scheduled CatchupTakeover callback.
    executor::TaskExecutor::CallbackHandle _catchupTakeoverCbh;  // (M)

    // Catchup takeover callback will not run before this time.
    // If this date is Date_t(), the callback is either unscheduled or canceled.
    // Used for testing only.
    Date_t _catchupTakeoverWhen;  // (M)

    // Callback handle used by _waitForStartUpComplete() to block until configuration
    // is loaded and external state threads have been started (unless this node is an arbiter).
    CallbackHandle _finishLoadLocalConfigCbh;  // (M)

    // The id of the earliest member, for which the handleLivenessTimeout callback has been
    // scheduled. We need this so that we don't needlessly cancel and reschedule the callback on
    // every liveness update.
    int _earliestMemberId = -1;  // (M)

    // Cached copy of the current config protocol version.
    AtomicInt64 _protVersion{1};  // (S)

    // Source of random numbers used in setting election timeouts, etc.
    PseudoRandom _random;  // (M)

    // This setting affects the Applier prefetcher behavior.
    mutable stdx::mutex _indexPrefetchMutex;
    ReplSettings::IndexPrefetchConfig _indexPrefetchConfig =
        ReplSettings::IndexPrefetchConfig::PREFETCH_ALL;  // (I)

    // The catchup state including all catchup logic. The presence of a non-null pointer indicates
    // that the node is currently in catchup mode.
    // NOTE(review): synchronization code (X) is not defined in the legend above (R/S/PS/M/GM/I);
    // presumably this is guarded by _mutex — confirm and relabel.
    std::unique_ptr<CatchupState> _catchupState;  // (X)

    // Atomic-synchronized copy of Topology Coordinator's _term, for use by the public getTerm()
    // function.
    // This variable must be written immediately after _term, and thus its value can lag.
    // Reading this value does not require the replication coordinator mutex to be locked.
    AtomicInt64 _termShadow;  // (S)

    // When we decide to step down due to hearing about a higher term, we remember the term we heard
    // here so we can update our term to match as part of finishing stepdown.
    boost::optional<long long> _pendingTermUpdateDuringStepDown;  // (M)

    // Whether data replication is active.
    bool _startedSteadyStateReplication = false;  // (M)

    // If we're in terminal shutdown. If true, we'll refuse to vote in elections.
    bool _inTerminalShutdown = false;  // (M)
};

}  // namespace repl
}  // namespace mongo