/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#pragma once

#include <memory>
#include <string>
#include <vector>

#include "mongo/base/disallow_copying.h"
#include "mongo/base/status.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/client/fetcher.h"
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/base_cloner.h"
#include "mongo/db/repl/callback_completion_guard.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/task_runner.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/query/async_results_merger.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/old_thread_pool.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/progress_meter.h"

namespace mongo {

class OldThreadPool;

namespace repl {

class StorageInterface;

/**
 * Cloner task for a single collection.
 *
 * Going by the private callbacks declared below, the cloner reads the document count and the
 * index specs from the source, asks the storage interface to create the collection, and then
 * pulls documents through an 'AsyncResultsMerger' and inserts them in batches.
 */
class CollectionCloner : public BaseCloner {
    MONGO_DISALLOW_COPYING(CollectionCloner);

public:
    // Shorthand for the executor's remote command callback argument type.
    using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;

    /**
     * Callback completion guard for CollectionCloner.
     */
    using OnCompletionGuard = CallbackCompletionGuard<Status>;

    /**
     * Statistics for a cloner instance, as returned by getStats().
     */
    struct Stats {
        // Field names for the two document counters below (presumably used by
        // toBSON()/append() -- confirm in the implementation).
        static constexpr StringData kDocumentsToCopyFieldName = "documentsToCopy"_sd;
        static constexpr StringData kDocumentsCopiedFieldName = "documentsCopied"_sd;

        std::string ns;
        Date_t start;
        Date_t end;
        // NOTE(review): spelled 'documentToCopy' (singular) although the matching field-name
        // constant above is 'documentsToCopy'. Left as-is because renaming the member would
        // change this public struct.
        size_t documentToCopy{0};
        size_t documentsCopied{0};
        size_t indexes{0};
        size_t fetchBatches{0};

        std::string toString() const;
        BSONObj toBSON() const;
        void append(BSONObjBuilder* builder) const;
    };

    /**
     * Type of function to schedule storage interface tasks with the executor.
     *
     * Used for testing only.
     */
    using ScheduleDbWorkFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>(
        const executor::TaskExecutor::CallbackFn&)>;

    /**
     * Creates CollectionCloner task in inactive state. Use startup() to activate cloner.
     *
     * The cloner calls 'onCompletion' when the collection cloning has completed or failed.
     *
     * 'onCompletion' will be called exactly once.
     *
     * NOTE(review): this comment used to claim that the cloner takes ownership of the passed
     * StorageInterface object, but the '_storageInterface' member (like '_executor' and
     * '_dbWorkThreadPool') is annotated below as "Not owned by us". Presumably the caller
     * keeps ownership and must keep these objects alive while the cloner exists -- confirm
     * against the implementation.
     */
    CollectionCloner(executor::TaskExecutor* executor,
                     OldThreadPool* dbWorkThreadPool,
                     const HostAndPort& source,
                     const NamespaceString& sourceNss,
                     const CollectionOptions& options,
                     const CallbackFn& onCompletion,
                     StorageInterface* storageInterface,
                     const int batchSize,
                     const int maxNumClonerCursors);

    virtual ~CollectionCloner();

    const NamespaceString& getSourceNamespace() const;

    bool isActive() const override;

    Status startup() noexcept override;

    void shutdown() override;

    void join() override;

    CollectionCloner::Stats getStats() const;

    //
    // Testing only functions below.
    //

    /**
     * Waits for database worker to complete.
     * Returns immediately if collection cloner is not active.
     *
     * For testing only.
     */
    void waitForDbWorker();

    /**
     * Overrides how executor schedules database work.
     *
     * For testing only.
     */
    void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn);

    /**
     * Returns the documents currently stored in the '_documents' buffer that is intended
     * to be inserted through the collection loader.
     *
     * NOTE(review): no member named '_documents' is declared in this class; this presumably
     * refers to '_documentsToInsert' -- confirm against the implementation.
     *
     * For testing only.
     */
    std::vector<BSONObj> getDocumentsToInsert_forTest();

private:
    // NOTE(review): the '_inlock' suffix presumably means the caller must already hold
    // '_mutex' -- confirm against the implementation.
    bool _isActive_inlock() const;

    /**
     * Returns whether the CollectionCloner is in shutdown.
     */
    bool _isShuttingDown() const;

    /**
     * Cancels all outstanding work.
     * Used by shutdown() and CompletionGuard::setResultAndCancelRemainingWork().
     */
    void _cancelRemainingWork_inlock();

    /**
     * Read number of documents in collection from count result.
     */
    void _countCallback(const executor::TaskExecutor::RemoteCommandCallbackArgs& args);

    /**
     * Read index specs from listIndexes result.
     *
     * 'nextAction' is an in/out arg indicating the next action planned and to be taken
     * by the fetcher.
     *
     * NOTE(review): presumably called multiple times if there is more than one batch of
     * responses from the listIndexes cursor (this remark used to sit on
     * _beginCollectionCallback) -- confirm against the Fetcher contract.
     */
    void _listIndexesCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
                              Fetcher::NextAction* nextAction,
                              BSONObjBuilder* getMoreBob);

    /**
     * Request storage interface to create collection.
     */
    void _beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& callbackData);

    /**
     * The possible command types that can be used to establish the initial cursors on the
     * remote collection.
     */
    enum EstablishCursorsCommand { Find, ParallelCollScan };

    /**
     * Parses the cursor responses from the 'find' or 'parallelCollectionScan' command
     * and passes them into the 'AsyncResultsMerger'.
     */
    void _establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd,
                                             EstablishCursorsCommand cursorCommand);

    /**
     * Parses the response from a 'parallelCollectionScan' command into a vector of cursor
     * elements.
     */
    StatusWith<std::vector<BSONElement>> _parseParallelCollectionScanResponse(BSONObj resp);

    /**
     * Takes a cursors buffer and parses the response of the command identified by
     * 'cursorCommand' into cursor responses that are pushed onto the buffer.
     *
     * NOTE(review): the previous wording mentioned only 'parallelCollectionScan'; since the
     * command type is a parameter, 'find' responses are presumably handled here as well --
     * confirm against the implementation.
     */
    Status _parseCursorResponse(BSONObj response,
                                std::vector<CursorResponse>* cursors,
                                EstablishCursorsCommand cursorCommand);

    /**
     * Calls to get the next event from the 'AsyncResultsMerger'. This schedules
     * '_handleARMResultsCallback' to be run when the event is signaled successfully.
     */
    Status _scheduleNextARMResultsCallback(std::shared_ptr<OnCompletionGuard> onCompletionGuard);

    /**
     * Runs each time a new batch of documents can be retrieved from the 'AsyncResultsMerger'.
     * Buffers the documents retrieved for insertion and schedules a '_insertDocumentsCallback'
     * to insert the contents of the buffer.
     */
    void _handleARMResultsCallback(const executor::TaskExecutor::CallbackArgs& cbd,
                                   std::shared_ptr<OnCompletionGuard> onCompletionGuard);

    /**
     * Pull all ready results from the ARM into a buffer to be inserted.
     */
    Status _bufferNextBatchFromArm(WithLock lock);

    /**
     * Called whenever there is a new batch of documents ready from the 'AsyncResultsMerger'.
     * On the last batch, 'lastBatch' will be true.
     *
     * Each document returned will be inserted via the storage interface.
     */
    void _insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& cbd,
                                  bool lastBatch,
                                  std::shared_ptr<OnCompletionGuard> onCompletionGuard);

    /**
     * Verifies that an error from the ARM was the result of a collection drop. If
     * so, cloning is stopped with no error. Otherwise it is stopped with the given error.
     */
    void _verifyCollectionWasDropped(const stdx::unique_lock<stdx::mutex>& lk,
                                     Status batchStatus,
                                     std::shared_ptr<OnCompletionGuard> onCompletionGuard,
                                     OperationContext* opCtx);

    /**
     * Reports completion status.
     * Commits/aborts collection building.
     * Sets cloner to inactive.
     */
    void _finishCallback(const Status& status);

    //
    // All member variables are labeled with one of the following codes indicating the
    // synchronization rules for accessing them.
    //
    // (R)  Read-only in concurrent operation; no synchronization required.
    // (M)  Reads and writes guarded by _mutex
    // (S)  Self-synchronizing; access in any way from any context.
    // (RT)  Read-only in concurrent operation; synchronized externally by tests
    //
    // NOTE: the declaration order of the members below also fixes their initialization
    // order, so do not reorder them without checking the constructor.
    //
    mutable stdx::mutex _mutex;                          // Guards the members labeled (M).
    mutable stdx::condition_variable _condition;         // (M)
    executor::TaskExecutor* _executor;                   // (R) Not owned by us.
    OldThreadPool* _dbWorkThreadPool;                    // (R) Not owned by us.
    HostAndPort _source;                                 // (R)
    NamespaceString _sourceNss;                          // (R)
    NamespaceString _destNss;                            // (R)
    CollectionOptions _options;                          // (R)
    std::unique_ptr<CollectionBulkLoader> _collLoader;   // (M)
    CallbackFn _onCompletion;             // (M) Invoked once when cloning completes or fails.
    StorageInterface* _storageInterface;  // (R) Not owned by us.
    RemoteCommandRetryScheduler _countScheduler;  // (S)
    Fetcher _listIndexesFetcher;                  // (S)
    std::vector<BSONObj> _indexSpecs;             // (M)
    BSONObj _idIndexSpec;                         // (M)
    std::vector<BSONObj>
        _documentsToInsert;        // (M) Documents read from 'AsyncResultsMerger' to insert.
    TaskRunner _dbWorkTaskRunner;  // (R)
    ScheduleDbWorkFn
        _scheduleDbWorkFn;         // (RT) Function for scheduling database work using the executor.
    Stats _stats;                  // (M) stats for this instance.
    ProgressMeter _progressMeter;  // (M) progress meter for this instance.
    const int _collectionCloningBatchSize;  // (R) The size of the batches of documents returned in
                                            // collection cloning.

    // (R) The maximum number of cursors to use in the collection cloning process.
    const int _maxNumClonerCursors;
    // (M) Component responsible for fetching the documents from the collection cloner cursor(s).
    std::unique_ptr<AsyncResultsMerger> _arm;
    // (R) The cursor parameters used by the 'AsyncResultsMerger'.
    std::unique_ptr<ClusterClientCursorParams> _clusterClientCursorParams;

    // (M) The event handle for the 'kill' event of the 'AsyncResultsMerger'.
    executor::TaskExecutor::EventHandle _killArmHandle;

    // (M) Scheduler used to establish the initial cursor or set of cursors.
    std::unique_ptr<RemoteCommandRetryScheduler> _establishCollectionCursorsScheduler;

    // (M) Scheduler used to determine if a cursor was closed because the collection was dropped.
    std::unique_ptr<RemoteCommandRetryScheduler> _verifyCollectionDroppedScheduler;

    // State transitions:
    // PreStart --> Running --> ShuttingDown --> Complete
    // It is possible to skip intermediate states. For example,
    // Calling shutdown() when the cloner has not started will transition from PreStart directly
    // to Complete.
    enum class State { kPreStart, kRunning, kShuttingDown, kComplete };
    State _state = State::kPreStart;  // (M)
};

}  // namespace repl
}  // namespace mongo