1 2 /** 3 * Copyright (C) 2018-present MongoDB, Inc. 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the Server Side Public License, version 1, 7 * as published by MongoDB, Inc. 8 * 9 * This program is distributed in the hope that it will be useful, 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 * Server Side Public License for more details. 13 * 14 * You should have received a copy of the Server Side Public License 15 * along with this program. If not, see 16 * <http://www.mongodb.com/licensing/server-side-public-license>. 17 * 18 * As a special exception, the copyright holders give permission to link the 19 * code of portions of this program with the OpenSSL library under certain 20 * conditions as described in each individual source file and distribute 21 * linked combinations including the program with the OpenSSL library. You 22 * must comply with the Server Side Public License in all respects for 23 * all of the code used other than as permitted herein. If you modify file(s) 24 * with this exception, you may extend this exception to your version of the 25 * file(s), but you are not obligated to do so. If you do not wish to do so, 26 * delete this exception statement from your version. If you delete this 27 * exception statement from all source files in the program, then also delete 28 * it in the license file. 29 */ 30 31 #pragma once 32 33 #include <iosfwd> 34 #include <list> 35 #include <string> 36 #include <vector> 37 38 #include "mongo/base/disallow_copying.h" 39 #include "mongo/base/status.h" 40 #include "mongo/bson/bsonobj.h" 41 #include "mongo/client/fetcher.h" 42 #include "mongo/db/namespace_string.h" 43 #include "mongo/db/repl/base_cloner.h" 44 #include "mongo/db/repl/collection_cloner.h" 45 #include "mongo/executor/task_executor.h" 46 #include "mongo/stdx/condition_variable.h" 47 #include "mongo/stdx/mutex.h" 48 #include "mongo/util/net/hostandport.h" 49 50 namespace mongo { 51 52 class OldThreadPool; 53 54 namespace repl { 55 namespace { 56 57 using UniqueLock = stdx::unique_lock<stdx::mutex>; 58 59 } // namespace 60 61 class StorageInterface; 62 63 class DatabaseCloner : public BaseCloner { 64 MONGO_DISALLOW_COPYING(DatabaseCloner); 65 66 public: 67 struct Stats { 68 std::string dbname; 69 Date_t start; 70 Date_t end; 71 size_t collections{0}; 72 size_t clonedCollections{0}; 73 std::vector<CollectionCloner::Stats> collectionStats; 74 75 std::string toString() const; 76 BSONObj toBSON() const; 77 void append(BSONObjBuilder* builder) const; 78 }; 79 80 /** 81 * Predicate used on the collection info objects returned by listCollections. 82 * Each collection info is represented by a document in the following format: 83 * { 84 * name: <collection name>, 85 * options: <collection options> 86 * } 87 * 88 * Returns true if the collection described by the info object should be cloned. 89 * Returns false if the collection should be ignored. 90 */ 91 using ListCollectionsPredicateFn = stdx::function<bool(const BSONObj&)>; 92 93 /** 94 * Callback function to report progress of collection cloning. Arguments are: 95 * - status from the collection cloner's 'onCompletion' callback. 96 * - source namespace of the collection cloner that completed (or failed). 97 * 98 * Called exactly once for every collection cloner started by the the database cloner. 99 */ 100 using CollectionCallbackFn = stdx::function<void(const Status&, const NamespaceString&)>; 101 102 /** 103 * Type of function to start a collection cloner. 104 */ 105 using StartCollectionClonerFn = stdx::function<Status(CollectionCloner&)>; 106 107 /** 108 * Creates DatabaseCloner task in inactive state. Use start() to activate cloner. 109 * 110 * The cloner calls 'onCompletion' when the database cloning has completed or failed. 111 * 112 * 'onCompletion' will be called exactly once. 113 * 114 * Takes ownership of the passed StorageInterface object. 115 * 116 * 'listCollectionsFilter' will be extended to include collections only, filtering out views. 117 */ 118 DatabaseCloner(executor::TaskExecutor* executor, 119 OldThreadPool* dbWorkThreadPool, 120 const HostAndPort& source, 121 const std::string& dbname, 122 const BSONObj& listCollectionsFilter, 123 const ListCollectionsPredicateFn& listCollectionsPredicate, 124 StorageInterface* storageInterface, 125 const CollectionCallbackFn& collectionWork, 126 const CallbackFn& onCompletion); 127 128 virtual ~DatabaseCloner(); 129 130 /** 131 * Returns collection info objects read from listCollections result and will not include views. 132 */ 133 const std::vector<BSONObj>& getCollectionInfos_forTest() const; 134 135 bool isActive() const override; 136 137 Status startup() noexcept override; 138 139 void shutdown() override; 140 141 void join() override; 142 143 DatabaseCloner::Stats getStats() const; 144 145 std::string getDBName() const; 146 147 // 148 // Testing only functions below. 149 // 150 151 /** 152 * Overrides how executor schedules database work. 153 * 154 * For testing only. 155 */ 156 void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn); 157 158 /** 159 * Overrides how executor starts a collection cloner. 160 * 161 * For testing only 162 */ 163 void setStartCollectionClonerFn(const StartCollectionClonerFn& startCollectionCloner); 164 165 // State transitions: 166 // PreStart --> Running --> ShuttingDown --> Complete 167 // It is possible to skip intermediate states. For example, 168 // Calling shutdown() when the cloner has not started will transition from PreStart directly 169 // to Complete. 170 // This enum class is made public for testing. 171 enum class State { kPreStart, kRunning, kShuttingDown, kComplete }; 172 173 /** 174 * Returns current database cloner state. 175 * For testing only. 176 */ 177 State getState_forTest() const; 178 179 private: 180 bool _isActive_inlock() const; 181 182 /** 183 * Returns whether the DatabaseCloner is in shutdown. 184 */ 185 bool _isShuttingDown() const; 186 187 /** 188 * Read collection names and options from listCollections result. 189 */ 190 void _listCollectionsCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, 191 Fetcher::NextAction* nextAction, 192 BSONObjBuilder* getMoreBob); 193 194 /** 195 * Forwards collection cloner result to client. 196 * Starts a new cloner on a different collection. 197 */ 198 void _collectionClonerCallback(const Status& status, const NamespaceString& nss); 199 200 /** 201 * Reports completion status. 202 * Sets cloner to inactive. 203 */ 204 void _finishCallback(const Status& status); 205 206 /** 207 * Calls the above method after unlocking. 208 */ 209 void _finishCallback_inlock(UniqueLock& lk, const Status& status); 210 211 // 212 // All member variables are labeled with one of the following codes indicating the 213 // synchronization rules for accessing them. 214 // 215 // (R) Read-only in concurrent operation; no synchronization required. 216 // (M) Reads and writes guarded by _mutex 217 // (S) Self-synchronizing; access in any way from any context. 218 // (RT) Read-only in concurrent operation; synchronized externally by tests 219 // 220 mutable stdx::mutex _mutex; 221 mutable stdx::condition_variable _condition; // (M) 222 executor::TaskExecutor* _executor; // (R) 223 OldThreadPool* _dbWorkThreadPool; // (R) 224 const HostAndPort _source; // (R) 225 const std::string _dbname; // (R) 226 const BSONObj _listCollectionsFilter; // (R) 227 const ListCollectionsPredicateFn _listCollectionsPredicate; // (R) 228 StorageInterface* _storageInterface; // (R) 229 CollectionCallbackFn 230 _collectionWork; // (R) Invoked once for every successfully started collection cloner. 231 CallbackFn _onCompletion; // (R) Invoked once when cloning completes or fails. 232 Fetcher _listCollectionsFetcher; // (R) Fetcher instance for running listCollections command. 233 // Collection info objects returned from listCollections. 234 // Format of each document: 235 // { 236 // name: <collection name>, 237 // options: <collection options> 238 // } 239 // Holds all collection infos from listCollections. 240 std::vector<BSONObj> _collectionInfos; // (M) 241 std::vector<NamespaceString> _collectionNamespaces; // (M) 242 std::list<CollectionCloner> _collectionCloners; // (M) 243 std::list<CollectionCloner>::iterator _currentCollectionClonerIter; // (M) 244 std::vector<std::pair<Status, NamespaceString>> _failedNamespaces; // (M) 245 CollectionCloner::ScheduleDbWorkFn 246 _scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor. 247 StartCollectionClonerFn _startCollectionCloner; // (RT) 248 Stats _stats; // (M) Stats about what this instance did. 249 250 // Current database cloner state. See comments for State enum class for details. 251 State _state = State::kPreStart; // (M) 252 }; 253 254 /** 255 * Insertion operator for DatabaseCloner::State. Formats database cloner state for output stream. 256 * For testing only. 257 */ 258 std::ostream& operator<<(std::ostream& os, const DatabaseCloner::State& state); 259 260 } // namespace repl 261 } // namespace mongo 262