
/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#pragma once

#include <deque>
#include <memory>
#include <set>
#include <utility>
#include <vector>

#include "mongo/base/status.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/repl/initial_syncer.h"
#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_external_state.h"
#include "mongo/db/repl/sync_source_resolver.h"
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/update_position_args.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/random.h"
#include "mongo/platform/unordered_map.h"
#include "mongo/platform/unordered_set.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/net/hostandport.h"

namespace mongo {

class Timer;
template <typename T>
class StatusWith;

namespace executor {
struct ConnectionPoolStats;
}  // namespace executor

namespace rpc {
class OplogQueryMetadata;
class ReplSetMetadata;
}  // namespace rpc

namespace repl {

class ElectCmdRunner;
class FreshnessChecker;
class HandshakeArgs;
class HeartbeatResponseAction;
class LastVote;
class OplogReader;
class ReplicationProcess;
class ReplSetRequestVotesArgs;
class ReplSetConfig;
class SyncSourceFeedback;
class StorageInterface;
class TopologyCoordinator;
class VoteRequester;

class ReplicationCoordinatorImpl : public ReplicationCoordinator {
    MONGO_DISALLOW_COPYING(ReplicationCoordinatorImpl);

public:
    ReplicationCoordinatorImpl(ServiceContext* serviceContext,
                               const ReplSettings& settings,
                               std::unique_ptr<ReplicationCoordinatorExternalState> externalState,
                               std::unique_ptr<executor::TaskExecutor> executor,
                               std::unique_ptr<TopologyCoordinator> topoCoord,
                               ReplicationProcess* replicationProcess,
                               StorageInterface* storage,
                               int64_t prngSeed);

    virtual ~ReplicationCoordinatorImpl();

    // ================== Members of public ReplicationCoordinator API ===================

    virtual void startup(OperationContext* opCtx) override;

    virtual void enterTerminalShutdown() override;

    virtual void shutdown(OperationContext* opCtx) override;

    virtual const ReplSettings& getSettings() const override;

    virtual Mode getReplicationMode() const override;

    virtual MemberState getMemberState() const override;

    virtual Status waitForMemberState(MemberState expectedState, Milliseconds timeout) override;

    virtual bool isInPrimaryOrSecondaryState() const override;

    virtual Seconds getSlaveDelaySecs() const override;

    virtual void clearSyncSourceBlacklist() override;

    virtual ReplicationCoordinator::StatusAndDuration awaitReplication(
        OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern);

    virtual ReplicationCoordinator::StatusAndDuration awaitReplicationOfLastOpForClient(
        OperationContext* opCtx, const WriteConcernOptions& writeConcern);

    virtual Status stepDown(OperationContext* opCtx,
                            bool force,
                            const Milliseconds& waitTime,
                            const Milliseconds& stepdownTime);

    virtual bool isMasterForReportingPurposes();

    virtual bool canAcceptWritesForDatabase(OperationContext* opCtx, StringData dbName);
    virtual bool canAcceptWritesForDatabase_UNSAFE(OperationContext* opCtx, StringData dbName);

    bool canAcceptWritesFor(OperationContext* opCtx, const NamespaceString& ns) override;
    bool canAcceptWritesFor_UNSAFE(OperationContext* opCtx, const NamespaceString& ns) override;

    virtual Status checkIfWriteConcernCanBeSatisfied(const WriteConcernOptions& writeConcern) const;

    virtual Status checkCanServeReadsFor(OperationContext* opCtx,
                                         const NamespaceString& ns,
                                         bool slaveOk);
    virtual Status checkCanServeReadsFor_UNSAFE(OperationContext* opCtx,
                                                const NamespaceString& ns,
                                                bool slaveOk);

    virtual bool shouldRelaxIndexConstraints(OperationContext* opCtx, const NamespaceString& ns);

    virtual Status setLastOptimeForSlave(const OID& rid, const Timestamp& ts);

    virtual void setMyLastAppliedOpTime(const OpTime& opTime);
    virtual void setMyLastDurableOpTime(const OpTime& opTime);

    virtual void setMyLastAppliedOpTimeForward(const OpTime& opTime, DataConsistency consistency);
    virtual void setMyLastDurableOpTimeForward(const OpTime& opTime);

    virtual void resetMyLastOpTimes();

    virtual void setMyHeartbeatMessage(const std::string& msg);

    virtual OpTime getMyLastAppliedOpTime() const override;
    virtual OpTime getMyLastDurableOpTime() const override;

    virtual Status waitUntilOpTimeForReadUntil(OperationContext* opCtx,
                                               const ReadConcernArgs& readConcern,
                                               boost::optional<Date_t> deadline) override;

    virtual Status waitUntilOpTimeForRead(OperationContext* opCtx,
                                          const ReadConcernArgs& readConcern) override;

    virtual OID getElectionId() override;

    virtual OID getMyRID() const override;

    virtual int getMyId() const override;

    virtual Status setFollowerMode(const MemberState& newState) override;

    virtual ApplierState getApplierState() override;

    virtual void signalDrainComplete(OperationContext* opCtx,
                                     long long termWhenBufferIsEmpty) override;

    virtual Status waitForDrainFinish(Milliseconds timeout) override;

    virtual void signalUpstreamUpdater() override;

    virtual Status resyncData(OperationContext* opCtx, bool waitUntilCompleted) override;

    virtual StatusWith<BSONObj> prepareReplSetUpdatePositionCommand() const override;

    virtual Status processReplSetGetStatus(BSONObjBuilder* result,
                                           ReplSetGetStatusResponseStyle responseStyle) override;

    virtual void fillIsMasterForReplSet(IsMasterResponse* result,
                                        const SplitHorizon::Parameters& horizonParams) override;

    virtual void appendSlaveInfoData(BSONObjBuilder* result) override;

    virtual ReplSetConfig getConfig() const override;

    virtual void processReplSetGetConfig(BSONObjBuilder* result) override;

    virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override;

    virtual void advanceCommitPoint(const OpTime& committedOpTime) override;

    virtual void cancelAndRescheduleElectionTimeout() override;

    virtual Status setMaintenanceMode(bool activate) override;

    virtual bool getMaintenanceMode() override;

    virtual Status processReplSetSyncFrom(OperationContext* opCtx,
                                          const HostAndPort& target,
                                          BSONObjBuilder* resultObj) override;

    virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj) override;

    virtual Status processHeartbeat(const ReplSetHeartbeatArgs& args,
                                    ReplSetHeartbeatResponse* response) override;

    virtual Status processReplSetReconfig(OperationContext* opCtx,
                                          const ReplSetReconfigArgs& args,
                                          BSONObjBuilder* resultObj) override;

    virtual Status processReplSetInitiate(OperationContext* opCtx,
                                          const BSONObj& configObj,
                                          BSONObjBuilder* resultObj) override;

    virtual Status processReplSetFresh(const ReplSetFreshArgs& args,
                                       BSONObjBuilder* resultObj) override;

    virtual Status processReplSetElect(const ReplSetElectArgs& args,
                                       BSONObjBuilder* response) override;

    virtual Status processReplSetUpdatePosition(const UpdatePositionArgs& updates,
                                                long long* configVersion) override;

    virtual Status processHandshake(OperationContext* opCtx,
                                    const HandshakeArgs& handshake) override;

    virtual bool buildsIndexes() override;

    virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op,
                                                       bool durablyWritten) override;

    virtual std::vector<HostAndPort> getOtherNodesInReplSet() const override;

    virtual WriteConcernOptions getGetLastErrorDefault() override;

    virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) override;

    virtual bool isReplEnabled() const override;

    virtual HostAndPort chooseNewSyncSource(const OpTime& lastOpTimeFetched) override;

    virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override;

    virtual void resetLastOpTimesFromOplog(OperationContext* opCtx,
                                           DataConsistency consistency) override;

    virtual bool shouldChangeSyncSource(
        const HostAndPort& currentSource,
        const rpc::ReplSetMetadata& replMetadata,
        boost::optional<rpc::OplogQueryMetadata> oqMetadata) override;

    virtual OpTime getLastCommittedOpTime() const override;

    virtual Status processReplSetRequestVotes(OperationContext* opCtx,
                                              const ReplSetRequestVotesArgs& args,
                                              ReplSetRequestVotesResponse* response) override;

    virtual void prepareReplMetadata(const BSONObj& metadataRequestObj,
                                     const OpTime& lastOpTimeFromClient,
                                     BSONObjBuilder* builder) const override;

    virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
                                      ReplSetHeartbeatResponse* response) override;

    virtual bool isV1ElectionProtocol() const override;

    virtual bool getWriteConcernMajorityShouldJournal() override;

    virtual void summarizeAsHtml(ReplSetHtmlSummary* s) override;

    virtual void dropAllSnapshots() override;
    /**
     * Get current term from topology coordinator
     */
    virtual long long getTerm() override;

    // Returns the ServiceContext where this instance runs.
    virtual ServiceContext* getServiceContext() override {
        return _service;
    }

    virtual Status updateTerm(OperationContext* opCtx, long long term) override;

    virtual Timestamp reserveSnapshotName(OperationContext* opCtx) override;

    virtual OpTime getCurrentCommittedSnapshotOpTime() const override;

    virtual void waitUntilSnapshotCommitted(OperationContext* opCtx,
                                            const Timestamp& untilSnapshot) override;

    virtual void appendDiagnosticBSON(BSONObjBuilder*) override;

    virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) const override;

    virtual size_t getNumUncommittedSnapshots() override;

    virtual WriteConcernOptions populateUnsetWriteConcernOptionsSyncMode(
        WriteConcernOptions wc) override;

    virtual ReplSettings::IndexPrefetchConfig getIndexPrefetchConfig() const override;
    virtual void setIndexPrefetchConfig(const ReplSettings::IndexPrefetchConfig cfg) override;

    virtual Status stepUpIfEligible(bool skipDryRun) override;

    virtual Status abortCatchupIfNeeded() override;

    // ================== Test support API ===================

    /**
     * If called after startReplication(), blocks until all asynchronous
     * activities associated with replication start-up complete.
     */
    void waitForStartUpComplete_forTest();

    /**
     * Gets the replica set configuration in use by the node.
     */
    ReplSetConfig getReplicaSetConfig_forTest();

    /**
     * Returns scheduled time of election timeout callback.
     * Returns Date_t() if callback is not scheduled.
     */
    Date_t getElectionTimeout_forTest() const;

    /*
     * Return a randomized offset amount that is scaled in proportion to the size of the
     * _electionTimeoutPeriod.
     */
    Milliseconds getRandomizedElectionOffset_forTest();

    /**
     * Returns the scheduled time of the priority takeover callback. If a priority
     * takeover has not been scheduled, returns boost::none.
     */
    boost::optional<Date_t> getPriorityTakeover_forTest() const;

    /**
     * Returns the scheduled time of the catchup takeover callback. If a catchup
     * takeover has not been scheduled, returns boost::none.
     */
    boost::optional<Date_t> getCatchupTakeover_forTest() const;

    /**
     * Returns the catchup takeover CallbackHandle.
     */
    executor::TaskExecutor::CallbackHandle getCatchupTakeoverCbh_forTest() const;

    /**
     * Simple wrappers around _setLastOptime_inlock to make it easier to test.
     */
    Status setLastAppliedOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime);
    Status setLastDurableOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime);

    /**
     * Simple test wrappers that expose private methods.
     */
    boost::optional<OpTime> calculateStableOpTime_forTest(const std::set<OpTime>& candidates,
                                                          const OpTime& commitPoint);
    void cleanupStableOpTimeCandidates_forTest(std::set<OpTime>* candidates, OpTime stableOpTime);
    std::set<OpTime> getStableOpTimeCandidates_forTest();
    boost::optional<OpTime> getStableOpTime_forTest();

    /**
     * Non-blocking version of updateTerm.
     * Returns event handle that we can use to wait for the operation to complete.
     * When the operation is complete (waitForEvent() returns), 'updateResult' will be set
     * to a status telling if the term increased or a stepdown was triggered.
     */
    executor::TaskExecutor::EventHandle updateTerm_forTest(
        long long term, TopologyCoordinator::UpdateTermResult* updateResult);

    /**
     * If called after _startElectSelfV1(), blocks until all asynchronous
     * activities associated with election complete.
     */
    void waitForElectionFinish_forTest();

    /**
     * If called after _startElectSelfV1(), blocks until all asynchronous
     * activities associated with election dry run complete, including writing
     * last vote and scheduling the real election.
     */
    void waitForElectionDryRunFinish_forTest();

    /**
     * Waits until a stepdown command has begun. Callers should ensure that the stepdown attempt
     * won't fully complete before this method is called, or this method may never return.
     */
    void waitForStepDownAttempt_forTest();

private:
    using CallbackFn = executor::TaskExecutor::CallbackFn;

    using CallbackHandle = executor::TaskExecutor::CallbackHandle;

    using EventHandle = executor::TaskExecutor::EventHandle;

    using ScheduleFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>(
        const executor::TaskExecutor::CallbackFn& work)>;

    class LoseElectionGuardV1;
    class LoseElectionDryRunGuardV1;

    /**
     * Configuration states for a replica set node.
     *
     * Transition diagram:
     *
     * PreStart ------------------> ReplicationDisabled
     *    |
     *    |
     *    v
     * StartingUp -------> Uninitialized <------> Initiating
     *         \                     ^               |
     *          -------              |               |
     *                 |             |               |
     *                 v             v               |
     * Reconfig <---> Steady <----> HBReconfig       |
     *                    ^                          /
     *                    |                         /
     *                     \                       /
     *                      -----------------------
     */
    enum ConfigState {
        kConfigPreStart,
        kConfigStartingUp,
        kConfigReplicationDisabled,
        kConfigUninitialized,
        kConfigSteady,
        kConfigInitiating,
        kConfigReconfiguring,
        kConfigHBReconfiguring
    };

    /**
     * Type describing actions to take after a change to the MemberState _memberState.
     */
    enum PostMemberStateUpdateAction {
        kActionNone,
        kActionCloseAllConnections,  // Also indicates that we should clear sharding state.
        kActionFollowerModeStateChange,
        kActionWinElection,
        kActionStartSingleNodeElection
    };

    // Abstract struct that holds information about clients waiting for replication.
    // Subclasses need to define how to notify them.
    struct Waiter {
        Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern);
        virtual ~Waiter() = default;

        BSONObj toBSON() const;
        std::string toString() const;
        // It is invalid to call notify_inlock() unless holding ReplicationCoordinatorImpl::_mutex.
        virtual void notify_inlock() = 0;

        const OpTime opTime;
        const WriteConcernOptions* writeConcern = nullptr;
    };

    // When ThreadWaiter gets notified, it will signal the conditional variable.
    //
    // This is used when a thread wants to block inline until the opTime is reached with the given
    // writeConcern.
    struct ThreadWaiter : public Waiter {
        ThreadWaiter(OpTime _opTime,
                     const WriteConcernOptions* _writeConcern,
                     stdx::condition_variable* _condVar);
        void notify_inlock() override;

        stdx::condition_variable* condVar = nullptr;
    };

    // When the waiter is notified, finishCallback will be called while holding replCoord _mutex
    // since WaiterLists are protected by _mutex.
    //
    // This is used when we want to run a callback when the opTime is reached.
    struct CallbackWaiter : public Waiter {
        using FinishFunc = stdx::function<void()>;

        CallbackWaiter(OpTime _opTime, FinishFunc _finishCallback);
        void notify_inlock() override;

        // The callback that will be called when this waiter is notified.
        FinishFunc finishCallback = nullptr;
    };

    class WaiterGuard;

    class WaiterList {
    public:
        using WaiterType = Waiter*;

        // Adds waiter into the list.
        void add_inlock(WaiterType waiter);
        // Returns whether waiter is found and removed.
        bool remove_inlock(WaiterType waiter);
        // Signals and removes all waiters that satisfy the condition.
        void signalAndRemoveIf_inlock(stdx::function<bool(WaiterType)> fun);
        // Signals and removes all waiters from the list.
        void signalAndRemoveAll_inlock();

    private:
        std::vector<WaiterType> _list;
    };

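    // Illustrative sketch only (an assumption about typical usage, not code copied from the
    // implementation): a thread that wants to block until an optime is replicated pairs a
    // ThreadWaiter with a WaiterList roughly as follows, with _mutex held via 'lock' and
    // 'opTime', 'minSnapshot', and 'writeConcern' standing in for the caller's arguments:
    //
    //     stdx::condition_variable condVar;
    //     ThreadWaiter waiter(opTime, &writeConcern, &condVar);
    //     _replicationWaiterList.add_inlock(&waiter);
    //     while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) {
    //         condVar.wait(lock);  // notify_inlock() signals condVar when progress is made
    //     }
    //     _replicationWaiterList.remove_inlock(&waiter);
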
    typedef std::vector<executor::TaskExecutor::CallbackHandle> HeartbeatHandles;

    // The state and logic of primary catchup.
    //
    // When start() is called, CatchupState will schedule the timeout callback. When we get
    // responses of the latest heartbeats from all nodes, the target time (opTime of _waiter) is
    // set.
    // The primary exits catchup mode when any of the following happens.
    //   1) My last applied optime reaches the target optime, if we've received a heartbeat from all
    //      nodes.
    //   2) Catchup timeout expires.
    //   3) Primary steps down.
    //   4) The primary has to roll back to catch up.
    //   5) The primary is too stale to catch up.
    //
    // On abort, the state resets the pointer to itself in ReplCoordImpl. In other words, the
    // life cycle of the state object aligns with the conceptual state.
    // In shutdown, the timeout callback will be canceled by the executor and the state is safe to
    // destroy.
    //
    // Any function of the state must be called while holding _mutex.
    class CatchupState {
    public:
        CatchupState(ReplicationCoordinatorImpl* repl) : _repl(repl) {}
        // start() can only be called once.
        void start_inlock();
        // Reset the state itself to destruct the state.
        void abort_inlock();
        // Heartbeat calls this function to update the target optime.
        void signalHeartbeatUpdate_inlock();

    private:
        ReplicationCoordinatorImpl* _repl;  // Not owned.
        // Callback handle used to cancel a scheduled catchup timeout callback.
        executor::TaskExecutor::CallbackHandle _timeoutCbh;
        // Handle to a Waiter that contains the current target optime to reach after which
        // we can exit catchup mode.
        std::unique_ptr<CallbackWaiter> _waiter;
    };

    void _resetMyLastOpTimes_inlock();

    /**
     * Returns the _writeConcernMajorityJournalDefault of our current _rsConfig.
     */
    bool getWriteConcernMajorityShouldJournal_inlock() const;

    /**
     * Returns the OpTime of the current committed snapshot, if one exists.
     */
    OpTime _getCurrentCommittedSnapshotOpTime_inlock() const;

    /**
     * Returns the OpTime of the current committed snapshot converted to LogicalTime.
     */
    LogicalTime _getCurrentCommittedLogicalTime_inlock() const;

    /**
     *  Verifies that ReadConcernArgs match node's readConcern.
     */
    Status _validateReadConcern(OperationContext* opCtx, const ReadConcernArgs& readConcern);

    /**
     * Helper to update our saved config, cancel any pending heartbeats, and kick off sending
     * new heartbeats based on the new config.
     *
     * Returns an action to be performed after unlocking _mutex, via
     * _performPostMemberStateUpdateAction.
     */
    PostMemberStateUpdateAction _setCurrentRSConfig_inlock(OperationContext* opCtx,
                                                           const ReplSetConfig& newConfig,
                                                           int myIndex);

    /**
     * Helper to wake waiters in _replicationWaiterList that are doneWaitingForReplication.
     */
    void _wakeReadyWaiters_inlock();

    /**
     * Scheduled to cause the ReplicationCoordinator to reconsider any state that might
     * need to change as a result of time passing - for instance becoming PRIMARY when a single
     * node replica set member's stepDown period ends.
     */
    void _handleTimePassing(const executor::TaskExecutor::CallbackArgs& cbData);

    /**
     * Chooses a candidate for election handoff and sends a ReplSetStepUp command to it.
     */
    void _performElectionHandoff();

    /**
     * Helper method for _awaitReplication that takes an already locked unique_lock, but leaves
     * operation timing to the caller.
     */
    Status _awaitReplication_inlock(stdx::unique_lock<stdx::mutex>* lock,
                                    OperationContext* opCtx,
                                    const OpTime& opTime,
                                    Timestamp minSnapshot,
                                    const WriteConcernOptions& writeConcern);

    /**
     * Returns true if the given writeConcern is satisfied up to "optime" or is unsatisfiable.
     *
     * If the writeConcern is 'majority', also waits for _currentCommittedSnapshot to be newer than
     * minSnapshot.
     */
    bool _doneWaitingForReplication_inlock(const OpTime& opTime,
                                           Timestamp minSnapshot,
                                           const WriteConcernOptions& writeConcern);

    Status _checkIfWriteConcernCanBeSatisfied_inlock(const WriteConcernOptions& writeConcern) const;

    /**
     * Wakes up threads in the process of handling a stepdown request based on whether the
     * TopologyCoordinator now believes enough secondaries are caught up for the stepdown request to
     * complete.
     */
    void _signalStepDownWaiterIfReady_inlock();

    bool _canAcceptWritesFor_inlock(const NamespaceString& ns);

    OID _getMyRID_inlock() const;

    int _getMyId_inlock() const;

    OpTime _getMyLastAppliedOpTime_inlock() const;
    OpTime _getMyLastDurableOpTime_inlock() const;

    /**
     * Helper method for updating our tracking of the last optime applied by a given node.
     * This is only valid to call on replica sets.
     * "configVersion" will be populated with our config version if it and the configVersion
     * of "args" differ.
     */
    Status _setLastOptime_inlock(const UpdatePositionArgs::UpdateInfo& args,
                                 long long* configVersion);

    /**
     * This function will report our position externally (like upstream) if necessary.
     *
     * Takes in a unique lock, that must already be locked, on _mutex.
     *
     * Lock will be released after this method finishes.
     */
    void _reportUpstream_inlock(stdx::unique_lock<stdx::mutex> lock);

    /**
     * Helpers to set the last applied and durable OpTime.
     */
    void _setMyLastAppliedOpTime_inlock(const OpTime& opTime,
                                        bool isRollbackAllowed,
                                        DataConsistency consistency);
    void _setMyLastDurableOpTime_inlock(const OpTime& opTime, bool isRollbackAllowed);

    /**
     * Schedules a heartbeat to be sent to "target" at "when". "targetIndex" is the index
     * into the replica set config members array that corresponds to the "target", or -1 if
     * "target" is not in _rsConfig.
     */
    void _scheduleHeartbeatToTarget_inlock(const HostAndPort& target, int targetIndex, Date_t when);

    /**
     * Processes each heartbeat response.
     *
     * Schedules additional heartbeats, triggers elections and step downs, etc.
     */
    void _handleHeartbeatResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
                                  int targetIndex);

    void _trackHeartbeatHandle_inlock(
        const StatusWith<executor::TaskExecutor::CallbackHandle>& handle);

    void _untrackHeartbeatHandle_inlock(const executor::TaskExecutor::CallbackHandle& handle);

    /*
     * Return a randomized offset amount that is scaled in proportion to the size of the
     * _electionTimeoutPeriod. Used to add randomization to an election timeout.
     */
    Milliseconds _getRandomizedElectionOffset_inlock();

    /**
     * Helper for _handleHeartbeatResponse.
     *
     * Updates the lastDurableOpTime and lastAppliedOpTime associated with the member at
     * "memberIndex" in our config.
     */
    void _updateOpTimesFromHeartbeat_inlock(int targetIndex,
                                            const OpTime& durableOpTime,
                                            const OpTime& appliedOpTime);

    /**
     * Starts a heartbeat for each member in the current config.  Called while holding _mutex.
     */
    void _startHeartbeats_inlock();

    /**
     * Cancels all heartbeats.  Called while holding replCoord _mutex.
     */
    void _cancelHeartbeats_inlock();

    /**
     * Cancels all heartbeats, then starts a heartbeat for each member in the current config.
     * Called while holding replCoord _mutex.
     */
    void _restartHeartbeats_inlock();

    /**
     * Asynchronously sends a heartbeat to "target". "targetIndex" is the index
     * into the replica set config members array that corresponds to the "target", or -1 if
     * we don't have a valid replica set config.
     *
     * Scheduled by _scheduleHeartbeatToTarget_inlock.
     */
    void _doMemberHeartbeat(executor::TaskExecutor::CallbackArgs cbData,
                            const HostAndPort& target,
                            int targetIndex);


    MemberState _getMemberState_inlock() const;

    /**
     * Starts loading the replication configuration from local storage, and if it is valid,
     * schedules a callback (of _finishLoadLocalConfig) to set it as the current replica set
     * config (sets _rsConfig and _thisMembersConfigIndex).
     * Returns true if it finishes loading the local config, which most likely means there
     * was no local config at all or it was invalid in some way, and false if there was a valid
     * config detected but more work is needed to set it as the local config (which will be
     * handled by the callback to _finishLoadLocalConfig).
     */
    bool _startLoadLocalConfig(OperationContext* opCtx);

    /**
     * Callback that finishes the work started in _startLoadLocalConfig and sets _rsConfigState
     * to kConfigSteady, so that we can begin processing heartbeats and reconfigs.
     */
    void _finishLoadLocalConfig(const executor::TaskExecutor::CallbackArgs& cbData,
                                const ReplSetConfig& localConfig,
                                const StatusWith<OpTime>& lastOpTimeStatus,
                                const StatusWith<LastVote>& lastVoteStatus);

    /**
     * Starts replicating data, performing an initial sync first if needed.
     */
    void _startDataReplication(OperationContext* opCtx,
                               stdx::function<void()> startCompleted = nullptr);

    /**
     * Stops replicating data by stopping the applier, fetcher and such.
     */
    void _stopDataReplication(OperationContext* opCtx);

    /**
     * Finishes the work of processReplSetInitiate() in the event of a successful quorum check.
     */
    void _finishReplSetInitiate(OperationContext* opCtx,
                                const ReplSetConfig& newConfig,
                                int myIndex);

    /**
     * Finishes the work of processReplSetReconfig, in the event of
     * a successful quorum check.
     */
    void _finishReplSetReconfig(const executor::TaskExecutor::CallbackArgs& cbData,
                                const ReplSetConfig& newConfig,
                                bool isForceReconfig,
                                int myIndex,
                                const executor::TaskExecutor::EventHandle& finishedEvent);

    /**
     * Changes _rsConfigState to newState, and notify any waiters.
     */
    void _setConfigState_inlock(ConfigState newState);

    /**
     * Updates the cached value, _memberState, to match _topCoord's reported
     * member state, from getMemberState().
     *
     * Returns an enum indicating what action to take after releasing _mutex, if any.
     * Call performPostMemberStateUpdateAction on the return value after releasing
     * _mutex.
     *
     * Note: opCtx may be null as currently not all paths thread an OperationContext all the way
     * down, but it must be non-null for any calls that change _canAcceptNonLocalWrites.
     */
    PostMemberStateUpdateAction _updateMemberStateFromTopologyCoordinator_inlock(
        OperationContext* opCtx);

    /**
     * Performs a post member-state update action.  Do not call while holding _mutex. "action" must
     * not be kActionWinElection, use _postWonElectionUpdateMemberState_inlock instead.
     */
    void _performPostMemberStateUpdateAction(PostMemberStateUpdateAction action);

    /**
     * Update state after winning an election.
     */
    void _postWonElectionUpdateMemberState_inlock();

    /**
     * Helper to select appropriate sync source after transitioning from a follower state.
     */
    void _onFollowerModeStateChange();

    /**
     * Begins an attempt to elect this node.
     * Called after an incoming heartbeat changes this node's view of the set such that it
     * believes it can be elected PRIMARY.
     * For proper concurrency, start methods must be called while holding _mutex.
     *
     * For old style elections the election path is:
     *      _startElectSelf_inlock()
     *      _onFreshnessCheckComplete()
     *      _onElectCmdRunnerComplete()
     * For V1 (raft) style elections the election path is:
     *      _startElectSelfV1() or _startElectSelfV1_inlock()
     *      _processDryRunResult() (may skip)
     *      _startRealElection_inlock()
     *      _writeLastVoteForMyElection()
     *      _startVoteRequester_inlock()
     *      _onVoteRequestComplete()
     */
    void _startElectSelf_inlock();
    void _startElectSelfV1_inlock(TopologyCoordinator::StartElectionReason reason);
    void _startElectSelfV1(TopologyCoordinator::StartElectionReason reason);

    /**
     * Callback called when the FreshnessChecker has completed; checks the results and
     * decides whether to continue election proceedings.
     **/
    void _onFreshnessCheckComplete();

    /**
     * Callback called when the ElectCmdRunner has completed; checks the results and
     * decides whether to complete the election and change state to primary.
     **/
    void _onElectCmdRunnerComplete();

    /**
     * Callback called when the dryRun VoteRequester has completed; checks the results and
     * decides whether to conduct a proper election.
     * "originalTerm" was the term during which the dry run began, if the term has since
     * changed, do not run for election.
     */
    void _processDryRunResult(long long originalTerm);

    /**
     * Begins executing a real election. This is called either after a successful dry run, or when
     * the dry run was skipped (which may be specified for a ReplSetStepUp).
     */
    void _startRealElection_inlock(long long originalTerm);

    /**
     * Writes the last vote in persistent storage after completing dry run successfully.
     * This job will be scheduled to run in DB worker threads.
     */
    void _writeLastVoteForMyElection(LastVote lastVote,
                                     const executor::TaskExecutor::CallbackArgs& cbData);

    /**
     * Starts VoteRequester to run the real election when last vote write has completed.
     */
    void _startVoteRequester_inlock(long long newTerm);

    /**
     * Callback called when the VoteRequester has completed; checks the results and
     * decides whether to change state to primary and alert other nodes of our primary-ness.
     * "originalTerm" was the term during which the election began, if the term has since
     * changed, do not step up as primary.
     */
    void _onVoteRequestComplete(long long originalTerm);

    /**
     * Callback called after a random delay, to prevent repeated election ties.
     */
    void _recoverFromElectionTie(const executor::TaskExecutor::CallbackArgs& cbData);

    /**
     * Removes 'host' from the sync source blacklist. If 'host' isn't found, it's simply
     * ignored and no error is thrown.
     *
     * Must be scheduled as a callback.
     */
    void _unblacklistSyncSource(const executor::TaskExecutor::CallbackArgs& cbData,
                                const HostAndPort& host);

    /**
     * Schedules a request that the given host step down; logs any errors.
     */
    void _requestRemotePrimaryStepdown(const HostAndPort& target);

    /**
     * Schedules stepdown to run with the global exclusive lock.
     */
    executor::TaskExecutor::EventHandle _stepDownStart();

    /**
     * Completes a step-down of the current node.  Must be run with a global
     * shared or global exclusive lock.
     * Signals 'finishedEvent' on successful completion.
     */
    void _stepDownFinish(const executor::TaskExecutor::CallbackArgs& cbData,
                         const executor::TaskExecutor::EventHandle& finishedEvent);

    /**
     * Schedules a replica set config change.
     */
    void _scheduleHeartbeatReconfig_inlock(const ReplSetConfig& newConfig);

    /**
     * Method to write a configuration transmitted via heartbeat message to stable storage.
     */
    void _heartbeatReconfigStore(const executor::TaskExecutor::CallbackArgs& cbd,
                                 const ReplSetConfig& newConfig);

    /**
     * Conclusion actions of a heartbeat-triggered reconfiguration.
     */
    void _heartbeatReconfigFinish(const executor::TaskExecutor::CallbackArgs& cbData,
                                  const ReplSetConfig& newConfig,
                                  StatusWith<int> myIndex);

    /**
     * Utility method that schedules or performs actions specified by a HeartbeatResponseAction
     * returned by a TopologyCoordinator::processHeartbeatResponse(V1) call with the given
     * value of "responseStatus".
     *
     * Requires "lock" to own _mutex, and returns the same unique_lock.
     */
    stdx::unique_lock<stdx::mutex> _handleHeartbeatResponseAction_inlock(
        const HeartbeatResponseAction& action,
        const StatusWith<ReplSetHeartbeatResponse>& responseStatus,
        stdx::unique_lock<stdx::mutex> lock);

    /**
     * Updates the last committed OpTime to be min(committedOpTime, lastApplied) if it is more
     * recent than the current last committed OpTime.
     */
    void _advanceCommitPoint_inlock(const OpTime& committedOpTime);

    /**
     * Scan the memberData and determine the highest last applied or last
     * durable optime present on a majority of servers; set _lastCommittedOpTime to this
     * new entry.  Wake any threads waiting for replication that now have their
     * write concern satisfied.
     *
     * Whether the last applied or last durable op time is used depends on whether
     * the config getWriteConcernMajorityShouldJournal is set.
     */
    void _updateLastCommittedOpTime_inlock();

    /**
     * Callback that attempts to set the current term in topology coordinator and
     * relinquishes primary if the term actually changes and we are primary.
     * *updateTermResult will be the result of the update term attempt.
     * Returns the finish event if it does not finish in this function, for example,
     * due to stepdown, otherwise the returned EventHandle is invalid.
     */
    EventHandle _updateTerm_inlock(
        long long term, TopologyCoordinator::UpdateTermResult* updateTermResult = nullptr);

    /**
     * Callback that processes the ReplSetMetadata returned from a command run against another
     * replica set member and so long as the config version in the metadata matches the replica set
     * config version this node currently has, updates the current term.
     *
     * This does NOT update this node's notion of the commit point.
     *
     * Returns the finish event which is invalid if the process has already finished.
     */
    EventHandle _processReplSetMetadata_inlock(const rpc::ReplSetMetadata& replMetadata);

    /**
     * Prepares a metadata object for ReplSetMetadata.
     */
    void _prepareReplSetMetadata_inlock(const OpTime& lastOpTimeFromClient,
                                        BSONObjBuilder* builder) const;

    /**
     * Prepares a metadata object for OplogQueryMetadata.
     */
    void _prepareOplogQueryMetadata_inlock(int rbid, BSONObjBuilder* builder) const;

    /**
     * Blesses a snapshot to be used for new committed reads.
     */
    void _updateCommittedSnapshot_inlock(const OpTime& newCommittedSnapshot);

    /**
     * A helper method that returns the current stable optime based on the current commit point and
     * set of stable optime candidates.
     */
    boost::optional<OpTime> _getStableOpTime_inlock();

    /**
     * Calculates the 'stable' replication optime given a set of optime candidates and the
     * current commit point. The stable optime is the greatest optime in 'candidates' that is
     * also less than or equal to 'commitPoint'.
     */
    boost::optional<OpTime> _calculateStableOpTime_inlock(const std::set<OpTime>& candidates,
                                                          const OpTime& commitPoint);

    /**
     * Removes any optimes from the optime set 'candidates' that are less than
     * 'stableOpTime'.
     */
    void _cleanupStableOpTimeCandidates(std::set<OpTime>* candidates, OpTime stableOpTime);

    /**
     * Calculates and sets the value of the 'stable' replication optime for the storage engine.
     * See ReplicationCoordinatorImpl::_calculateStableOpTime for a definition of 'stable', in
     * this context.
     */
    void _setStableTimestampForStorage_inlock();

    /**
     * Drops all snapshots and clears the "committed" snapshot.
     */
    void _dropAllSnapshots_inlock();

    /**
     * Bottom half of _scheduleNextLivenessUpdate.
     * Must be called with _mutex held.
     */
    void _scheduleNextLivenessUpdate_inlock();

    /**
     * Callback which marks downed nodes as down, triggers a stepdown if a majority of nodes are no
     * longer visible, and reschedules itself.
     */
    void _handleLivenessTimeout(const executor::TaskExecutor::CallbackArgs& cbData);

    /**
     * If "updatedMemberId" is the current _earliestMemberId, cancels the current
     * _handleLivenessTimeout callback and calls _scheduleNextLivenessUpdate to schedule a new one.
     * Returns immediately otherwise.
     */
    void _cancelAndRescheduleLivenessUpdate_inlock(int updatedMemberId);

    /**
     * Cancels all outstanding _priorityTakeover callbacks.
     */
    void _cancelPriorityTakeover_inlock();

    /**
     * Cancels all outstanding _catchupTakeover callbacks.
     */
    void _cancelCatchupTakeover_inlock();

    /**
     * Cancels the current _handleElectionTimeout callback, if any, and reschedules a new
     * callback.
     */
    void _cancelAndRescheduleElectionTimeout_inlock();

    /**
     * Callback which starts an election if this node is electable and using protocolVersion 1.
     */
    void _startElectSelfIfEligibleV1(TopologyCoordinator::StartElectionReason reason);

    /**
     * Resets the term of last vote to 0 to prevent any node from voting for term 0.
     */
    void _resetElectionInfoOnProtocolVersionUpgrade(OperationContext* opCtx,
                                                    const ReplSetConfig& oldConfig,
                                                    const ReplSetConfig& newConfig);

    /**
     * Schedules work to be run no sooner than 'when' and returns handle to callback.
     * If work cannot be scheduled due to shutdown, returns empty handle.
     * All other non-shutdown scheduling failures will abort the process.
     * Does not run 'work' if callback is canceled.
     */
    CallbackHandle _scheduleWorkAt(Date_t when, const CallbackFn& work);

    /**
     * Creates an event.
     * Returns invalid event handle if the executor is shutting down.
     * Otherwise aborts on non-shutdown error.
     */
    EventHandle _makeEvent();

    /**
     * Wrap a function into executor callback.
     * If the callback is cancelled, the given function won't run.
     */
    executor::TaskExecutor::CallbackFn _wrapAsCallbackFn(const stdx::function<void()>& work);

    /**
     * Finish catch-up mode and start drain mode.
     */
    void _enterDrainMode_inlock();

    /**
     * Waits for the config state to leave kConfigStartingUp, which indicates that start() has
     * finished.
     */
    void _waitForStartUpComplete();

    /**
     * Cancels the running election, if any, and returns an event that will be signaled when the
     * canceled election completes. If there is no running election, returns an invalid event
     * handle.
     */
    executor::TaskExecutor::EventHandle _cancelElectionIfNeeded_inlock();

    /**
     * Waits until the optime of the current node is at least the 'opTime'.
     */
    Status _waitUntilOpTime(OperationContext* opCtx,
                            bool isMajorityReadConcern,
                            OpTime opTime,
                            boost::optional<Date_t> deadline = boost::none);

    /**
     * Waits until the optime of the current node is at least the opTime specified in 'readConcern'.
     * Supports local and majority readConcern.
     */
    // TODO: remove when SERVER-29729 is done
    Status _waitUntilOpTimeForReadDeprecated(OperationContext* opCtx,
                                             const ReadConcernArgs& readConcern);

    /**
     * Waits until the deadline or until the optime of the current node is at least the clusterTime
     * specified in 'readConcern'. Supports local and majority readConcern.
     * If maxTimeMS and deadline are both specified, it waits for min(maxTimeMS, deadline).
     */
    Status _waitUntilClusterTimeForRead(OperationContext* opCtx,
                                        const ReadConcernArgs& readConcern,
                                        boost::optional<Date_t> deadline);

    /**
     * Returns a pseudorandom number no less than 0 and less than limit (which must be positive).
     */
    int64_t _nextRandomInt64_inlock(int64_t limit);

    //
    // All member variables are labeled with one of the following codes indicating the
    // synchronization rules for accessing them.
    //
    // (R)  Read-only in concurrent operation; no synchronization required.
    // (S)  Self-synchronizing; access in any way from any context.
    // (PS) Pointer is read-only in concurrent operation, item pointed to is self-synchronizing;
    //      Access in any context.
    // (M)  Reads and writes guarded by _mutex
    // (GM) Readable under any global intent lock or _mutex.  Must hold both the global lock in
    //      exclusive mode (MODE_X) and hold _mutex to write.
    // (I)  Independently synchronized, see member variable comment.
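    //
    // For example (illustrative, not from the original header): an (M) member such as _rsConfig
    // may only be touched while holding _mutex,
    //
    //     stdx::lock_guard<stdx::mutex> lk(_mutex);
    //     const ReplSetConfig config = _rsConfig;
    //
    // whereas an (S) member such as _termShadow may be read from any context without locking.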

    // Protects member data of this ReplicationCoordinator.
    mutable stdx::mutex _mutex;  // (S)

    // Handles to actively queued heartbeats.
    HeartbeatHandles _heartbeatHandles;  // (M)

    // When this node does not know itself to be a member of a config, it adds
    // every host that sends it a heartbeat request to this set, and also starts
    // sending heartbeat requests to that host.  This set is cleared whenever
    // a node discovers that it is a member of a config.
    unordered_set<HostAndPort> _seedList;  // (M)

    // Back pointer to the ServiceContext that has started the instance.
    ServiceContext* const _service;  // (S)

    // Parsed command line arguments related to replication.
    const ReplSettings _settings;  // (R)

    // Mode of replication specified by _settings.
    const Mode _replMode;  // (R)

    // Pointer to the TopologyCoordinator owned by this ReplicationCoordinator.
    std::unique_ptr<TopologyCoordinator> _topCoord;  // (M)

    // Executor that drives the topology coordinator.
    std::unique_ptr<executor::TaskExecutor> _replExecutor;  // (S)

    // Pointer to the ReplicationCoordinatorExternalState owned by this ReplicationCoordinator.
    std::unique_ptr<ReplicationCoordinatorExternalState> _externalState;  // (PS)

    // Our RID, used to identify us to our sync source when sending replication progress
    // updates upstream.  Set once in startReplication() and then never modified again.
    OID _myRID;  // (M)

    // list of information about clients waiting on replication.  Does *not* own the WaiterInfos.
    WaiterList _replicationWaiterList;  // (M)

    // list of information about clients waiting for a particular opTime.
    // Does *not* own the WaiterInfos.
    WaiterList _opTimeWaiterList;  // (M)

    // Set to true when we are in the process of shutting down replication.
    bool _inShutdown;  // (M)

    // Election ID of the last election that resulted in this node becoming primary.
    OID _electionId;  // (M)

    // Used to signal threads waiting for changes to _memberState.
    stdx::condition_variable _memberStateChange;  // (M)

    // Current ReplicaSet state.
    MemberState _memberState;  // (M)

    // Used to signal threads waiting for changes to _memberState.
    stdx::condition_variable _drainFinishedCond;  // (M)

    ReplicationCoordinator::ApplierState _applierState = ApplierState::Running;  // (M)

    // Used to signal threads waiting for changes to _rsConfigState.
    stdx::condition_variable _rsConfigStateChange;  // (M)

    // Represents the configuration state of the coordinator, which controls how and when
    // _rsConfig may change.  See the state transition diagram in the type definition of
    // ConfigState for details.
    ConfigState _rsConfigState;  // (M)

    // The current ReplicaSet configuration object, including the information about tag groups
    // that is used to satisfy write concern requests with named gle modes.
    ReplSetConfig _rsConfig;  // (M)

    // This member's index position in the current config.
    int _selfIndex;  // (M)

    // Condition to signal when new heartbeat data comes in.
    stdx::condition_variable _stepDownWaiters;  // (M)

    // State for conducting an election of this node.
    // the presence of a non-null _freshnessChecker pointer indicates that an election is
    // currently in progress. When using the V1 protocol, a non-null _voteRequester pointer
    // indicates this instead.
    // Only one election is allowed at a time.
    std::unique_ptr<FreshnessChecker> _freshnessChecker;  // (M)

    std::unique_ptr<ElectCmdRunner> _electCmdRunner;  // (M)

    std::unique_ptr<VoteRequester> _voteRequester;  // (M)

    // Event that the election code will signal when the in-progress election completes.
    // Unspecified value when _freshnessChecker is NULL.
    executor::TaskExecutor::EventHandle _electionFinishedEvent;  // (M)

    // Event that the election code will signal when the in-progress election dry run completes,
    // which includes writing the last vote and scheduling the real election.
    executor::TaskExecutor::EventHandle _electionDryRunFinishedEvent;  // (M)

    // Whether we slept last time we attempted an election but possibly tied with other nodes.
    bool _sleptLastElection;  // (M)

    // Flag that indicates whether writes to databases other than "local" are allowed.  Used to
    // answer canAcceptWritesForDatabase() and canAcceptWritesFor() questions.
    // Always true for standalone nodes and masters in master-slave relationships.
    AtomicWord<bool> _canAcceptNonLocalWrites;  // (GM)

    // Flag that indicates whether reads from databases other than "local" are allowed.  Unlike
    // _canAcceptNonLocalWrites, above, this question is about admission control on secondaries,
    // and we do not require that its observers be strongly synchronized.  Accidentally
    // providing the prior value for a limited period of time is acceptable.  Also unlike
    // _canAcceptNonLocalWrites, its value is only meaningful on replica set secondaries.
    AtomicUInt32 _canServeNonLocalReads;  // (S)

    // ReplicationProcess used to hold information related to the replication and application of
    // operations from the sync source.
    ReplicationProcess* const _replicationProcess;  // (PS)

    // Storage interface used by initial syncer.
    StorageInterface* _storage;  // (PS)
    // InitialSyncer used for initial sync.
    std::shared_ptr<InitialSyncer>
        _initialSyncer;  // (I) pointer set under mutex, copied by callers.

    // Hands out the next snapshot name.
    AtomicUInt64 _snapshotNameGenerator;  // (S)

    // The OpTimes and SnapshotNames for all snapshots newer than the current commit point, kept in
    // sorted order. Any time this is changed, you must also update _uncommittedSnapshotsSize.
    std::deque<OpTime> _uncommittedSnapshots;  // (M)

    // A cache of the size of _uncommittedSnapshots that can be read without any locking.
    // May only be written to while holding _mutex.
    AtomicUInt64 _uncommittedSnapshotsSize;  // (I)

    // The non-null OpTime and SnapshotName of the current snapshot used for committed reads, if
    // there is one.
    // When engaged, this must be <= _lastCommittedOpTime and < _uncommittedSnapshots.front().
    boost::optional<OpTime> _currentCommittedSnapshot;  // (M)

    // A set of optimes that are used for computing the replication system's current 'stable'
    // optime. Every time a node's applied optime is updated, it will be added to this set.
    // Optimes that are older than the current stable optime should get removed from this set.
    // This set should also be cleared if a rollback occurs.
    std::set<OpTime> _stableOpTimeCandidates;  // (M)

    // Used to signal threads that are waiting for new committed snapshots.
    stdx::condition_variable _currentCommittedSnapshotCond;  // (M)

    // Callback Handle used to cancel a scheduled LivenessTimeout callback.
    executor::TaskExecutor::CallbackHandle _handleLivenessTimeoutCbh;  // (M)

    // Callback Handle used to cancel a scheduled ElectionTimeout callback.
    executor::TaskExecutor::CallbackHandle _handleElectionTimeoutCbh;  // (M)

    // Election timeout callback will not run before this time.
    // If this date is Date_t(), the callback is either unscheduled or canceled.
    // Used for testing only.
    Date_t _handleElectionTimeoutWhen;  // (M)

    // Callback Handle used to cancel a scheduled PriorityTakeover callback.
    executor::TaskExecutor::CallbackHandle _priorityTakeoverCbh;  // (M)

    // Priority takeover callback will not run before this time.
    // If this date is Date_t(), the callback is either unscheduled or canceled.
    // Used for testing only.
    Date_t _priorityTakeoverWhen;  // (M)

    // Callback Handle used to cancel a scheduled CatchupTakeover callback.
    executor::TaskExecutor::CallbackHandle _catchupTakeoverCbh;  // (M)

    // Catchup takeover callback will not run before this time.
    // If this date is Date_t(), the callback is either unscheduled or canceled.
    // Used for testing only.
    Date_t _catchupTakeoverWhen;  // (M)

    // Callback handle used by _waitForStartUpComplete() to block until configuration
    // is loaded and external state threads have been started (unless this node is an arbiter).
    CallbackHandle _finishLoadLocalConfigCbh;  // (M)

    // The id of the earliest member, for which the handleLivenessTimeout callback has been
    // scheduled.  We need this so that we don't needlessly cancel and reschedule the callback on
    // every liveness update.
    int _earliestMemberId = -1;  // (M)

    // Cached copy of the current config protocol version.
    AtomicInt64 _protVersion{1};  // (S)

    // Source of random numbers used in setting election timeouts, etc.
    PseudoRandom _random;  // (M)

    // This setting affects the Applier prefetcher behavior.
    mutable stdx::mutex _indexPrefetchMutex;
    ReplSettings::IndexPrefetchConfig _indexPrefetchConfig =
        ReplSettings::IndexPrefetchConfig::PREFETCH_ALL;  // (I)

    // The catchup state including all catchup logic. The presence of a non-null pointer indicates
    // that the node is currently in catchup mode.
    std::unique_ptr<CatchupState> _catchupState;  // (M)

    // Atomic-synchronized copy of Topology Coordinator's _term, for use by the public getTerm()
    // function.
    // This variable must be written immediately after _term, and thus its value can lag.
    // Reading this value does not require the replication coordinator mutex to be locked.
    AtomicInt64 _termShadow;  // (S)

    // When we decide to step down due to hearing about a higher term, we remember the term we heard
    // here so we can update our term to match as part of finishing stepdown.
    boost::optional<long long> _pendingTermUpdateDuringStepDown;  // (M)

    // Whether data replication is active.
    bool _startedSteadyStateReplication = false;  // (M)

    // If we're in terminal shutdown.  If true, we'll refuse to vote in elections.
    bool _inTerminalShutdown = false;  // (M)
};

}  // namespace repl
}  // namespace mongo