1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #pragma once
32 
33 #include <boost/optional.hpp>
34 #include <cstddef>
35 
36 #include "mongo/base/disallow_copying.h"
37 #include "mongo/bson/timestamp.h"
38 #include "mongo/db/repl/member_state.h"
39 #include "mongo/db/repl/multiapplier.h"
40 #include "mongo/db/repl/oplog_buffer.h"
41 #include "mongo/db/repl/optime.h"
42 #include "mongo/db/repl/sync_tail.h"
43 #include "mongo/stdx/functional.h"
44 #include "mongo/util/time_support.h"
45 
46 namespace mongo {
47 
48 class BSONObj;
49 class OID;
50 class OldThreadPool;
51 class OperationContext;
52 class ServiceContext;
53 class Status;
54 struct HostAndPort;
55 template <typename T>
56 class StatusWith;
57 
58 namespace repl {
59 
60 class LastVote;
61 class ReplSettings;
62 class ReplicationCoordinator;
63 
64 /**
65  * This class represents the interface the ReplicationCoordinator uses to interact with the
66  * rest of the system.  All functionality of the ReplicationCoordinatorImpl that would introduce
67  * dependencies on large sections of the server code and thus break the unit testability of
68  * ReplicationCoordinatorImpl should be moved here.
69  */
70 class ReplicationCoordinatorExternalState {
71     MONGO_DISALLOW_COPYING(ReplicationCoordinatorExternalState);
72 
73 public:
ReplicationCoordinatorExternalState()74     ReplicationCoordinatorExternalState() {}
~ReplicationCoordinatorExternalState()75     virtual ~ReplicationCoordinatorExternalState() {}
76 
77     /**
78      * Starts the journal listener, and snapshot threads
79      *
80      * NOTE: Only starts threads if they are not already started,
81      */
82     virtual void startThreads(const ReplSettings& settings) = 0;
83 
84     /**
85      * Returns true if an incomplete initial sync is detected.
86      */
87     virtual bool isInitialSyncFlagSet(OperationContext* opCtx) = 0;
88 
89     /**
90      * Starts steady state sync for replica set member.
91      *
92      * NOTE: Use either this or the Master/Slave version, but not both.
93      */
94     virtual void startSteadyStateReplication(OperationContext* opCtx,
95                                              ReplicationCoordinator* replCoord) = 0;
96 
97     /**
98      * Stops the data replication threads = bgsync, applier, reporter.
99      */
100     virtual void stopDataReplication(OperationContext* opCtx) = 0;
101 
102     /**
103      * Starts the Master/Slave threads and sets up logOp
104      */
105     virtual void startMasterSlave(OperationContext* opCtx) = 0;
106 
107     /**
108      * Performs any necessary external state specific shutdown tasks, such as cleaning up
109      * the threads it started.
110      */
111     virtual void shutdown(OperationContext* opCtx) = 0;
112 
113     /**
114      * Returns task executor for scheduling tasks to be run asynchronously.
115      */
116     virtual executor::TaskExecutor* getTaskExecutor() const = 0;
117 
118     /**
119      * Returns shared db worker thread pool for collection cloning.
120      */
121     virtual OldThreadPool* getDbWorkThreadPool() const = 0;
122 
123     /**
124      * Runs the repair database command on the "local" db, if the storage engine is MMapV1.
125      * Note: Used after initial sync to compact the database files.
126      */
127     virtual Status runRepairOnLocalDB(OperationContext* opCtx) = 0;
128 
129     /**
130      * Creates the oplog, writes the first entry and stores the replica set config document.
131      */
132     virtual Status initializeReplSetStorage(OperationContext* opCtx, const BSONObj& config) = 0;
133 
134     /**
135      * Waits for all committed writes to be visible in the oplog.  Committed writes will be hidden
136      * if there are uncommitted writes ahead of them, and some operations require that all committed
137      * writes are visible before proceeding.
138      */
139     virtual void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) = 0;
140 
141     /**
142      * Called when a node on way to becoming a primary is ready to leave drain mode. It is called
143      * outside of the global X lock and the replication coordinator mutex.
144      *
145      * Throws on errors.
146      */
147     virtual void onDrainComplete(OperationContext* opCtx) = 0;
148 
149     /**
150      * Called as part of the process of transitioning to primary and run with the global X lock and
151      * the replication coordinator mutex acquired, so no majoirty writes are allowed while in this
152      * state. See the call site in ReplicationCoordinatorImpl for details about when and how it is
153      * called.
154      *
155      * Among other things, this writes a message about our transition to primary to the oplog if
156      * isV1 and and returns the optime of that message. If !isV1, returns the optime of the last op
157      * in the oplog.
158      *
159      * Throws on errors.
160      */
161     virtual OpTime onTransitionToPrimary(OperationContext* opCtx, bool isV1ElectionProtocol) = 0;
162 
163     /**
164      * Simple wrapper around SyncSourceFeedback::forwardSlaveProgress.  Signals to the
165      * SyncSourceFeedback thread that it needs to wake up and send a replSetUpdatePosition
166      * command upstream.
167      */
168     virtual void forwardSlaveProgress() = 0;
169 
170     /**
171      * Queries the singleton document in local.me.  If it exists and our hostname has not
172      * changed since we wrote, returns the RID stored in the object.  If the document does not
173      * exist or our hostname doesn't match what was recorded in local.me, generates a new OID
174      * to use as our RID, stores it in local.me, and returns it.
175      */
176     virtual OID ensureMe(OperationContext*) = 0;
177 
178     /**
179      * Returns true if "host" is one of the network identities of this node.
180      */
181     virtual bool isSelf(const HostAndPort& host, ServiceContext* service) = 0;
182 
183     /**
184      * Gets the replica set config document from local storage, or returns an error.
185      */
186     virtual StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* opCtx) = 0;
187 
188     /**
189      * Stores the replica set config document in local storage, or returns an error.
190      */
191     virtual Status storeLocalConfigDocument(OperationContext* opCtx, const BSONObj& config) = 0;
192 
193 
194     /**
195      * Creates the collection for "lastVote" documents and initializes it, or returns an error.
196      */
197     virtual Status createLocalLastVoteCollection(OperationContext* opCtx) = 0;
198 
199     /**
200      * Gets the replica set lastVote document from local storage, or returns an error.
201      */
202     virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* opCtx) = 0;
203 
204     /**
205      * Stores the replica set lastVote document in local storage, or returns an error.
206      */
207     virtual Status storeLocalLastVoteDocument(OperationContext* opCtx,
208                                               const LastVote& lastVote) = 0;
209 
210     /**
211      * Sets the global opTime to be 'newTime'.
212      */
213     virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime) = 0;
214 
215     /**
216      * Gets the last optime of an operation performed on this host, from stable
217      * storage.
218      */
219     virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx) = 0;
220 
221     /**
222      * Returns the HostAndPort of the remote client connected to us that initiated the operation
223      * represented by "opCtx".
224      */
225     virtual HostAndPort getClientHostAndPort(const OperationContext* opCtx) = 0;
226 
227     /**
228      * Closes all connections in the given TransportLayer except those marked with the
229      * keepOpen property, which should just be connections used for heartbeating.
230      * This is used during stepdown, and transition out of primary.
231      */
232     virtual void closeConnections() = 0;
233 
234     /**
235      * Kills all operations that have a Client that is associated with an incoming user
236      * connection.  Used during stepdown.
237      */
238     virtual void killAllUserOperations(OperationContext* opCtx) = 0;
239 
240     /**
241      * Resets any active sharding metadata on this server and stops any sharding-related threads
242      * (such as the balancer). It is called after stepDown to ensure that if the node becomes
243      * primary again in the future it will recover its state from a clean slate.
244      */
245     virtual void shardingOnStepDownHook() = 0;
246 
247     /**
248      * Notifies the bgsync and syncSourceFeedback threads to choose a new sync source.
249      */
250     virtual void signalApplierToChooseNewSyncSource() = 0;
251 
252     /**
253      * Notifies the bgsync to stop fetching data.
254      */
255     virtual void stopProducer() = 0;
256 
257     /**
258      * Start bgsync's producer if it's stopped.
259      */
260     virtual void startProducerIfStopped() = 0;
261 
262     /**
263      * Drops all snapshots and clears the "committed" snapshot.
264      */
265     virtual void dropAllSnapshots() = 0;
266 
267     /**
268      * Updates the committed snapshot to the newCommitPoint, and deletes older snapshots.
269      *
270      * It is illegal to call with a newCommitPoint that does not name an existing snapshot.
271      */
272     virtual void updateCommittedSnapshot(const OpTime& newCommitPoint) = 0;
273 
274     /**
275      * Returns whether or not the SnapshotThread is active.
276      */
277     virtual bool snapshotsEnabled() const = 0;
278 
279     /**
280      * Notifies listeners of a change in the commit level.
281      */
282     virtual void notifyOplogMetadataWaiters(const OpTime& committedOpTime) = 0;
283 
284     /**
285      * Returns multiplier to apply to election timeout to obtain upper bound
286      * on randomized offset.
287      */
288     virtual double getElectionTimeoutOffsetLimitFraction() const = 0;
289 
290     /**
291      * Returns true if the current storage engine supports read committed.
292      */
293     virtual bool isReadCommittedSupportedByStorageEngine(OperationContext* opCtx) const = 0;
294 
295     /**
296      * Applies the operations described in the oplog entries contained in "ops" using the
297      * "applyOperation" function.
298      */
299     virtual StatusWith<OpTime> multiApply(OperationContext* opCtx,
300                                           MultiApplier::Operations ops,
301                                           MultiApplier::ApplyOperationFn applyOperation) = 0;
302 
303     /**
304      * Used by multiApply() to writes operations to database during steady state replication.
305      */
306     virtual Status multiSyncApply(MultiApplier::OperationPtrs* ops) = 0;
307 
308     /**
309      * Used by multiApply() to writes operations to database during initial sync. `fetchCount` is a
310      * pointer to a counter that is incremented every time we fetch a missing document.
311      *
312      */
313     virtual Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops,
314                                          const HostAndPort& source,
315                                          AtomicUInt32* fetchCount) = 0;
316 
317     /**
318      * This function creates an oplog buffer of the type specified at server startup.
319      */
320     virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(
321         OperationContext* opCtx) const = 0;
322 
323     /**
324      * Creates an oplog buffer suitable for steady state replication.
325      */
326     virtual std::unique_ptr<OplogBuffer> makeSteadyStateOplogBuffer(
327         OperationContext* opCtx) const = 0;
328 
329     /**
330      * Returns maximum number of times that the oplog fetcher will consecutively restart the oplog
331      * tailing query on non-cancellation errors during steady state replication.
332      */
333     virtual std::size_t getOplogFetcherSteadyStateMaxFetcherRestarts() const = 0;
334 
335     /**
336      * Returns maximum number of times that the oplog fetcher will consecutively restart the oplog
337      * tailing query on non-cancellation errors during initial sync.
338      */
339     virtual std::size_t getOplogFetcherInitialSyncMaxFetcherRestarts() const = 0;
340 
341     /**
342      * Returns initial sync oplog application batch limits.
343      */
344     virtual SyncTail::BatchLimits getInitialSyncBatchLimits() const = 0;
345 
346     /*
347      * Creates noop writer instance. Setting the _noopWriter member is not protected by a guard,
348      * hence it must be called before multi-threaded operations start.
349      */
350     virtual void setupNoopWriter(Seconds waitTime) = 0;
351 
352     /*
353      * Starts periodic noop writes to oplog.
354      */
355     virtual void startNoopWriter(OpTime) = 0;
356 
357     /*
358      * Stops periodic noop writes to oplog.
359      */
360     virtual void stopNoopWriter() = 0;
361 };
362 
363 }  // namespace repl
364 }  // namespace mongo
365