1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #pragma once
32 
33 #include <memory>
34 #include <string>
35 #include <vector>
36 
37 #include "mongo/base/disallow_copying.h"
38 #include "mongo/base/status.h"
39 #include "mongo/base/string_data.h"
40 #include "mongo/bson/bsonobj.h"
41 #include "mongo/client/fetcher.h"
42 #include "mongo/client/remote_command_retry_scheduler.h"
43 #include "mongo/db/catalog/collection_options.h"
44 #include "mongo/db/namespace_string.h"
45 #include "mongo/db/operation_context.h"
46 #include "mongo/db/repl/base_cloner.h"
47 #include "mongo/db/repl/callback_completion_guard.h"
48 #include "mongo/db/repl/storage_interface.h"
49 #include "mongo/db/repl/task_runner.h"
50 #include "mongo/executor/task_executor.h"
51 #include "mongo/s/query/async_results_merger.h"
52 #include "mongo/stdx/condition_variable.h"
53 #include "mongo/stdx/functional.h"
54 #include "mongo/stdx/mutex.h"
55 #include "mongo/util/concurrency/old_thread_pool.h"
56 #include "mongo/util/net/hostandport.h"
57 #include "mongo/util/progress_meter.h"
58 
59 namespace mongo {
60 
61 class OldThreadPool;
62 
63 namespace repl {
64 
65 class StorageInterface;
66 
67 class CollectionCloner : public BaseCloner {
68     MONGO_DISALLOW_COPYING(CollectionCloner);
69 
70 public:
71     /**
72      * Callback completion guard for CollectionCloner.
73      */
74     using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
75     using OnCompletionGuard = CallbackCompletionGuard<Status>;
76 
77     struct Stats {
78         static constexpr StringData kDocumentsToCopyFieldName = "documentsToCopy"_sd;
79         static constexpr StringData kDocumentsCopiedFieldName = "documentsCopied"_sd;
80 
81         std::string ns;
82         Date_t start;
83         Date_t end;
84         size_t documentToCopy{0};
85         size_t documentsCopied{0};
86         size_t indexes{0};
87         size_t fetchBatches{0};
88 
89         std::string toString() const;
90         BSONObj toBSON() const;
91         void append(BSONObjBuilder* builder) const;
92     };
93     /**
94      * Type of function to schedule storage interface tasks with the executor.
95      *
96      * Used for testing only.
97      */
98     using ScheduleDbWorkFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>(
99         const executor::TaskExecutor::CallbackFn&)>;
100 
101     /**
102      * Creates CollectionCloner task in inactive state. Use start() to activate cloner.
103      *
104      * The cloner calls 'onCompletion' when the collection cloning has completed or failed.
105      *
106      * 'onCompletion' will be called exactly once.
107      *
108      * Takes ownership of the passed StorageInterface object.
109      */
110     CollectionCloner(executor::TaskExecutor* executor,
111                      OldThreadPool* dbWorkThreadPool,
112                      const HostAndPort& source,
113                      const NamespaceString& sourceNss,
114                      const CollectionOptions& options,
115                      const CallbackFn& onCompletion,
116                      StorageInterface* storageInterface,
117                      const int batchSize,
118                      const int maxNumClonerCursors);
119 
120     virtual ~CollectionCloner();
121 
122     const NamespaceString& getSourceNamespace() const;
123 
124     bool isActive() const override;
125 
126     Status startup() noexcept override;
127 
128     void shutdown() override;
129 
130     void join() override;
131 
132     CollectionCloner::Stats getStats() const;
133 
134     //
135     // Testing only functions below.
136     //
137 
138     /**
139      * Waits for database worker to complete.
140      * Returns immediately if collection cloner is not active.
141      *
142      * For testing only.
143      */
144     void waitForDbWorker();
145 
146     /**
147      * Overrides how executor schedules database work.
148      *
149      * For testing only.
150      */
151     void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn);
152 
153     /**
154      * Returns the documents currently stored in the '_documents' buffer that is intended
155      * to be inserted through the collection loader.
156      *
157      * For testing only.
158      */
159     std::vector<BSONObj> getDocumentsToInsert_forTest();
160 
161 private:
162     bool _isActive_inlock() const;
163 
164     /**
165      * Returns whether the CollectionCloner is in shutdown.
166      */
167     bool _isShuttingDown() const;
168 
169     /**
170      * Cancels all outstanding work.
171      * Used by shutdown() and CompletionGuard::setResultAndCancelRemainingWork().
172      */
173     void _cancelRemainingWork_inlock();
174 
175     /**
176      * Read number of documents in collection from count result.
177      */
178     void _countCallback(const executor::TaskExecutor::RemoteCommandCallbackArgs& args);
179 
180     /**
181      * Read index specs from listIndexes result.
182      */
183     void _listIndexesCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
184                               Fetcher::NextAction* nextAction,
185                               BSONObjBuilder* getMoreBob);
186 
187     /**
188      * Request storage interface to create collection.
189      *
190      * Called multiple times if there are more than one batch of responses from listIndexes
191      * cursor.
192      *
193      * 'nextAction' is an in/out arg indicating the next action planned and to be taken
194      *  by the fetcher.
195      */
196     void _beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& callbackData);
197 
198     /**
199      * The possible command types that can be used to establish the initial cursors on the
200      * remote collection.
201      */
202     enum EstablishCursorsCommand { Find, ParallelCollScan };
203 
204     /**
205      * Parses the cursor responses from the 'find' or 'parallelCollectionScan' command
206      * and passes them into the 'AsyncResultsMerger'.
207      */
208     void _establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd,
209                                              EstablishCursorsCommand cursorCommand);
210 
211     /**
212      * Parses the response from a 'parallelCollectionScan' command into a vector of cursor
213      * elements.
214      */
215     StatusWith<std::vector<BSONElement>> _parseParallelCollectionScanResponse(BSONObj resp);
216 
217     /**
218      * Takes a cursors buffer and parses the 'parallelCollectionScan' response into cursor
219      * responses that are pushed onto the buffer.
220      */
221     Status _parseCursorResponse(BSONObj response,
222                                 std::vector<CursorResponse>* cursors,
223                                 EstablishCursorsCommand cursorCommand);
224 
225     /**
226      * Calls to get the next event from the 'AsyncResultsMerger'. This schedules
227      * '_handleAsyncResultsCallback' to be run when the event is signaled successfully.
228      */
229     Status _scheduleNextARMResultsCallback(std::shared_ptr<OnCompletionGuard> onCompletionGuard);
230 
231     /**
232      * Runs for each time a new batch of documents can be retrieved from the 'AsyncResultsMerger'.
233      * Buffers the documents retrieved for insertion and schedules a '_insertDocumentsCallback'
234      * to insert the contents of the buffer.
235      */
236     void _handleARMResultsCallback(const executor::TaskExecutor::CallbackArgs& cbd,
237                                    std::shared_ptr<OnCompletionGuard> onCompletionGuard);
238 
239     /**
240      * Pull all ready results from the ARM into a buffer to be inserted.
241      */
242     Status _bufferNextBatchFromArm(WithLock lock);
243 
244     /**
245      * Called whenever there is a new batch of documents ready from the 'AsyncResultsMerger'.
246      * On the last batch, 'lastBatch' will be true.
247      *
248      * Each document returned will be inserted via the storage interfaceRequest storage
249      * interface.
250      */
251     void _insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& cbd,
252                                   bool lastBatch,
253                                   std::shared_ptr<OnCompletionGuard> onCompletionGuard);
254 
255     /**
256      * Verifies that an error from the ARM was the result of a collection drop.  If
257      * so, cloning is stopped with no error.  Otherwise it is stopped with the given error.
258      */
259     void _verifyCollectionWasDropped(const stdx::unique_lock<stdx::mutex>& lk,
260                                      Status batchStatus,
261                                      std::shared_ptr<OnCompletionGuard> onCompletionGuard,
262                                      OperationContext* opCtx);
263 
264     /**
265      * Reports completion status.
266      * Commits/aborts collection building.
267      * Sets cloner to inactive.
268      */
269     void _finishCallback(const Status& status);
270 
271     //
272     // All member variables are labeled with one of the following codes indicating the
273     // synchronization rules for accessing them.
274     //
275     // (R)  Read-only in concurrent operation; no synchronization required.
276     // (M)  Reads and writes guarded by _mutex
277     // (S)  Self-synchronizing; access in any way from any context.
278     // (RT)  Read-only in concurrent operation; synchronized externally by tests
279     //
280     mutable stdx::mutex _mutex;
281     mutable stdx::condition_variable _condition;        // (M)
282     executor::TaskExecutor* _executor;                  // (R) Not owned by us.
283     OldThreadPool* _dbWorkThreadPool;                   // (R) Not owned by us.
284     HostAndPort _source;                                // (R)
285     NamespaceString _sourceNss;                         // (R)
286     NamespaceString _destNss;                           // (R)
287     CollectionOptions _options;                         // (R)
288     std::unique_ptr<CollectionBulkLoader> _collLoader;  // (M)
289     CallbackFn _onCompletion;             // (M) Invoked once when cloning completes or fails.
290     StorageInterface* _storageInterface;  // (R) Not owned by us.
291     RemoteCommandRetryScheduler _countScheduler;  // (S)
292     Fetcher _listIndexesFetcher;                  // (S)
293     std::vector<BSONObj> _indexSpecs;             // (M)
294     BSONObj _idIndexSpec;                         // (M)
295     std::vector<BSONObj>
296         _documentsToInsert;        // (M) Documents read from 'AsyncResultsMerger' to insert.
297     TaskRunner _dbWorkTaskRunner;  // (R)
298     ScheduleDbWorkFn
299         _scheduleDbWorkFn;         // (RT) Function for scheduling database work using the executor.
300     Stats _stats;                  // (M) stats for this instance.
301     ProgressMeter _progressMeter;  // (M) progress meter for this instance.
302     const int _collectionCloningBatchSize;  // (R) The size of the batches of documents returned in
303                                             // collection cloning.
304 
305     // (R) The maximum number of cursors to use in the collection cloning process.
306     const int _maxNumClonerCursors;
307     // (M) Component responsible for fetching the documents from the collection cloner cursor(s).
308     std::unique_ptr<AsyncResultsMerger> _arm;
309     // (R) The cursor parameters used by the 'AsyncResultsMerger'.
310     std::unique_ptr<ClusterClientCursorParams> _clusterClientCursorParams;
311 
312     // (M) The event handle for the 'kill' event of the 'AsyncResultsMerger'.
313     executor::TaskExecutor::EventHandle _killArmHandle;
314 
315     // (M) Scheduler used to establish the initial cursor or set of cursors.
316     std::unique_ptr<RemoteCommandRetryScheduler> _establishCollectionCursorsScheduler;
317 
318     // (M) Scheduler used to determine if a cursor was closed because the collection was dropped.
319     std::unique_ptr<RemoteCommandRetryScheduler> _verifyCollectionDroppedScheduler;
320 
321     // State transitions:
322     // PreStart --> Running --> ShuttingDown --> Complete
323     // It is possible to skip intermediate states. For example,
324     // Calling shutdown() when the cloner has not started will transition from PreStart directly
325     // to Complete.
326     enum class State { kPreStart, kRunning, kShuttingDown, kComplete };
327     State _state = State::kPreStart;  // (M)
328 };
329 
330 }  // namespace repl
331 }  // namespace mongo
332