/**
 * Copyright (C) 2018-present MongoDB, Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the Server Side Public License, version 1,
 * as published by MongoDB, Inc.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * Server Side Public License for more details.
 *
 * You should have received a copy of the Server Side Public License
 * along with this program. If not, see
 * <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the Server Side Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
*/

#pragma once

#include <boost/optional.hpp>
#include <cstddef>

#include "mongo/base/disallow_copying.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/sync_tail.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/time_support.h"

namespace mongo {

class BSONObj;
class OID;
class OldThreadPool;
class OperationContext;
class ServiceContext;
class Status;
struct HostAndPort;
template <typename T>
class StatusWith;

namespace repl {

class LastVote;
class ReplSettings;
class ReplicationCoordinator;

/**
 * This class represents the interface the ReplicationCoordinator uses to interact with the
 * rest of the system. All functionality of the ReplicationCoordinatorImpl that would introduce
 * dependencies on large sections of the server code and thus break the unit testability of
 * ReplicationCoordinatorImpl should be moved here.
 */
class ReplicationCoordinatorExternalState {
    MONGO_DISALLOW_COPYING(ReplicationCoordinatorExternalState);

public:
    ReplicationCoordinatorExternalState() {}
    virtual ~ReplicationCoordinatorExternalState() {}

    /**
     * Starts the journal listener and snapshot threads.
     *
     * NOTE: Only starts threads if they are not already started.
     */
    virtual void startThreads(const ReplSettings& settings) = 0;

    /**
     * Returns true if an incomplete initial sync is detected.
     */
    virtual bool isInitialSyncFlagSet(OperationContext* opCtx) = 0;

    /**
     * Starts steady state sync for replica set member.
     *
     * NOTE: Use either this or the Master/Slave version, but not both.
     */
    virtual void startSteadyStateReplication(OperationContext* opCtx,
                                             ReplicationCoordinator* replCoord) = 0;

    /**
     * Stops the data replication threads - bgsync, applier, reporter.
     */
    virtual void stopDataReplication(OperationContext* opCtx) = 0;

    /**
     * Starts the Master/Slave threads and sets up logOp.
     */
    virtual void startMasterSlave(OperationContext* opCtx) = 0;

    /**
     * Performs any necessary external state specific shutdown tasks, such as cleaning up
     * the threads it started.
     */
    virtual void shutdown(OperationContext* opCtx) = 0;

    /**
     * Returns task executor for scheduling tasks to be run asynchronously.
     */
    virtual executor::TaskExecutor* getTaskExecutor() const = 0;

    /**
     * Returns shared db worker thread pool for collection cloning.
     */
    virtual OldThreadPool* getDbWorkThreadPool() const = 0;

    /**
     * Runs the repair database command on the "local" db, if the storage engine is MMapV1.
     * Note: Used after initial sync to compact the database files.
     */
    virtual Status runRepairOnLocalDB(OperationContext* opCtx) = 0;

    /**
     * Creates the oplog, writes the first entry and stores the replica set config document.
     */
    virtual Status initializeReplSetStorage(OperationContext* opCtx, const BSONObj& config) = 0;

    /**
     * Waits for all committed writes to be visible in the oplog. Committed writes will be hidden
     * if there are uncommitted writes ahead of them, and some operations require that all committed
     * writes are visible before proceeding.
     */
    virtual void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) = 0;

    /**
     * Called when a node on way to becoming a primary is ready to leave drain mode. It is called
     * outside of the global X lock and the replication coordinator mutex.
     *
     * Throws on errors.
     */
    virtual void onDrainComplete(OperationContext* opCtx) = 0;

    /**
     * Called as part of the process of transitioning to primary and run with the global X lock and
     * the replication coordinator mutex acquired, so no majority writes are allowed while in this
     * state. See the call site in ReplicationCoordinatorImpl for details about when and how it is
     * called.
     *
     * Among other things, this writes a message about our transition to primary to the oplog if
     * isV1, and returns the optime of that message. If !isV1, returns the optime of the last op
     * in the oplog.
     *
     * Throws on errors.
     */
    virtual OpTime onTransitionToPrimary(OperationContext* opCtx, bool isV1ElectionProtocol) = 0;

    /**
     * Simple wrapper around SyncSourceFeedback::forwardSlaveProgress. Signals to the
     * SyncSourceFeedback thread that it needs to wake up and send a replSetUpdatePosition
     * command upstream.
     */
    virtual void forwardSlaveProgress() = 0;

    /**
     * Queries the singleton document in local.me. If it exists and our hostname has not
     * changed since we wrote, returns the RID stored in the object. If the document does not
     * exist or our hostname doesn't match what was recorded in local.me, generates a new OID
     * to use as our RID, stores it in local.me, and returns it.
     */
    virtual OID ensureMe(OperationContext*) = 0;

    /**
     * Returns true if "host" is one of the network identities of this node.
     */
    virtual bool isSelf(const HostAndPort& host, ServiceContext* service) = 0;

    /**
     * Gets the replica set config document from local storage, or returns an error.
     */
    virtual StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* opCtx) = 0;

    /**
     * Stores the replica set config document in local storage, or returns an error.
     */
    virtual Status storeLocalConfigDocument(OperationContext* opCtx, const BSONObj& config) = 0;

    /**
     * Creates the collection for "lastVote" documents and initializes it, or returns an error.
     */
    virtual Status createLocalLastVoteCollection(OperationContext* opCtx) = 0;

    /**
     * Gets the replica set lastVote document from local storage, or returns an error.
     */
    virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* opCtx) = 0;

    /**
     * Stores the replica set lastVote document in local storage, or returns an error.
     */
    virtual Status storeLocalLastVoteDocument(OperationContext* opCtx,
                                              const LastVote& lastVote) = 0;

    /**
     * Sets the global timestamp to be 'newTime'.
     */
    virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime) = 0;

    /**
     * Gets the last optime of an operation performed on this host, from stable
     * storage.
     */
    virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx) = 0;

    /**
     * Returns the HostAndPort of the remote client connected to us that initiated the operation
     * represented by "opCtx".
     */
    virtual HostAndPort getClientHostAndPort(const OperationContext* opCtx) = 0;

    /**
     * Closes all connections (in the node's TransportLayer) except those marked with the
     * keepOpen property, which should just be connections used for heartbeating.
     * This is used during stepdown, and transition out of primary.
     */
    virtual void closeConnections() = 0;

    /**
     * Kills all operations that have a Client that is associated with an incoming user
     * connection. Used during stepdown.
     */
    virtual void killAllUserOperations(OperationContext* opCtx) = 0;

    /**
     * Resets any active sharding metadata on this server and stops any sharding-related threads
     * (such as the balancer). It is called after stepDown to ensure that if the node becomes
     * primary again in the future it will recover its state from a clean slate.
     */
    virtual void shardingOnStepDownHook() = 0;

    /**
     * Notifies the bgsync and syncSourceFeedback threads to choose a new sync source.
     */
    virtual void signalApplierToChooseNewSyncSource() = 0;

    /**
     * Notifies the bgsync to stop fetching data.
     */
    virtual void stopProducer() = 0;

    /**
     * Start bgsync's producer if it's stopped.
     */
    virtual void startProducerIfStopped() = 0;

    /**
     * Drops all snapshots and clears the "committed" snapshot.
     */
    virtual void dropAllSnapshots() = 0;

    /**
     * Updates the committed snapshot to the newCommitPoint, and deletes older snapshots.
     *
     * It is illegal to call with a newCommitPoint that does not name an existing snapshot.
     */
    virtual void updateCommittedSnapshot(const OpTime& newCommitPoint) = 0;

    /**
     * Returns whether or not the SnapshotThread is active.
     */
    virtual bool snapshotsEnabled() const = 0;

    /**
     * Notifies listeners of a change in the commit level.
     */
    virtual void notifyOplogMetadataWaiters(const OpTime& committedOpTime) = 0;

    /**
     * Returns multiplier to apply to election timeout to obtain upper bound
     * on randomized offset.
     */
    virtual double getElectionTimeoutOffsetLimitFraction() const = 0;

    /**
     * Returns true if the current storage engine supports read committed.
     */
    virtual bool isReadCommittedSupportedByStorageEngine(OperationContext* opCtx) const = 0;

    /**
     * Applies the operations described in the oplog entries contained in "ops" using the
     * "applyOperation" function.
     */
    virtual StatusWith<OpTime> multiApply(OperationContext* opCtx,
                                          MultiApplier::Operations ops,
                                          MultiApplier::ApplyOperationFn applyOperation) = 0;

    /**
     * Used by multiApply() to write operations to the database during steady state replication.
     */
    virtual Status multiSyncApply(MultiApplier::OperationPtrs* ops) = 0;

    /**
     * Used by multiApply() to write operations to the database during initial sync. `fetchCount`
     * is a pointer to a counter that is incremented every time we fetch a missing document.
     */
    virtual Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops,
                                         const HostAndPort& source,
                                         AtomicUInt32* fetchCount) = 0;

    /**
     * This function creates an oplog buffer of the type specified at server startup.
     */
    virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(
        OperationContext* opCtx) const = 0;

    /**
     * Creates an oplog buffer suitable for steady state replication.
     */
    virtual std::unique_ptr<OplogBuffer> makeSteadyStateOplogBuffer(
        OperationContext* opCtx) const = 0;

    /**
     * Returns maximum number of times that the oplog fetcher will consecutively restart the oplog
     * tailing query on non-cancellation errors during steady state replication.
     */
    virtual std::size_t getOplogFetcherSteadyStateMaxFetcherRestarts() const = 0;

    /**
     * Returns maximum number of times that the oplog fetcher will consecutively restart the oplog
     * tailing query on non-cancellation errors during initial sync.
     */
    virtual std::size_t getOplogFetcherInitialSyncMaxFetcherRestarts() const = 0;

    /**
     * Returns initial sync oplog application batch limits.
     */
    virtual SyncTail::BatchLimits getInitialSyncBatchLimits() const = 0;

    /*
     * Creates noop writer instance. Setting the _noopWriter member is not protected by a guard,
     * hence it must be called before multi-threaded operations start.
     */
    virtual void setupNoopWriter(Seconds waitTime) = 0;

    /*
     * Starts periodic noop writes to oplog.
     */
    virtual void startNoopWriter(OpTime) = 0;

    /*
     * Stops periodic noop writes to oplog.
     */
    virtual void stopNoopWriter() = 0;
};

}  // namespace repl
}  // namespace mongo