1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #pragma once
32 
33 #include <iosfwd>
34 #include <list>
35 #include <string>
36 #include <vector>
37 
38 #include "mongo/base/disallow_copying.h"
39 #include "mongo/base/status.h"
40 #include "mongo/bson/bsonobj.h"
41 #include "mongo/client/fetcher.h"
42 #include "mongo/db/namespace_string.h"
43 #include "mongo/db/repl/base_cloner.h"
44 #include "mongo/db/repl/collection_cloner.h"
45 #include "mongo/executor/task_executor.h"
46 #include "mongo/stdx/condition_variable.h"
47 #include "mongo/stdx/mutex.h"
48 #include "mongo/util/net/hostandport.h"
49 
50 namespace mongo {
51 
// Forward declaration only: this header refers to OldThreadPool solely through pointers
// (the DatabaseCloner constructor parameter and the _dbWorkThreadPool member), so the full
// definition is not needed here.
class OldThreadPool;
53 
54 namespace repl {
55 namespace {
56 
57 using UniqueLock = stdx::unique_lock<stdx::mutex>;
58 
59 }  // namespace
60 
61 class StorageInterface;
62 
63 class DatabaseCloner : public BaseCloner {
64     MONGO_DISALLOW_COPYING(DatabaseCloner);
65 
66 public:
67     struct Stats {
68         std::string dbname;
69         Date_t start;
70         Date_t end;
71         size_t collections{0};
72         size_t clonedCollections{0};
73         std::vector<CollectionCloner::Stats> collectionStats;
74 
75         std::string toString() const;
76         BSONObj toBSON() const;
77         void append(BSONObjBuilder* builder) const;
78     };
79 
80     /**
81      * Predicate used on the collection info objects returned by listCollections.
82      * Each collection info is represented by a document in the following format:
83      * {
84      *     name: <collection name>,
85      *     options: <collection options>
86      * }
87      *
88      * Returns true if the collection described by the info object should be cloned.
89      * Returns false if the collection should be ignored.
90      */
91     using ListCollectionsPredicateFn = stdx::function<bool(const BSONObj&)>;
92 
93     /**
94      * Callback function to report progress of collection cloning. Arguments are:
95      *     - status from the collection cloner's 'onCompletion' callback.
96      *     - source namespace of the collection cloner that completed (or failed).
97      *
98      * Called exactly once for every collection cloner started by the the database cloner.
99      */
100     using CollectionCallbackFn = stdx::function<void(const Status&, const NamespaceString&)>;
101 
102     /**
103      * Type of function to start a collection cloner.
104      */
105     using StartCollectionClonerFn = stdx::function<Status(CollectionCloner&)>;
106 
107     /**
108      * Creates DatabaseCloner task in inactive state. Use start() to activate cloner.
109      *
110      * The cloner calls 'onCompletion' when the database cloning has completed or failed.
111      *
112      * 'onCompletion' will be called exactly once.
113      *
114      * Takes ownership of the passed StorageInterface object.
115      *
116      * 'listCollectionsFilter' will be extended to include collections only, filtering out views.
117      */
118     DatabaseCloner(executor::TaskExecutor* executor,
119                    OldThreadPool* dbWorkThreadPool,
120                    const HostAndPort& source,
121                    const std::string& dbname,
122                    const BSONObj& listCollectionsFilter,
123                    const ListCollectionsPredicateFn& listCollectionsPredicate,
124                    StorageInterface* storageInterface,
125                    const CollectionCallbackFn& collectionWork,
126                    const CallbackFn& onCompletion);
127 
128     virtual ~DatabaseCloner();
129 
130     /**
131      * Returns collection info objects read from listCollections result and will not include views.
132      */
133     const std::vector<BSONObj>& getCollectionInfos_forTest() const;
134 
135     bool isActive() const override;
136 
137     Status startup() noexcept override;
138 
139     void shutdown() override;
140 
141     void join() override;
142 
143     DatabaseCloner::Stats getStats() const;
144 
145     std::string getDBName() const;
146 
147     //
148     // Testing only functions below.
149     //
150 
151     /**
152      * Overrides how executor schedules database work.
153      *
154      * For testing only.
155      */
156     void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn);
157 
158     /**
159      * Overrides how executor starts a collection cloner.
160      *
161      * For testing only
162      */
163     void setStartCollectionClonerFn(const StartCollectionClonerFn& startCollectionCloner);
164 
165     // State transitions:
166     // PreStart --> Running --> ShuttingDown --> Complete
167     // It is possible to skip intermediate states. For example,
168     // Calling shutdown() when the cloner has not started will transition from PreStart directly
169     // to Complete.
170     // This enum class is made public for testing.
171     enum class State { kPreStart, kRunning, kShuttingDown, kComplete };
172 
173     /**
174      * Returns current database cloner state.
175      * For testing only.
176      */
177     State getState_forTest() const;
178 
179 private:
180     bool _isActive_inlock() const;
181 
182     /**
183      * Returns whether the DatabaseCloner is in shutdown.
184      */
185     bool _isShuttingDown() const;
186 
187     /**
188      * Read collection names and options from listCollections result.
189      */
190     void _listCollectionsCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
191                                   Fetcher::NextAction* nextAction,
192                                   BSONObjBuilder* getMoreBob);
193 
194     /**
195      * Forwards collection cloner result to client.
196      * Starts a new cloner on a different collection.
197      */
198     void _collectionClonerCallback(const Status& status, const NamespaceString& nss);
199 
200     /**
201      * Reports completion status.
202      * Sets cloner to inactive.
203      */
204     void _finishCallback(const Status& status);
205 
206     /**
207      * Calls the above method after unlocking.
208      */
209     void _finishCallback_inlock(UniqueLock& lk, const Status& status);
210 
211     //
212     // All member variables are labeled with one of the following codes indicating the
213     // synchronization rules for accessing them.
214     //
215     // (R)  Read-only in concurrent operation; no synchronization required.
216     // (M)  Reads and writes guarded by _mutex
217     // (S)  Self-synchronizing; access in any way from any context.
218     // (RT)  Read-only in concurrent operation; synchronized externally by tests
219     //
220     mutable stdx::mutex _mutex;
221     mutable stdx::condition_variable _condition;                 // (M)
222     executor::TaskExecutor* _executor;                           // (R)
223     OldThreadPool* _dbWorkThreadPool;                            // (R)
224     const HostAndPort _source;                                   // (R)
225     const std::string _dbname;                                   // (R)
226     const BSONObj _listCollectionsFilter;                        // (R)
227     const ListCollectionsPredicateFn _listCollectionsPredicate;  // (R)
228     StorageInterface* _storageInterface;                         // (R)
229     CollectionCallbackFn
230         _collectionWork;       // (R) Invoked once for every successfully started collection cloner.
231     CallbackFn _onCompletion;  // (R) Invoked once when cloning completes or fails.
232     Fetcher _listCollectionsFetcher;  // (R) Fetcher instance for running listCollections command.
233     // Collection info objects returned from listCollections.
234     // Format of each document:
235     // {
236     //     name: <collection name>,
237     //     options: <collection options>
238     // }
239     // Holds all collection infos from listCollections.
240     std::vector<BSONObj> _collectionInfos;                               // (M)
241     std::vector<NamespaceString> _collectionNamespaces;                  // (M)
242     std::list<CollectionCloner> _collectionCloners;                      // (M)
243     std::list<CollectionCloner>::iterator _currentCollectionClonerIter;  // (M)
244     std::vector<std::pair<Status, NamespaceString>> _failedNamespaces;   // (M)
245     CollectionCloner::ScheduleDbWorkFn
246         _scheduleDbWorkFn;  // (RT) Function for scheduling database work using the executor.
247     StartCollectionClonerFn _startCollectionCloner;  // (RT)
248     Stats _stats;                                    // (M) Stats about what this instance did.
249 
250     // Current database cloner state. See comments for State enum class for details.
251     State _state = State::kPreStart;  // (M)
252 };
253 
254 /**
255  * Insertion operator for DatabaseCloner::State. Formats database cloner state for output stream.
256  * For testing only.
257  */
258 std::ostream& operator<<(std::ostream& os, const DatabaseCloner::State& state);
259 
260 }  // namespace repl
261 }  // namespace mongo
262