1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/db/repl/collection_cloner.h"
36 
37 #include <utility>
38 
39 #include "mongo/base/string_data.h"
40 #include "mongo/bson/util/bson_extract.h"
41 #include "mongo/client/remote_command_retry_scheduler.h"
42 #include "mongo/db/catalog/collection_options.h"
43 #include "mongo/db/namespace_string.h"
44 #include "mongo/db/repl/storage_interface.h"
45 #include "mongo/db/repl/storage_interface_mock.h"
46 #include "mongo/db/server_parameters.h"
47 #include "mongo/rpc/get_status_from_command_result.h"
48 #include "mongo/s/query/cluster_client_cursor_params.h"
49 #include "mongo/util/assert_util.h"
50 #include "mongo/util/destructor_guard.h"
51 #include "mongo/util/fail_point_service.h"
52 #include "mongo/util/log.h"
53 #include "mongo/util/mongoutils/str.h"
54 
55 namespace mongo {
56 namespace repl {
57 namespace {
58 
59 using LockGuard = stdx::lock_guard<stdx::mutex>;
60 using UniqueLock = stdx::unique_lock<stdx::mutex>;
61 
62 constexpr auto kCountResponseDocumentCountFieldName = "n"_sd;
63 
64 const int kProgressMeterSecondsBetween = 60;
65 const int kProgressMeterCheckInterval = 128;
66 
67 // The number of attempts for the count command, which gets the document count.
68 MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncCollectionCountAttempts, int, 3);
69 // The number of attempts for the listIndexes commands.
70 MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncListIndexesAttempts, int, 3);
71 // The number of attempts for the find command, which gets the data.
72 MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncCollectionFindAttempts, int, 3);
73 }  // namespace
74 
75 // Failpoint which causes initial sync to hang before establishing its cursor to clone the
76 // 'namespace' collection.
77 MONGO_FP_DECLARE(initialSyncHangBeforeCollectionClone);
78 
79 // Failpoint which causes initial sync to hang when it has cloned 'numDocsToClone' documents to
80 // collection 'namespace'.
81 MONGO_FP_DECLARE(initialSyncHangDuringCollectionClone);
82 
83 // Failpoint which causes initial sync to hang after handling the next batch of results from the
84 // 'AsyncResultsMerger', optionally limited to a specific collection.
85 MONGO_FP_DECLARE(initialSyncHangCollectionClonerAfterHandlingBatchResponse);
86 
87 // Failpoint which causes initial sync to hang before establishing the cursors (but after
88 // listIndexes), optionally limited to a specific collection.
89 MONGO_FP_DECLARE(initialSyncHangCollectionClonerBeforeEstablishingCursor);
90 
makeCommandWithUUIDorCollectionName(StringData command,OptionalCollectionUUID uuid,const NamespaceString & nss)91 BSONObj makeCommandWithUUIDorCollectionName(StringData command,
92                                             OptionalCollectionUUID uuid,
93                                             const NamespaceString& nss) {
94     BSONObjBuilder builder;
95     if (uuid)
96         uuid->appendToBuilder(&builder, command);
97     else
98         builder.append(command, nss.coll());
99     return builder.obj();
100 }
101 
/**
 * Constructs a cloner for a single collection on 'source'.
 *
 * Remote commands (count, listIndexes, the cursor-establishing command) are driven
 * by 'executor'; local storage writes run on 'dbWorkThreadPool' via a TaskRunner.
 * 'onCompletion' is invoked exactly once when cloning finishes or fails.
 *
 * Throws (via uassert/uassertStatusOK) on an invalid namespace, invalid collection
 * options, or null 'onCompletion'/'storageInterface'.
 */
CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
                                   OldThreadPool* dbWorkThreadPool,
                                   const HostAndPort& source,
                                   const NamespaceString& sourceNss,
                                   const CollectionOptions& options,
                                   const CallbackFn& onCompletion,
                                   StorageInterface* storageInterface,
                                   const int batchSize,
                                   const int maxNumClonerCursors)
    : _executor(executor),
      _dbWorkThreadPool(dbWorkThreadPool),
      _source(source),
      _sourceNss(sourceNss),
      _destNss(_sourceNss),
      _options(options),
      _onCompletion(onCompletion),
      _storageInterface(storageInterface),
      // Retrying scheduler for the 'count' command; the result only sizes the
      // progress meter (see _countCallback).
      _countScheduler(_executor,
                      RemoteCommandRequest(
                          _source,
                          _sourceNss.db().toString(),
                          makeCommandWithUUIDorCollectionName("count", _options.uuid, sourceNss),
                          ReadPreferenceSetting::secondaryPreferredMetadata(),
                          nullptr,
                          RemoteCommandRequest::kNoTimeout),
                      stdx::bind(&CollectionCloner::_countCallback, this, stdx::placeholders::_1),
                      RemoteCommandRetryScheduler::makeRetryPolicy(
                          numInitialSyncCollectionCountAttempts.load(),
                          executor::RemoteCommandRequest::kNoTimeout,
                          RemoteCommandRetryScheduler::kAllRetriableErrors)),
      // Fetcher for 'listIndexes'; may deliver specs over multiple batches
      // (see _listIndexesCallback).
      _listIndexesFetcher(
          _executor,
          _source,
          _sourceNss.db().toString(),
          makeCommandWithUUIDorCollectionName("listIndexes", _options.uuid, sourceNss),
          stdx::bind(&CollectionCloner::_listIndexesCallback,
                     this,
                     stdx::placeholders::_1,
                     stdx::placeholders::_2,
                     stdx::placeholders::_3),
          ReadPreferenceSetting::secondaryPreferredMetadata(),
          RemoteCommandRequest::kNoTimeout /* find network timeout */,
          RemoteCommandRequest::kNoTimeout /* getMore network timeout */,
          RemoteCommandRetryScheduler::makeRetryPolicy(
              numInitialSyncListIndexesAttempts.load(),
              executor::RemoteCommandRequest::kNoTimeout,
              RemoteCommandRetryScheduler::kAllRetriableErrors)),
      _indexSpecs(),
      _documentsToInsert(),
      _dbWorkTaskRunner(_dbWorkThreadPool),
      // Default database-work scheduler: wraps the executor-style callback in a
      // TaskRunner task; any exception escaping 'work' terminates the clone via
      // _finishCallback. Tests can replace this via setScheduleDbWorkFn_forTest.
      _scheduleDbWorkFn([this](const executor::TaskExecutor::CallbackFn& work) {
          auto task = [ this, work ](OperationContext * opCtx,
                                     const Status& status) noexcept->TaskRunner::NextAction {
              try {
                  work(executor::TaskExecutor::CallbackArgs(nullptr, {}, status, opCtx));
              } catch (...) {
                  _finishCallback(exceptionToStatus());
              }
              return TaskRunner::NextAction::kDisposeOperationContext;
          };
          _dbWorkTaskRunner.schedule(task);
          // The TaskRunner does not hand back a handle; callers get a default
          // (invalid) CallbackHandle.
          return executor::TaskExecutor::CallbackHandle();
      }),
      _progressMeter(1U,  // total will be replaced with count command result.
                     kProgressMeterSecondsBetween,
                     kProgressMeterCheckInterval,
                     "documents copied",
                     str::stream() << _sourceNss.toString() << " collection clone progress"),
      _collectionCloningBatchSize(batchSize),
      _maxNumClonerCursors(maxNumClonerCursors) {
    // Fetcher throws an exception on null executor.
    invariant(executor);
    uassert(ErrorCodes::BadValue,
            "invalid collection namespace: " + sourceNss.ns(),
            sourceNss.isValid());
    uassertStatusOK(options.validateForStorage());
    uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
    uassert(ErrorCodes::BadValue, "storage interface cannot be null", storageInterface);
    _stats.ns = _sourceNss.ns();
}
182 
// Best-effort teardown: cancel outstanding work and wait for it to drain.
// DESTRUCTOR_GUARD ensures no exception escapes the destructor.
CollectionCloner::~CollectionCloner() {
    DESTRUCTOR_GUARD(shutdown(); join(););
}
186 
// Returns the namespace of the collection being cloned from the sync source.
const NamespaceString& CollectionCloner::getSourceNamespace() const {
    return _sourceNss;
}
190 
isActive() const191 bool CollectionCloner::isActive() const {
192     LockGuard lk(_mutex);
193     return _isActive_inlock();
194 }
195 
_isActive_inlock() const196 bool CollectionCloner::_isActive_inlock() const {
197     return State::kRunning == _state || State::kShuttingDown == _state;
198 }
199 
_isShuttingDown() const200 bool CollectionCloner::_isShuttingDown() const {
201     stdx::lock_guard<stdx::mutex> lock(_mutex);
202     return State::kShuttingDown == _state;
203 }
204 
// Starts the clone by scheduling the initial 'count' command.
// Only legal from the PreStart state; any other state yields an error status.
// A failure to schedule the count moves the cloner directly to Complete.
Status CollectionCloner::startup() noexcept {
    LockGuard lk(_mutex);
    LOG(0) << "CollectionCloner::start called, on ns:" << _destNss;

    switch (_state) {
        case State::kPreStart:
            _state = State::kRunning;
            break;
        case State::kRunning:
            return Status(ErrorCodes::InternalError, "collection cloner already started");
        case State::kShuttingDown:
            return Status(ErrorCodes::ShutdownInProgress, "collection cloner shutting down");
        case State::kComplete:
            return Status(ErrorCodes::ShutdownInProgress, "collection cloner completed");
    }

    _stats.start = _executor->now();
    Status scheduleResult = _countScheduler.startup();
    if (!scheduleResult.isOK()) {
        // Could not even start the first step; mark terminal so join() returns.
        _state = State::kComplete;
        return scheduleResult;
    }

    return Status::OK();
}
230 
// Requests cancellation of the clone. Idempotent; safe to call from any state.
// Actual completion is observed via join().
void CollectionCloner::shutdown() {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    switch (_state) {
        case State::kPreStart:
            // Transition directly from PreStart to Complete if not started yet.
            _state = State::kComplete;
            return;
        case State::kRunning:
            _state = State::kShuttingDown;
            break;
        case State::kShuttingDown:
        case State::kComplete:
            // Nothing to do if we are already in ShuttingDown or Complete state.
            return;
    }
    // Only reached on the kRunning -> kShuttingDown transition.
    _cancelRemainingWork_inlock();
}
248 
// Cancels every in-flight component: the ARM (if created), both remote command
// schedulers, the listIndexes fetcher, the optional drop-verification scheduler,
// and the db-work task runner. Caller must hold _mutex.
void CollectionCloner::_cancelRemainingWork_inlock() {
    if (_arm) {
        // This method can be called from a callback from either a TaskExecutor or a TaskRunner. The
        // TaskExecutor should never have an OperationContext attached to the Client, and the
        // TaskRunner should always have an OperationContext attached. Unfortunately, we don't know
        // which situation we're in, so have to handle both.
        auto& client = cc();
        if (auto opCtx = client.getOperationContext()) {
            _killArmHandle = _arm->kill(opCtx);
        } else {
            auto newOpCtx = client.makeOperationContext();
            _killArmHandle = _arm->kill(newOpCtx.get());
        }
        // _killArmHandle is waited on by join() to ensure the kill completes.
    }
    _countScheduler.shutdown();
    _listIndexesFetcher.shutdown();
    if (_establishCollectionCursorsScheduler) {
        _establishCollectionCursorsScheduler->shutdown();
    }
    if (_verifyCollectionDroppedScheduler) {
        _verifyCollectionDroppedScheduler->shutdown();
    }
    _dbWorkTaskRunner.cancel();
}
273 
getStats() const274 CollectionCloner::Stats CollectionCloner::getStats() const {
275     stdx::unique_lock<stdx::mutex> lk(_mutex);
276     return _stats;
277 }
278 
// Blocks until the cloner reaches a terminal (inactive) state. If the ARM was
// killed during shutdown, first waits for that kill event to be signaled so the
// ARM teardown has completed before we wait on the state condition.
void CollectionCloner::join() {
    stdx::unique_lock<stdx::mutex> lk(_mutex);
    if (_killArmHandle) {
        _executor->waitForEvent(_killArmHandle);
    }
    _condition.wait(lk, [this]() { return !_isActive_inlock(); });
}
286 
waitForDbWorker()287 void CollectionCloner::waitForDbWorker() {
288     if (!isActive()) {
289         return;
290     }
291     _dbWorkTaskRunner.join();
292 }
293 
setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn & scheduleDbWorkFn)294 void CollectionCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn) {
295     LockGuard lk(_mutex);
296     _scheduleDbWorkFn = scheduleDbWorkFn;
297 }
298 
getDocumentsToInsert_forTest()299 std::vector<BSONObj> CollectionCloner::getDocumentsToInsert_forTest() {
300     LockGuard lk(_mutex);
301     return _documentsToInsert;
302 }
303 
// Handles the response to the 'count' command. On success, records the document
// count (used solely for progress reporting) and schedules the listIndexes
// fetcher; on failure, finishes the clone with a contextualized error status.
void CollectionCloner::_countCallback(
    const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {

    // No need to reword status reason in the case of cancellation.
    if (ErrorCodes::CallbackCanceled == args.response.status) {
        _finishCallback(args.response.status);
        return;
    }

    if (!args.response.status.isOK()) {
        // Transport-level failure: wrap the reason with the collection and source.
        _finishCallback({args.response.status.code(),
                         str::stream() << "During count call on collection '" << _sourceNss.ns()
                                       << "' from "
                                       << _source.toString()
                                       << ", there was an error '"
                                       << args.response.status.reason()
                                       << "'"});
        return;
    }

    long long count = 0;
    Status commandStatus = getStatusFromCommandResult(args.response.data);
    if (commandStatus == ErrorCodes::NamespaceNotFound && _options.uuid) {
        // Querying by a non-existing collection by UUID returns an error. Treat same as
        // behavior of find by namespace and use count == 0.
    } else if (!commandStatus.isOK()) {
        // The command itself reported an error in its response document.
        _finishCallback({commandStatus.code(),
                         str::stream() << "During count call on collection '" << _sourceNss.ns()
                                       << "' from "
                                       << _source.toString()
                                       << ", there was a command error '"
                                       << commandStatus.reason()
                                       << "'"});
        return;
    } else {
        // Pull the document count out of the 'n' field of the response.
        auto countStatus = bsonExtractIntegerField(
            args.response.data, kCountResponseDocumentCountFieldName, &count);
        if (!countStatus.isOK()) {
            _finishCallback({countStatus.code(),
                             str::stream()
                                 << "There was an error parsing document count from count "
                                    "command result on collection "
                                 << _sourceNss.ns()
                                 << " from "
                                 << _source.toString()
                                 << ": "
                                 << countStatus.reason()});
            return;
        }
    }

    // The count command may return a negative value after an unclean shutdown,
    // so we set it to zero here to avoid aborting the collection clone.
    // Note that this count value is only used for reporting purposes.
    if (count < 0) {
        warning() << "Count command returned negative value. Updating to 0 to allow progress "
                     "meter to function properly. ";
        count = 0;
    }

    {
        LockGuard lk(_mutex);
        _stats.documentToCopy = count;
        _progressMeter.setTotalWhileRunning(static_cast<unsigned long long>(count));
    }

    // Next step: fetch the index specifications.
    auto scheduleStatus = _listIndexesFetcher.schedule();
    if (!scheduleStatus.isOK()) {
        _finishCallback(scheduleStatus);
        return;
    }
}
376 
// Handles each batch of the 'listIndexes' response.
//
// If the collection does not exist on the sync source, schedules creation of an
// empty local collection and finishes. Otherwise accumulates index specs
// (rewriting each spec's 'ns' field to this cloner's source namespace), requests
// further batches via 'getMoreBob' when available, and finally schedules
// _beginCollectionCallback to start cloning the data.
void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus& fetchResult,
                                            Fetcher::NextAction* nextAction,
                                            BSONObjBuilder* getMoreBob) {
    const bool collectionIsEmpty = fetchResult == ErrorCodes::NamespaceNotFound;
    if (collectionIsEmpty) {
        // Schedule collection creation and finish callback.
        auto&& scheduleResult =
            _scheduleDbWorkFn([this](const executor::TaskExecutor::CallbackArgs& cbd) {
                if (!cbd.status.isOK()) {
                    _finishCallback(cbd.status);
                    return;
                }
                auto opCtx = cbd.opCtx;
                // The local create must not generate oplog entries.
                UnreplicatedWritesBlock uwb(opCtx);
                auto&& createStatus =
                    _storageInterface->createCollection(opCtx, _destNss, _options);
                _finishCallback(createStatus);
            });
        if (!scheduleResult.isOK()) {
            _finishCallback(scheduleResult.getStatus());
        }
        return;
    };
    if (!fetchResult.isOK()) {
        // Wrap the fetch error with the collection name for context.
        Status newStatus{fetchResult.getStatus().code(),
                         str::stream() << "During listIndexes call on collection '"
                                       << _sourceNss.ns()
                                       << "' there was an error '"
                                       << fetchResult.getStatus().reason()
                                       << "'"};

        _finishCallback(newStatus);
        return;
    }

    auto batchData(fetchResult.getValue());
    auto&& documents = batchData.documents;

    if (documents.empty()) {
        warning() << "No indexes found for collection " << _sourceNss.ns() << " while cloning from "
                  << _source;
    }

    UniqueLock lk(_mutex);
    // When listing indexes by UUID, the sync source may use a different name for the collection
    // as result of renaming or two-phase drop. As the index spec also includes a 'ns' field, this
    // must be rewritten.
    BSONObjBuilder nsFieldReplacementBuilder;
    nsFieldReplacementBuilder.append("ns", _sourceNss.ns());
    BSONElement nsFieldReplacementElem = nsFieldReplacementBuilder.done().firstElement();

    // We may be called with multiple batches leading to a need to grow _indexSpecs.
    _indexSpecs.reserve(_indexSpecs.size() + documents.size());
    for (auto&& doc : documents) {
        // The addField replaces the 'ns' field with the correct name, see above.
        if (StringData("_id_") == doc["name"].str()) {
            // The _id index is tracked separately from the secondary indexes.
            _idIndexSpec = doc.addField(nsFieldReplacementElem);
            continue;
        }
        _indexSpecs.push_back(doc.addField(nsFieldReplacementElem));
    }
    lk.unlock();

    // The fetcher will continue to call with kGetMore until an error or the last batch.
    if (*nextAction == Fetcher::NextAction::kGetMore) {
        invariant(getMoreBob);
        getMoreBob->append("getMore", batchData.cursorId);
        getMoreBob->append("collection", batchData.nss.coll());
        return;
    }

    // We have all of the indexes now, so we can start cloning the collection data.
    auto&& scheduleResult = _scheduleDbWorkFn(
        stdx::bind(&CollectionCloner::_beginCollectionCallback, this, stdx::placeholders::_1));
    if (!scheduleResult.isOK()) {
        _finishCallback(scheduleResult.getStatus());
        return;
    }
}
456 
// Runs on the db-work thread after all index specs are collected.
//
// Creates the destination collection via a bulk loader (with the gathered index
// specs), builds the cursor-establishing command ('find' for a single cursor,
// 'parallelCollectionScan' for multiple), and starts the retry scheduler that
// issues it against the sync source. Two failpoints allow tests to pause here.
void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& cbd) {
    if (!cbd.status.isOK()) {
        _finishCallback(cbd.status);
        return;
    }
    MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerBeforeEstablishingCursor, nssData) {
        const BSONObj& data = nssData.getData();
        auto nss = data["nss"].str();
        // Only hang when cloning the specified collection, or if no collection was specified.
        if (nss.empty() || _destNss.toString() == nss) {
            while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerBeforeEstablishingCursor) &&
                   !_isShuttingDown()) {
                log() << "initialSyncHangCollectionClonerBeforeEstablishingCursor fail point "
                         "enabled for "
                      << _destNss.toString() << ". Blocking until fail point is disabled.";
                mongo::sleepsecs(1);
            }
        }
    }
    if (!_idIndexSpec.isEmpty() && _options.autoIndexId == CollectionOptions::NO) {
        // Contradictory metadata from the sync source; proceed but warn.
        warning()
            << "Found the _id_ index spec but the collection specified autoIndexId of false on ns:"
            << this->_sourceNss;
    }

    auto collectionBulkLoader = _storageInterface->createCollectionForBulkLoading(
        _destNss, _options, _idIndexSpec, _indexSpecs);

    if (!collectionBulkLoader.isOK()) {
        _finishCallback(collectionBulkLoader.getStatus());
        return;
    }

    // Record the total index count (secondary indexes plus the _id index if present).
    _stats.indexes = _indexSpecs.size();
    if (!_idIndexSpec.isEmpty()) {
        ++_stats.indexes;
    }

    _collLoader = std::move(collectionBulkLoader.getValue());

    BSONObjBuilder cmdObj;
    EstablishCursorsCommand cursorCommand;
    // The 'find' command is used when the number of cloning cursors is 1 to ensure
    // the correctness of the collection cloning process until 'parallelCollectionScan'
    // can be tested more extensively in context of initial sync.
    if (_maxNumClonerCursors == 1) {
        cmdObj.appendElements(
            makeCommandWithUUIDorCollectionName("find", _options.uuid, _sourceNss));
        cmdObj.append("noCursorTimeout", true);
        // Set batchSize to be 0 to establish the cursor without fetching any documents,
        // similar to the response format of 'parallelCollectionScan'.
        cmdObj.append("batchSize", 0);
        cursorCommand = Find;
    } else {
        cmdObj.appendElements(makeCommandWithUUIDorCollectionName(
            "parallelCollectionScan", _options.uuid, _sourceNss));
        cmdObj.append("numCursors", _maxNumClonerCursors);
        cursorCommand = ParallelCollScan;
    }

    // Ensure this thread has a Client so an OperationContext can be attached to
    // the outgoing remote command request.
    Client::initThreadIfNotAlready();
    auto opCtx = cc().getOperationContext();

    MONGO_FAIL_POINT_BLOCK(initialSyncHangBeforeCollectionClone, options) {
        const BSONObj& data = options.getData();
        if (data["namespace"].String() == _destNss.ns()) {
            log() << "initial sync - initialSyncHangBeforeCollectionClone fail point "
                     "enabled. Blocking until fail point is disabled.";
            while (MONGO_FAIL_POINT(initialSyncHangBeforeCollectionClone) && !_isShuttingDown()) {
                mongo::sleepsecs(1);
            }
        }
    }

    _establishCollectionCursorsScheduler = stdx::make_unique<RemoteCommandRetryScheduler>(
        _executor,
        RemoteCommandRequest(_source,
                             _sourceNss.db().toString(),
                             cmdObj.obj(),
                             ReadPreferenceSetting::secondaryPreferredMetadata(),
                             opCtx,
                             RemoteCommandRequest::kNoTimeout),
        stdx::bind(&CollectionCloner::_establishCollectionCursorsCallback,
                   this,
                   stdx::placeholders::_1,
                   cursorCommand),
        RemoteCommandRetryScheduler::makeRetryPolicy(
            numInitialSyncCollectionFindAttempts.load(),
            executor::RemoteCommandRequest::kNoTimeout,
            RemoteCommandRetryScheduler::kAllRetriableErrors));
    auto scheduleStatus = _establishCollectionCursorsScheduler->startup();
    LOG(1) << "Attempting to establish cursors with maxNumClonerCursors: " << _maxNumClonerCursors;

    if (!scheduleStatus.isOK()) {
        _establishCollectionCursorsScheduler.reset();
        _finishCallback(scheduleStatus);
        return;
    }
}
556 
_parseCursorResponse(BSONObj response,std::vector<CursorResponse> * cursors,EstablishCursorsCommand cursorCommand)557 Status CollectionCloner::_parseCursorResponse(BSONObj response,
558                                               std::vector<CursorResponse>* cursors,
559                                               EstablishCursorsCommand cursorCommand) {
560     switch (cursorCommand) {
561         case Find: {
562             StatusWith<CursorResponse> findResponse = CursorResponse::parseFromBSON(response);
563             if (!findResponse.isOK()) {
564                 Status errorStatus{findResponse.getStatus().code(),
565                                    str::stream()
566                                        << "While parsing the 'find' query against collection '"
567                                        << _sourceNss.ns()
568                                        << "' there was an error '"
569                                        << findResponse.getStatus().reason()
570                                        << "'"};
571                 return errorStatus;
572             }
573             cursors->push_back(std::move(findResponse.getValue()));
574             break;
575         }
576         case ParallelCollScan: {
577             auto cursorElements = _parseParallelCollectionScanResponse(response);
578             if (!cursorElements.isOK()) {
579                 return cursorElements.getStatus();
580             }
581             std::vector<BSONElement> cursorsArray;
582             cursorsArray = cursorElements.getValue();
583             // Parse each BSONElement into a 'CursorResponse' object.
584             for (BSONElement cursor : cursorsArray) {
585                 if (!cursor.isABSONObj()) {
586                     Status errorStatus(
587                         ErrorCodes::FailedToParse,
588                         "The 'cursor' field in the list of cursor responses is not a "
589                         "valid BSON Object");
590                     return errorStatus;
591                 }
592                 const BSONObj cursorObj = cursor.Obj().getOwned();
593                 StatusWith<CursorResponse> parallelCollScanResponse =
594                     CursorResponse::parseFromBSON(cursorObj);
595                 if (!parallelCollScanResponse.isOK()) {
596                     return parallelCollScanResponse.getStatus();
597                 }
598                 cursors->push_back(std::move(parallelCollScanResponse.getValue()));
599             }
600             break;
601         }
602         default: {
603             Status errorStatus(
604                 ErrorCodes::FailedToParse,
605                 "The command used to establish the collection cloner cursors is not valid.");
606             return errorStatus;
607         }
608     }
609     return Status::OK();
610 }
611 
_establishCollectionCursorsCallback(const RemoteCommandCallbackArgs & rcbd,EstablishCursorsCommand cursorCommand)612 void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd,
613                                                            EstablishCursorsCommand cursorCommand) {
614     if (_state == State::kShuttingDown) {
615         Status shuttingDownStatus{ErrorCodes::CallbackCanceled, "Cloner shutting down."};
616         _finishCallback(shuttingDownStatus);
617         return;
618     }
619     auto response = rcbd.response;
620     if (!response.isOK()) {
621         _finishCallback(response.status);
622         return;
623     }
624     Status commandStatus = getStatusFromCommandResult(response.data);
625     if (commandStatus == ErrorCodes::NamespaceNotFound) {
626         _finishCallback(Status::OK());
627         return;
628     }
629     if (!commandStatus.isOK()) {
630         Status newStatus{commandStatus.code(),
631                          str::stream() << "While querying collection '" << _sourceNss.ns()
632                                        << "' there was an error '"
633                                        << commandStatus.reason()
634                                        << "'"};
635         _finishCallback(commandStatus);
636         return;
637     }
638 
639     std::vector<CursorResponse> cursorResponses;
640     Status parseResponseStatus =
641         _parseCursorResponse(response.data, &cursorResponses, cursorCommand);
642     if (!parseResponseStatus.isOK()) {
643         _finishCallback(parseResponseStatus);
644         return;
645     }
646     LOG(1) << "Collection cloner running with " << cursorResponses.size()
647            << " cursors established.";
648 
649     // Initialize the 'AsyncResultsMerger'(ARM).
650     std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
651     for (auto&& cursorResponse : cursorResponses) {
652         // A placeholder 'ShardId' is used until the ARM is made less sharding specific.
653         remoteCursors.emplace_back(
654             ShardId("CollectionClonerSyncSource"), _source, std::move(cursorResponse));
655     }
656 
657     // An empty list of authenticated users is passed into the cluster parameters
658     // as user information is not used in the ARM in context of collection cloning.
659     _clusterClientCursorParams =
660         stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator());
661     _clusterClientCursorParams->remotes = std::move(remoteCursors);
662     if (_collectionCloningBatchSize > 0)
663         _clusterClientCursorParams->batchSize = _collectionCloningBatchSize;
664     // Client::initThreadIfNotAlready();
665     auto opCtx = cc().makeOperationContext();
666     _arm = stdx::make_unique<AsyncResultsMerger>(
667         opCtx.get(), _executor, _clusterClientCursorParams.get());
668     _arm->detachFromOperationContext();
669     opCtx.reset();
670 
671     // This completion guard invokes _finishCallback on destruction.
672     auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); };
673     auto finishCallbackFn = [this](const Status& status) { _finishCallback(status); };
674     auto onCompletionGuard =
675         std::make_shared<OnCompletionGuard>(cancelRemainingWorkInLock, finishCallbackFn);
676 
677     // Lock guard must be declared after completion guard. If there is an error in this function
678     // that will cause the destructor of the completion guard to run, the destructor must be run
679     // outside the mutex. This is a necessary condition to invoke _finishCallback.
680     stdx::lock_guard<stdx::mutex> lock(_mutex);
681     Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
682     if (!scheduleStatus.isOK()) {
683         onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus);
684         return;
685     }
686 }
687 
_parseParallelCollectionScanResponse(BSONObj resp)688 StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionScanResponse(
689     BSONObj resp) {
690     if (!resp.hasField("cursors")) {
691         return Status(ErrorCodes::CursorNotFound,
692                       "The 'parallelCollectionScan' response does not contain a 'cursors' field.");
693     }
694     BSONElement response = resp["cursors"];
695     if (response.type() == BSONType::Array) {
696         return response.Array();
697     } else {
698         return Status(
699             ErrorCodes::FailedToParse,
700             "The 'parallelCollectionScan' response is unable to be transformed into an array.");
701     }
702 }
703 
_bufferNextBatchFromArm(WithLock lock)704 Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) {
705     // We expect this callback to execute in a thread from a TaskExecutor which will not have an
706     // OperationContext populated. We must make one ourselves.
707     auto opCtx = cc().makeOperationContext();
708     _arm->reattachToOperationContext(opCtx.get());
709     while (_arm->ready()) {
710         auto armResultStatus = _arm->nextReady();
711         if (!armResultStatus.getStatus().isOK()) {
712             return armResultStatus.getStatus();
713         }
714         if (armResultStatus.getValue().isEOF()) {
715             // We have reached the end of the batch.
716             break;
717         } else {
718             auto queryResult = armResultStatus.getValue().getResult();
719             _documentsToInsert.push_back(std::move(*queryResult));
720         }
721     }
722     _arm->detachFromOperationContext();
723 
724     return Status::OK();
725 }
726 
_scheduleNextARMResultsCallback(std::shared_ptr<OnCompletionGuard> onCompletionGuard)727 Status CollectionCloner::_scheduleNextARMResultsCallback(
728     std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
729     // We expect this callback to execute in a thread from a TaskExecutor which will not have an
730     // OperationContext populated. We must make one ourselves.
731     auto opCtx = cc().makeOperationContext();
732     _arm->reattachToOperationContext(opCtx.get());
733     auto nextEvent = _arm->nextEvent();
734     _arm->detachFromOperationContext();
735     if (!nextEvent.isOK()) {
736         return nextEvent.getStatus();
737     }
738     auto event = nextEvent.getValue();
739     auto handleARMResultsOnNextEvent =
740         _executor->onEvent(event,
741                            stdx::bind(&CollectionCloner::_handleARMResultsCallback,
742                                       this,
743                                       stdx::placeholders::_1,
744                                       onCompletionGuard));
745     return handleARMResultsOnNextEvent.getStatus();
746 }
747 
_handleARMResultsCallback(const executor::TaskExecutor::CallbackArgs & cbd,std::shared_ptr<OnCompletionGuard> onCompletionGuard)748 void CollectionCloner::_handleARMResultsCallback(
749     const executor::TaskExecutor::CallbackArgs& cbd,
750     std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
751     auto setResultAndCancelRemainingWork = [this](std::shared_ptr<OnCompletionGuard> guard,
752                                                   Status status) {
753         stdx::lock_guard<stdx::mutex> lock(_mutex);
754         guard->setResultAndCancelRemainingWork_inlock(lock, status);
755         return;
756     };
757 
758     if (!cbd.status.isOK()) {
759         // Wait for active inserts to complete.
760         waitForDbWorker();
761         Status newStatus{cbd.status.code(),
762                          str::stream() << "While querying collection '" << _sourceNss.ns()
763                                        << "' there was an error '"
764                                        << cbd.status.reason()
765                                        << "'"};
766         setResultAndCancelRemainingWork(onCompletionGuard, cbd.status);
767         return;
768     }
769 
770     // Pull the documents from the ARM into a buffer until the entire batch has been processed.
771     bool lastBatch;
772     {
773         UniqueLock lk(_mutex);
774         auto nextBatchStatus = _bufferNextBatchFromArm(lk);
775         if (!nextBatchStatus.isOK()) {
776             if (_options.uuid && (nextBatchStatus.code() == ErrorCodes::OperationFailed ||
777                                   nextBatchStatus.code() == ErrorCodes::CursorNotFound ||
778                                   nextBatchStatus.code() == ErrorCodes::QueryPlanKilled)) {
779                 // With these errors, it's possible the collection was dropped while we were
780                 // cloning.  If so, we'll execute the drop during oplog application, so it's OK to
781                 // just stop cloning.  This is only safe if cloning by UUID; if we are cloning by
782                 // name, we have no way to detect if the collection was dropped and another
783                 // collection with the same name created in the interim.
784                 _verifyCollectionWasDropped(lk, nextBatchStatus, onCompletionGuard, cbd.opCtx);
785             } else {
786                 onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, nextBatchStatus);
787             }
788             return;
789         }
790 
791         // Check if this is the last batch of documents to clone.
792         lastBatch = _arm->remotesExhausted();
793     }
794 
795     // Schedule the next document batch insertion.
796     auto&& scheduleResult =
797         _scheduleDbWorkFn(stdx::bind(&CollectionCloner::_insertDocumentsCallback,
798                                      this,
799                                      stdx::placeholders::_1,
800                                      lastBatch,
801                                      onCompletionGuard));
802     if (!scheduleResult.isOK()) {
803         Status newStatus{scheduleResult.getStatus().code(),
804                          str::stream() << "While cloning collection '" << _sourceNss.ns()
805                                        << "' there was an error '"
806                                        << scheduleResult.getStatus().reason()
807                                        << "'"};
808         setResultAndCancelRemainingWork(onCompletionGuard, scheduleResult.getStatus());
809         return;
810     }
811 
812     MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterHandlingBatchResponse, nssData) {
813         const BSONObj& data = nssData.getData();
814         auto nss = data["nss"].str();
815         // Only hang when cloning the specified collection, or if no collection was specified.
816         if (nss.empty() || _destNss.toString() == nss) {
817             while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerAfterHandlingBatchResponse) &&
818                    !_isShuttingDown()) {
819                 log() << "initialSyncHangCollectionClonerAfterHandlingBatchResponse fail point "
820                          "enabled for "
821                       << _destNss.toString() << ". Blocking until fail point is disabled.";
822                 mongo::sleepsecs(1);
823             }
824         }
825     }
826 
827     // If the remote cursors are not exhausted, schedule this callback again to handle
828     // the impending cursor response.
829     if (!lastBatch) {
830         Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
831         if (!scheduleStatus.isOK()) {
832             setResultAndCancelRemainingWork(onCompletionGuard, scheduleStatus);
833             return;
834         }
835     }
836 }
837 
// Determines whether a batch error during cloning was caused by the source collection being
// dropped. Issues a 'find' by UUID against the sync source; if the namespace is gone (or is a
// drop-pending namespace), cloning finishes with Status::OK() since the drop will be replayed
// during oplog application. Otherwise the original 'batchStatus' is reported. Must be called
// with '_mutex' held via 'lk'.
void CollectionCloner::_verifyCollectionWasDropped(
    const stdx::unique_lock<stdx::mutex>& lk,
    Status batchStatus,
    std::shared_ptr<OnCompletionGuard> onCompletionGuard,
    OperationContext* opCtx) {
    // If we already have a _verifyCollectionDroppedScheduler, just return; the existing
    // scheduler will take care of cleaning up.
    if (_verifyCollectionDroppedScheduler) {
        return;
    }
    // Build {find: <uuid>, batchSize: 0} — we only care whether the namespace resolves,
    // not about any documents. Callers guarantee _options.uuid is set on this path.
    BSONObjBuilder cmdObj;
    _options.uuid->appendToBuilder(&cmdObj, "find");
    cmdObj.append("batchSize", 0);
    _verifyCollectionDroppedScheduler = stdx::make_unique<RemoteCommandRetryScheduler>(
        _executor,
        RemoteCommandRequest(_source,
                             _sourceNss.db().toString(),
                             cmdObj.obj(),
                             ReadPreferenceSetting::secondaryPreferredMetadata(),
                             opCtx,
                             RemoteCommandRequest::kNoTimeout),
        // NOTE(review): 'opCtx' is captured by raw pointer but not used inside the lambda
        // body; if it is ever dereferenced here, confirm it outlives this async callback.
        [this, opCtx, batchStatus, onCompletionGuard](const RemoteCommandCallbackArgs& args) {
            // If the attempt to determine if the collection was dropped fails for any reason other
            // than NamespaceNotFound, return the original error code.
            //
            // Otherwise, if the collection was dropped, either the error will be NamespaceNotFound,
            // or it will be a drop-pending collection and the find will succeed and give us a
            // collection with a drop-pending name.
            UniqueLock lk(_mutex);
            Status finalStatus(batchStatus);
            if (args.response.isOK()) {
                auto response = CursorResponse::parseFromBSON(args.response.data);
                if (response.getStatus().code() == ErrorCodes::NamespaceNotFound ||
                    (response.isOK() && response.getValue().getNSS().isDropPendingNamespace())) {
                    log() << "CollectionCloner ns: '" << _sourceNss.ns() << "' uuid: UUID(\""
                          << *_options.uuid << "\") stopped because collection was dropped.";
                    finalStatus = Status::OK();
                } else if (!response.isOK()) {
                    log() << "CollectionCloner received an unexpected error when verifying drop of "
                             "ns: '"
                          << _sourceNss.ns() << "' uuid: UUID(\"" << *_options.uuid
                          << "\"), status " << response.getStatus();
                }
            } else {
                log() << "CollectionCloner is unable to verify drop of ns: '" << _sourceNss.ns()
                      << "' uuid: UUID(\"" << *_options.uuid << "\"), status "
                      << args.response.status;
            }
            onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, finalStatus);
        },
        RemoteCommandRetryScheduler::makeNoRetryPolicy());

    auto status = _verifyCollectionDroppedScheduler->startup();
    if (!status.isOK()) {
        log() << "CollectionCloner is unable to start verification of ns: '" << _sourceNss.ns()
              << "' uuid: UUID(\"" << *_options.uuid << "\"), status " << status;
        // If we can't run the command, assume this wasn't a drop and just use the original error.
        onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, batchStatus);
    }
}
898 
_insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs & cbd,bool lastBatch,std::shared_ptr<OnCompletionGuard> onCompletionGuard)899 void CollectionCloner::_insertDocumentsCallback(
900     const executor::TaskExecutor::CallbackArgs& cbd,
901     bool lastBatch,
902     std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
903     if (!cbd.status.isOK()) {
904         stdx::lock_guard<stdx::mutex> lock(_mutex);
905         onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, cbd.status);
906         return;
907     }
908 
909     UniqueLock lk(_mutex);
910     std::vector<BSONObj> docs;
911     if (_documentsToInsert.size() == 0) {
912         warning() << "_insertDocumentsCallback, but no documents to insert for ns:" << _destNss;
913         if (lastBatch) {
914             onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK());
915         }
916         return;
917     }
918     _documentsToInsert.swap(docs);
919     _stats.documentsCopied += docs.size();
920     ++_stats.fetchBatches;
921     _progressMeter.hit(int(docs.size()));
922     invariant(_collLoader);
923     const auto status = _collLoader->insertDocuments(docs.cbegin(), docs.cend());
924     if (!status.isOK()) {
925         onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, status);
926         return;
927     }
928 
929     MONGO_FAIL_POINT_BLOCK(initialSyncHangDuringCollectionClone, options) {
930         const BSONObj& data = options.getData();
931         if (data["namespace"].String() == _destNss.ns() &&
932             static_cast<int>(_stats.documentsCopied) >= data["numDocsToClone"].numberInt()) {
933             lk.unlock();
934             log() << "initial sync - initialSyncHangDuringCollectionClone fail point "
935                      "enabled. Blocking until fail point is disabled.";
936             while (MONGO_FAIL_POINT(initialSyncHangDuringCollectionClone) && !_isShuttingDown()) {
937                 mongo::sleepsecs(1);
938             }
939             lk.lock();
940         }
941     }
942 
943     if (lastBatch) {
944         // Clean up resources once the last batch has been copied over and set the status to OK.
945         onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK());
946     }
947 }
948 
// Final step of cloning: commits the collection bulk loader (on success), invokes the caller's
// completion callback, and transitions this cloner to State::kComplete. The careful
// lock/unlock sequencing below is deliberate — the completion callback and its resources must
// be run/released outside '_mutex'.
void CollectionCloner::_finishCallback(const Status& status) {
    log() << "CollectionCloner ns:" << _destNss
          << " finished cloning with status: " << redact(status);
    // Copy the status so we can change it below if needed.
    auto finalStatus = status;
    bool callCollectionLoader = false;
    decltype(_onCompletion) onCompletion;
    {
        LockGuard lk(_mutex);
        invariant(_state != State::kComplete);

        // Only commit/release the loader if one was created for this attempt.
        callCollectionLoader = _collLoader.operator bool();

        invariant(_onCompletion);
        std::swap(_onCompletion, onCompletion);
    }
    if (callCollectionLoader) {
        if (finalStatus.isOK()) {
            // A successful clone still fails overall if the index commit fails.
            const auto loaderStatus = _collLoader->commit();
            if (!loaderStatus.isOK()) {
                warning() << "Failed to commit collection indexes " << _destNss.ns() << ": "
                          << redact(loaderStatus);
                finalStatus = loaderStatus;
            }
        }

        // This will release the resources held by the loader.
        _collLoader.reset();
    }
    onCompletion(finalStatus);

    // This will release the resources held by the callback function object. '_onCompletion' is
    // already cleared at this point and 'onCompletion' is the remaining reference to the callback
    // function (with any implicitly held resources). To avoid any issues with destruction logic
    // in the function object's resources accessing this CollectionCloner, we release this function
    // object outside the lock.
    onCompletion = {};

    LockGuard lk(_mutex);
    _stats.end = _executor->now();
    _progressMeter.finished();
    _state = State::kComplete;
    // Wake any threads blocked in join()/waitForCompletion-style waits on '_condition'.
    _condition.notify_all();
    LOG(1) << "    collection: " << _destNss << ", stats: " << _stats.toString();
}
994 
// Out-of-class definitions for the static constexpr field-name constants declared in
// CollectionCloner::Stats (required when they are ODR-used, pre-C++17 inline variables).
constexpr StringData CollectionCloner::Stats::kDocumentsToCopyFieldName;
constexpr StringData CollectionCloner::Stats::kDocumentsCopiedFieldName;
997 
toString() const998 std::string CollectionCloner::Stats::toString() const {
999     return toBSON().toString();
1000 }
1001 
toBSON() const1002 BSONObj CollectionCloner::Stats::toBSON() const {
1003     BSONObjBuilder bob;
1004     bob.append("ns", ns);
1005     append(&bob);
1006     return bob.obj();
1007 }
1008 
append(BSONObjBuilder * builder) const1009 void CollectionCloner::Stats::append(BSONObjBuilder* builder) const {
1010     builder->appendNumber(kDocumentsToCopyFieldName, documentToCopy);
1011     builder->appendNumber(kDocumentsCopiedFieldName, documentsCopied);
1012     builder->appendNumber("indexes", indexes);
1013     builder->appendNumber("fetchedBatches", fetchBatches);
1014     if (start != Date_t()) {
1015         builder->appendDate("start", start);
1016         if (end != Date_t()) {
1017             builder->appendDate("end", end);
1018             auto elapsed = end - start;
1019             long long elapsedMillis = duration_cast<Milliseconds>(elapsed).count();
1020             builder->appendNumber("elapsedMillis", elapsedMillis);
1021         }
1022     }
1023 }
1024 }  // namespace repl
1025 }  // namespace mongo
1026