1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #include "mongo/platform/basic.h"
32 
33 #include <iosfwd>
34 #include <memory>
35 #include <ostream>
36 
37 #include "mongo/client/fetcher.h"
38 #include "mongo/db/client.h"
39 #include "mongo/db/commands/feature_compatibility_version.h"
40 #include "mongo/db/json.h"
41 #include "mongo/db/query/getmore_request.h"
42 #include "mongo/db/repl/base_cloner_test_fixture.h"
43 #include "mongo/db/repl/data_replicator_external_state_mock.h"
44 #include "mongo/db/repl/initial_syncer.h"
45 #include "mongo/db/repl/member_state.h"
46 #include "mongo/db/repl/oplog_entry.h"
47 #include "mongo/db/repl/oplog_fetcher.h"
48 #include "mongo/db/repl/optime.h"
49 #include "mongo/db/repl/replication_consistency_markers_mock.h"
50 #include "mongo/db/repl/replication_process.h"
51 #include "mongo/db/repl/replication_recovery_mock.h"
52 #include "mongo/db/repl/reporter.h"
53 #include "mongo/db/repl/storage_interface.h"
54 #include "mongo/db/repl/storage_interface_mock.h"
55 #include "mongo/db/repl/sync_source_resolver.h"
56 #include "mongo/db/repl/sync_source_selector.h"
57 #include "mongo/db/repl/sync_source_selector_mock.h"
58 #include "mongo/db/repl/task_executor_mock.h"
59 #include "mongo/db/repl/update_position_args.h"
60 #include "mongo/executor/network_interface_mock.h"
61 #include "mongo/executor/thread_pool_task_executor_test_fixture.h"
62 #include "mongo/stdx/mutex.h"
63 #include "mongo/util/concurrency/old_thread_pool.h"
64 #include "mongo/util/concurrency/thread_name.h"
65 #include "mongo/util/fail_point_service.h"
66 #include "mongo/util/mongoutils/str.h"
67 #include "mongo/util/scopeguard.h"
68 
69 #include "mongo/unittest/barrier.h"
70 #include "mongo/unittest/unittest.h"
71 
72 namespace mongo {
73 namespace repl {
74 
75 /**
76  * Insertion operator for InitialSyncer::State. Formats initial syncer state for output stream.
77  */
operator <<(std::ostream & os,const InitialSyncer::State & state)78 std::ostream& operator<<(std::ostream& os, const InitialSyncer::State& state) {
79     switch (state) {
80         case InitialSyncer::State::kPreStart:
81             return os << "PreStart";
82         case InitialSyncer::State::kRunning:
83             return os << "Running";
84         case InitialSyncer::State::kShuttingDown:
85             return os << "ShuttingDown";
86         case InitialSyncer::State::kComplete:
87             return os << "Complete";
88     }
89     MONGO_UNREACHABLE;
90 }
91 
92 }  // namespace repl
93 }  // namespace mongo
94 
95 
96 namespace {
97 
98 using namespace mongo;
99 using namespace mongo::repl;
100 
101 using executor::NetworkInterfaceMock;
102 using executor::RemoteCommandRequest;
103 using executor::RemoteCommandResponse;
104 using unittest::log;
105 
106 using LockGuard = stdx::lock_guard<stdx::mutex>;
107 using NetworkGuard = executor::NetworkInterfaceMock::InNetworkGuard;
108 using UniqueLock = stdx::unique_lock<stdx::mutex>;
109 
110 struct CollectionCloneInfo {
111     CollectionMockStats stats;
112     CollectionBulkLoaderMock* loader = nullptr;
113     Status status{ErrorCodes::NotYetInitialized, ""};
114 };
115 
116 class InitialSyncerTest : public executor::ThreadPoolExecutorTest, public SyncSourceSelector {
117 public:
InitialSyncerTest()118     InitialSyncerTest() {}
119 
120     executor::ThreadPoolMock::Options makeThreadPoolMockOptions() const override;
121 
122     /**
123      * clear/reset state
124      */
reset()125     void reset() {
126         _setMyLastOptime = [this](const OpTime& opTime,
127                                   ReplicationCoordinator::DataConsistency consistency) {
128             _myLastOpTime = opTime;
129         };
130         _myLastOpTime = OpTime();
131         _syncSourceSelector = stdx::make_unique<SyncSourceSelectorMock>();
132     }
133 
134     // SyncSourceSelector
clearSyncSourceBlacklist()135     void clearSyncSourceBlacklist() override {
136         _syncSourceSelector->clearSyncSourceBlacklist();
137     }
chooseNewSyncSource(const OpTime & ot)138     HostAndPort chooseNewSyncSource(const OpTime& ot) override {
139         return _syncSourceSelector->chooseNewSyncSource(ot);
140     }
blacklistSyncSource(const HostAndPort & host,Date_t until)141     void blacklistSyncSource(const HostAndPort& host, Date_t until) override {
142         _syncSourceSelector->blacklistSyncSource(host, until);
143     }
shouldChangeSyncSource(const HostAndPort & currentSource,const rpc::ReplSetMetadata & replMetadata,boost::optional<rpc::OplogQueryMetadata> oqMetadata)144     bool shouldChangeSyncSource(const HostAndPort& currentSource,
145                                 const rpc::ReplSetMetadata& replMetadata,
146                                 boost::optional<rpc::OplogQueryMetadata> oqMetadata) override {
147         return _syncSourceSelector->shouldChangeSyncSource(currentSource, replMetadata, oqMetadata);
148     }
149 
scheduleNetworkResponse(std::string cmdName,const BSONObj & obj)150     void scheduleNetworkResponse(std::string cmdName, const BSONObj& obj) {
151         NetworkInterfaceMock* net = getNet();
152         if (!net->hasReadyRequests()) {
153             log() << "The network doesn't have a request to process for this response: " << obj;
154         }
155         verifyNextRequestCommandName(cmdName);
156         scheduleNetworkResponse(net->getNextReadyRequest(), obj);
157     }
158 
scheduleNetworkResponse(NetworkInterfaceMock::NetworkOperationIterator noi,const BSONObj & obj)159     void scheduleNetworkResponse(NetworkInterfaceMock::NetworkOperationIterator noi,
160                                  const BSONObj& obj) {
161         NetworkInterfaceMock* net = getNet();
162         Milliseconds millis(0);
163         RemoteCommandResponse response(obj, BSONObj(), millis);
164         log() << "Sending response for network request:";
165         log() << "     req: " << noi->getRequest().dbname << "." << noi->getRequest().cmdObj;
166         log() << "     resp:" << response;
167 
168         net->scheduleResponse(noi, net->now(), response);
169     }
170 
scheduleNetworkResponse(std::string cmdName,Status errorStatus)171     void scheduleNetworkResponse(std::string cmdName, Status errorStatus) {
172         NetworkInterfaceMock* net = getNet();
173         if (!getNet()->hasReadyRequests()) {
174             log() << "The network doesn't have a request to process for the error: " << errorStatus;
175         }
176         verifyNextRequestCommandName(cmdName);
177         net->scheduleResponse(net->getNextReadyRequest(), net->now(), errorStatus);
178     }
179 
processNetworkResponse(std::string cmdName,const BSONObj & obj)180     void processNetworkResponse(std::string cmdName, const BSONObj& obj) {
181         scheduleNetworkResponse(cmdName, obj);
182         finishProcessingNetworkResponse();
183     }
184 
processNetworkResponse(std::string cmdName,Status errorStatus)185     void processNetworkResponse(std::string cmdName, Status errorStatus) {
186         scheduleNetworkResponse(cmdName, errorStatus);
187         finishProcessingNetworkResponse();
188     }
189 
190     /**
191      * Schedules and processes a successful response to the network request sent by InitialSyncer's
192      * last oplog entry fetcher. Also validates the find command arguments in the request.
193      */
194     void processSuccessfulLastOplogEntryFetcherResponse(std::vector<BSONObj> docs);
195 
196     /**
197      * Schedules and processes a successful response to the network request sent by InitialSyncer's
198      * feature compatibility version fetcher. Includes the 'docs' provided in the response.
199      */
200     void processSuccessfulFCVFetcherResponse(std::vector<BSONObj> docs);
201 
202     /**
203      * Schedules and processes a successful response to the network request sent by InitialSyncer's
204      * feature compatibility version fetcher. Always includes a valid fCV=3.6 document in the
205      * response.
206      */
207     void processSuccessfulFCVFetcherResponse36();
208 
finishProcessingNetworkResponse()209     void finishProcessingNetworkResponse() {
210         getNet()->runReadyNetworkOperations();
211         if (getNet()->hasReadyRequests()) {
212             log() << "The network has unexpected requests to process, next req:";
213             NetworkInterfaceMock::NetworkOperation req = *getNet()->getNextReadyRequest();
214             log() << req.getDiagnosticString();
215         }
216         ASSERT_FALSE(getNet()->hasReadyRequests());
217     }
218 
getInitialSyncer()219     InitialSyncer& getInitialSyncer() {
220         return *_initialSyncer;
221     }
222 
getExternalState()223     DataReplicatorExternalStateMock* getExternalState() {
224         return _externalState;
225     }
226 
getStorage()227     StorageInterface& getStorage() {
228         return *_storageInterface;
229     }
230 
getDbWorkThreadPool()231     OldThreadPool& getDbWorkThreadPool() {
232         return *_dbWorkThreadPool;
233     }
234 
235 protected:
236     struct StorageInterfaceResults {
237         bool createOplogCalled = false;
238         bool truncateCalled = false;
239         bool insertedOplogEntries = false;
240         int oplogEntriesInserted = 0;
241         bool droppedUserDBs = false;
242         std::vector<std::string> droppedCollections;
243         int documentsInsertedCount = 0;
244         bool schemaUpgraded = false;
245         OptionalCollectionUUID uuid;
246         bool getCollectionUUIDShouldFail = false;
247         bool upgradeUUIDSchemaVersionNonReplicatedShouldFail = false;
248     };
249 
250     stdx::mutex _storageInterfaceWorkDoneMutex;  // protects _storageInterfaceWorkDone.
251     StorageInterfaceResults _storageInterfaceWorkDone;
252 
setUp()253     void setUp() override {
254         executor::ThreadPoolExecutorTest::setUp();
255         _storageInterface = stdx::make_unique<StorageInterfaceMock>();
256         _storageInterface->createOplogFn = [this](OperationContext* opCtx,
257                                                   const NamespaceString& nss) {
258             LockGuard lock(_storageInterfaceWorkDoneMutex);
259             _storageInterfaceWorkDone.createOplogCalled = true;
260             _storageInterfaceWorkDone.schemaUpgraded = false;
261             _storageInterfaceWorkDone.uuid = boost::none;
262             _storageInterfaceWorkDone.getCollectionUUIDShouldFail = false;
263             _storageInterfaceWorkDone.upgradeUUIDSchemaVersionNonReplicatedShouldFail = false;
264             return Status::OK();
265         };
266         _storageInterface->truncateCollFn = [this](OperationContext* opCtx,
267                                                    const NamespaceString& nss) {
268             LockGuard lock(_storageInterfaceWorkDoneMutex);
269             _storageInterfaceWorkDone.truncateCalled = true;
270             return Status::OK();
271         };
272         _storageInterface->insertDocumentFn = [this](OperationContext* opCtx,
273                                                      const NamespaceString& nss,
274                                                      const TimestampedBSONObj& doc,
275                                                      long long term) {
276             LockGuard lock(_storageInterfaceWorkDoneMutex);
277             ++_storageInterfaceWorkDone.documentsInsertedCount;
278             return Status::OK();
279         };
280         _storageInterface->insertDocumentsFn = [this](OperationContext* opCtx,
281                                                       const NamespaceString& nss,
282                                                       const std::vector<InsertStatement>& ops) {
283             LockGuard lock(_storageInterfaceWorkDoneMutex);
284             _storageInterfaceWorkDone.insertedOplogEntries = true;
285             ++_storageInterfaceWorkDone.oplogEntriesInserted;
286             return Status::OK();
287         };
288         _storageInterface->dropCollFn = [this](OperationContext* opCtx,
289                                                const NamespaceString& nss) {
290             LockGuard lock(_storageInterfaceWorkDoneMutex);
291             _storageInterfaceWorkDone.droppedCollections.push_back(nss.ns());
292             return Status::OK();
293         };
294         _storageInterface->dropUserDBsFn = [this](OperationContext* opCtx) {
295             LockGuard lock(_storageInterfaceWorkDoneMutex);
296             _storageInterfaceWorkDone.droppedUserDBs = true;
297             return Status::OK();
298         };
299         _storageInterface->createCollectionForBulkFn =
300             [this](const NamespaceString& nss,
301                    const CollectionOptions& options,
302                    const BSONObj idIndexSpec,
303                    const std::vector<BSONObj>& secondaryIndexSpecs) {
304                 // Get collection info from map.
305                 const auto collInfo = &_collections[nss];
306                 if (collInfo->stats.initCalled) {
307                     log() << "reusing collection during test which may cause problems, ns:" << nss;
308                 }
309                 (collInfo->loader = new CollectionBulkLoaderMock(&collInfo->stats))
310                     ->init(secondaryIndexSpecs)
311                     .transitional_ignore();
312 
313                 return StatusWith<std::unique_ptr<CollectionBulkLoader>>(
314                     std::unique_ptr<CollectionBulkLoader>(collInfo->loader));
315             };
316         _storageInterface->getCollectionUUIDFn = [this](OperationContext* opCtx,
317                                                         const NamespaceString& nss) {
318             LockGuard lock(_storageInterfaceWorkDoneMutex);
319             if (_storageInterfaceWorkDone.getCollectionUUIDShouldFail) {
320                 // getCollectionUUID returns NamespaceNotFound if either the db or the collection is
321                 // missing.
322                 return StatusWith<OptionalCollectionUUID>(Status(
323                     ErrorCodes::NamespaceNotFound,
324                     str::stream() << "getCollectionUUID failed because namespace " << nss.ns()
325                                   << " not found."));
326             } else {
327                 return StatusWith<OptionalCollectionUUID>(_storageInterfaceWorkDone.uuid);
328             }
329         };
330 
331         _storageInterface->upgradeUUIDSchemaVersionNonReplicatedFn =
332             [this](OperationContext* opCtx) {
333                 LockGuard lock(_storageInterfaceWorkDoneMutex);
334                 if (_storageInterfaceWorkDone.upgradeUUIDSchemaVersionNonReplicatedShouldFail) {
335                     // One of the status codes a failed upgradeUUIDSchemaVersionNonReplicated call
336                     // can return is NamespaceNotFound.
337                     return Status(ErrorCodes::NamespaceNotFound,
338                                   "upgradeUUIDSchemaVersionNonReplicated failed because the "
339                                   "desired ns was not found.");
340                 } else {
341                     _storageInterfaceWorkDone.schemaUpgraded = true;
342                     return Status::OK();
343                 }
344             };
345 
346         _dbWorkThreadPool = stdx::make_unique<OldThreadPool>(1);
347 
348         Client::initThreadIfNotAlready();
349         reset();
350 
351         launchExecutorThread();
352 
353         _replicationProcess = stdx::make_unique<ReplicationProcess>(
354             _storageInterface.get(),
355             stdx::make_unique<ReplicationConsistencyMarkersMock>(),
356             stdx::make_unique<ReplicationRecoveryMock>());
357 
358         _executorProxy = stdx::make_unique<TaskExecutorMock>(&getExecutor());
359 
360         _myLastOpTime = OpTime({3, 0}, 1);
361 
362         InitialSyncerOptions options;
363         options.initialSyncRetryWait = Milliseconds(1);
364         options.batchLimits.bytes = 512 * 1024 * 1024U;
365         options.batchLimits.ops = 5000U;
366         options.getMyLastOptime = [this]() { return _myLastOpTime; };
367         options.setMyLastOptime = [this](const OpTime& opTime,
368                                          ReplicationCoordinator::DataConsistency consistency) {
369             _setMyLastOptime(opTime, consistency);
370         };
371         options.resetOptimes = [this]() { _myLastOpTime = OpTime(); };
372         options.getSlaveDelay = []() { return Seconds(0); };
373         options.syncSourceSelector = this;
374 
375         _options = options;
376 
377         ThreadPool::Options threadPoolOptions;
378         threadPoolOptions.poolName = "replication";
379         threadPoolOptions.minThreads = 1U;
380         threadPoolOptions.maxThreads = 1U;
381         threadPoolOptions.onCreateThread = [](const std::string& threadName) {
382             Client::initThread(threadName.c_str());
383         };
384 
385         auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
386         dataReplicatorExternalState->taskExecutor = _executorProxy.get();
387         dataReplicatorExternalState->dbWorkThreadPool = &getDbWorkThreadPool();
388         dataReplicatorExternalState->currentTerm = 1LL;
389         dataReplicatorExternalState->lastCommittedOpTime = _myLastOpTime;
390         {
391             ReplSetConfig config;
392             ASSERT_OK(config.initialize(BSON("_id"
393                                              << "myset"
394                                              << "version"
395                                              << 1
396                                              << "protocolVersion"
397                                              << 1
398                                              << "members"
399                                              << BSON_ARRAY(BSON("_id" << 0 << "host"
400                                                                       << "localhost:12345"))
401                                              << "settings"
402                                              << BSON("electionTimeoutMillis" << 10000))));
403             dataReplicatorExternalState->replSetConfigResult = config;
404         }
405         _externalState = dataReplicatorExternalState.get();
406 
407         _lastApplied = getDetectableErrorStatus();
408         _onCompletion = [this](const StatusWith<OpTimeWithHash>& lastApplied) {
409             _lastApplied = lastApplied;
410         };
411 
412         try {
413             // When creating InitialSyncer, we wrap _onCompletion so that we can override the
414             // InitialSyncer's callback behavior post-construction.
415             // See InitialSyncerTransitionsToCompleteWhenFinishCallbackThrowsException.
416             _initialSyncer = stdx::make_unique<InitialSyncer>(
417                 options,
418                 std::move(dataReplicatorExternalState),
419                 _storageInterface.get(),
420                 _replicationProcess.get(),
421                 [this](const StatusWith<OpTimeWithHash>& lastApplied) {
422                     _onCompletion(lastApplied);
423                 });
424             _initialSyncer->setScheduleDbWorkFn_forTest(
425                 [this](const executor::TaskExecutor::CallbackFn& work) {
426                     return getExecutor().scheduleWork(work);
427                 });
428 
429         } catch (...) {
430             ASSERT_OK(exceptionToStatus());
431         }
432     }
433 
tearDownExecutorThread()434     void tearDownExecutorThread() {
435         if (_executorThreadShutdownComplete) {
436             return;
437         }
438         getExecutor().shutdown();
439         getExecutor().join();
440         _executorThreadShutdownComplete = true;
441     }
442 
tearDown()443     void tearDown() override {
444         tearDownExecutorThread();
445         _initialSyncer.reset();
446         _dbWorkThreadPool->join();
447         _dbWorkThreadPool.reset();
448         _replicationProcess.reset();
449         _storageInterface.reset();
450     }
451 
452     /**
453      * Note: An empty cmdName will skip validation.
454      */
verifyNextRequestCommandName(std::string cmdName)455     void verifyNextRequestCommandName(std::string cmdName) {
456         const auto net = getNet();
457         ASSERT_TRUE(net->hasReadyRequests());
458 
459         if (cmdName != "") {
460             const NetworkInterfaceMock::NetworkOperationIterator req =
461                 net->getFrontOfUnscheduledQueue();
462             const BSONObj reqBSON = req->getRequest().cmdObj;
463             const BSONElement cmdElem = reqBSON.firstElement();
464             auto reqCmdName = cmdElem.fieldNameStringData();
465             ASSERT_EQ(cmdName, reqCmdName);
466         }
467     }
468 
469     void runInitialSyncWithBadFCVResponse(std::vector<BSONObj> docs,
470                                           ErrorCodes::Error expectedError);
471     void doSuccessfulInitialSyncWithOneBatch();
472     OplogEntry doInitialSyncWithOneBatch();
473 
474     std::unique_ptr<TaskExecutorMock> _executorProxy;
475 
476     InitialSyncerOptions _options;
477     InitialSyncerOptions::SetMyLastOptimeFn _setMyLastOptime;
478     OpTime _myLastOpTime;
479     std::unique_ptr<SyncSourceSelectorMock> _syncSourceSelector;
480     std::unique_ptr<StorageInterfaceMock> _storageInterface;
481     std::unique_ptr<ReplicationProcess> _replicationProcess;
482     std::unique_ptr<OldThreadPool> _dbWorkThreadPool;
483     std::map<NamespaceString, CollectionMockStats> _collectionStats;
484     std::map<NamespaceString, CollectionCloneInfo> _collections;
485 
486     StatusWith<OpTimeWithHash> _lastApplied = Status(ErrorCodes::NotYetInitialized, "");
487     InitialSyncer::OnCompletionFn _onCompletion;
488 
489 private:
490     DataReplicatorExternalStateMock* _externalState;
491     std::unique_ptr<InitialSyncer> _initialSyncer;
492     bool _executorThreadShutdownComplete = false;
493 };
494 
makeThreadPoolMockOptions() const495 executor::ThreadPoolMock::Options InitialSyncerTest::makeThreadPoolMockOptions() const {
496     executor::ThreadPoolMock::Options options;
497     options.onCreateThread = []() { Client::initThread("InitialSyncerTest"); };
498     return options;
499 }
500 
advanceClock(NetworkInterfaceMock * net,Milliseconds duration)501 void advanceClock(NetworkInterfaceMock* net, Milliseconds duration) {
502     executor::NetworkInterfaceMock::InNetworkGuard guard(net);
503     auto when = net->now() + duration;
504     ASSERT_EQUALS(when, net->runUntil(when));
505 }
506 
makeOpCtx()507 ServiceContext::UniqueOperationContext makeOpCtx() {
508     return cc().makeOperationContext();
509 }
510 
511 /**
512  * Generates a replSetGetRBID response.
513  */
makeRollbackCheckerResponse(int rollbackId)514 BSONObj makeRollbackCheckerResponse(int rollbackId) {
515     return BSON("ok" << 1 << "rbid" << rollbackId);
516 }
517 
518 /**
519  * Generates a cursor response for a Fetcher to consume.
520  */
makeCursorResponse(CursorId cursorId,const NamespaceString & nss,std::vector<BSONObj> docs,bool isFirstBatch=true,int rbid=1)521 RemoteCommandResponse makeCursorResponse(CursorId cursorId,
522                                          const NamespaceString& nss,
523                                          std::vector<BSONObj> docs,
524                                          bool isFirstBatch = true,
525                                          int rbid = 1) {
526     OpTime futureOpTime(Timestamp(1000, 1000), 1000);
527     rpc::OplogQueryMetadata oqMetadata(futureOpTime, futureOpTime, rbid, 0, 0);
528     BSONObjBuilder metadataBob;
529     ASSERT_OK(oqMetadata.writeToMetadata(&metadataBob));
530     auto metadataObj = metadataBob.obj();
531 
532     BSONObjBuilder bob;
533     {
534         BSONObjBuilder cursorBob(bob.subobjStart("cursor"));
535         cursorBob.append("id", cursorId);
536         cursorBob.append("ns", nss.toString());
537         {
538             BSONArrayBuilder batchBob(
539                 cursorBob.subarrayStart(isFirstBatch ? "firstBatch" : "nextBatch"));
540             for (const auto& doc : docs) {
541                 batchBob.append(doc);
542             }
543         }
544     }
545     bob.append("ok", 1);
546     return {bob.obj(), metadataObj, Milliseconds(0)};
547 }
548 
549 /**
550  * Generates a listDatabases response for a DatabasesCloner to consume.
551  */
makeListDatabasesResponse(std::vector<std::string> databaseNames)552 BSONObj makeListDatabasesResponse(std::vector<std::string> databaseNames) {
553     BSONObjBuilder bob;
554     {
555         BSONArrayBuilder databasesBob(bob.subarrayStart("databases"));
556         for (const auto& name : databaseNames) {
557             BSONObjBuilder nameBob(databasesBob.subobjStart());
558             nameBob.append("name", name);
559         }
560     }
561     bob.append("ok", 1);
562     return bob.obj();
563 }
564 
565 /**
566  * Generates oplog entries with the given number used for the timestamp.
567  */
makeOplogEntry(int t,OpTypeEnum opType=OpTypeEnum::kInsert,int version=OplogEntry::kOplogVersion)568 OplogEntry makeOplogEntry(int t,
569                           OpTypeEnum opType = OpTypeEnum::kInsert,
570                           int version = OplogEntry::kOplogVersion) {
571     BSONObj oField = BSON("_id" << t << "a" << t);
572     if (opType == OpTypeEnum::kCommand) {
573         // Insert an arbitrary command name so that the oplog entry is valid.
574         oField = BSON("dropIndexes"
575                       << "a_1");
576     }
577     return OplogEntry(OpTime(Timestamp(t, 1), 1),  // optime
578                       static_cast<long long>(t),   // hash
579                       opType,                      // op type
580                       NamespaceString("a.a"),      // namespace
581                       boost::none,                 // uuid
582                       boost::none,                 // fromMigrate
583                       version,                     // version
584                       oField,                      // o
585                       boost::none,                 // o2
586                       {},                          // sessionInfo
587                       boost::none,                 // upsert
588                       boost::none,                 // wall clock time
589                       boost::none,                 // statement id
590                       boost::none,   // optime of previous write within same transaction
591                       boost::none,   // pre-image optime
592                       boost::none);  // post-image optime
593 }
594 
makeOplogEntryObj(int t,OpTypeEnum opType=OpTypeEnum::kInsert,int version=OplogEntry::kOplogVersion)595 BSONObj makeOplogEntryObj(int t,
596                           OpTypeEnum opType = OpTypeEnum::kInsert,
597                           int version = OplogEntry::kOplogVersion) {
598     return makeOplogEntry(t, opType, version).toBSON();
599 }
600 
processSuccessfulLastOplogEntryFetcherResponse(std::vector<BSONObj> docs)601 void InitialSyncerTest::processSuccessfulLastOplogEntryFetcherResponse(std::vector<BSONObj> docs) {
602     auto net = getNet();
603     auto request = assertRemoteCommandNameEquals(
604         "find",
605         net->scheduleSuccessfulResponse(makeCursorResponse(0LL, _options.localOplogNS, docs)));
606     ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
607     ASSERT_TRUE(request.cmdObj.hasField("sort"));
608     ASSERT_EQUALS(mongo::BSONType::Object, request.cmdObj["sort"].type());
609     ASSERT_BSONOBJ_EQ(BSON("$natural" << -1), request.cmdObj.getObjectField("sort"));
610     net->runReadyNetworkOperations();
611 }
612 
assertFCVRequest(RemoteCommandRequest request)613 void assertFCVRequest(RemoteCommandRequest request) {
614     ASSERT_EQUALS(nsToDatabaseSubstring(FeatureCompatibilityVersion::kCollection), request.dbname)
615         << request.toString();
616     ASSERT_EQUALS(nsToCollectionSubstring(FeatureCompatibilityVersion::kCollection),
617                   request.cmdObj.getStringField("find"));
618     ASSERT_BSONOBJ_EQ(BSON("_id" << FeatureCompatibilityVersion::kParameterName),
619                       request.cmdObj.getObjectField("filter"));
620 }
621 
processSuccessfulFCVFetcherResponse36()622 void InitialSyncerTest::processSuccessfulFCVFetcherResponse36() {
623     auto docs = {BSON("_id" << FeatureCompatibilityVersion::kParameterName << "version"
624                             << FeatureCompatibilityVersionCommandParser::kVersion36)};
625     processSuccessfulFCVFetcherResponse(docs);
626 }
627 
processSuccessfulFCVFetcherResponse(std::vector<BSONObj> docs)628 void InitialSyncerTest::processSuccessfulFCVFetcherResponse(std::vector<BSONObj> docs) {
629     auto net = getNet();
630     auto request = assertRemoteCommandNameEquals(
631         "find",
632         net->scheduleSuccessfulResponse(makeCursorResponse(
633             0LL, NamespaceString(FeatureCompatibilityVersion::kCollection), docs)));
634     assertFCVRequest(request);
635     net->runReadyNetworkOperations();
636 }
637 
/**
 * The InitialSyncer constructor must reject an external state without a task executor and a
 * null completion callback, throwing BadValue in both cases.
 */
TEST_F(InitialSyncerTest, InvalidConstruction) {
    InitialSyncerOptions syncerOptions;
    syncerOptions.getMyLastOptime = []() { return OpTime(); };
    syncerOptions.setMyLastOptime = [](const OpTime&, ReplicationCoordinator::DataConsistency) {};
    syncerOptions.resetOptimes = []() {};
    syncerOptions.getSlaveDelay = []() { return Seconds(0); };
    syncerOptions.syncSourceSelector = this;
    auto onCompletion = [](const StatusWith<OpTimeWithHash>&) {};

    // Null task executor in external state.
    {
        auto externalStateMock = stdx::make_unique<DataReplicatorExternalStateMock>();
        ASSERT_THROWS_CODE_AND_WHAT(InitialSyncer(syncerOptions,
                                                  std::move(externalStateMock),
                                                  _storageInterface.get(),
                                                  _replicationProcess.get(),
                                                  onCompletion),
                                    AssertionException,
                                    ErrorCodes::BadValue,
                                    "task executor cannot be null");
    }

    // Null callback function.
    {
        auto externalStateMock = stdx::make_unique<DataReplicatorExternalStateMock>();
        externalStateMock->taskExecutor = &getExecutor();
        ASSERT_THROWS_CODE_AND_WHAT(InitialSyncer(syncerOptions,
                                                  std::move(externalStateMock),
                                                  _storageInterface.get(),
                                                  _replicationProcess.get(),
                                                  InitialSyncer::OnCompletionFn()),
                                    AssertionException,
                                    ErrorCodes::BadValue,
                                    "callback function cannot be null");
    }
}
675 
// Intentionally empty: the test body does nothing, so only the fixture's own set-up and tear-down
// are exercised.
TEST_F(InitialSyncerTest, CreateDestroy) {
}
677 
// Attempt limit that most tests below pass to InitialSyncer::startup().
const std::uint32_t maxAttempts{1U};
679 
// A second startup() on an already active InitialSyncer is rejected with IllegalOperation and
// leaves the syncer active.
TEST_F(InitialSyncerTest, StartupReturnsIllegalOperationIfAlreadyActive) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();
    auto* const opCtx = opCtxHolder.get();

    ASSERT_FALSE(syncer.isActive());
    ASSERT_OK(syncer.startup(opCtx, maxAttempts));
    ASSERT_TRUE(syncer.isActive());

    const auto secondStartup = syncer.startup(opCtx, maxAttempts);
    ASSERT_EQUALS(ErrorCodes::IllegalOperation, secondStartup);
    ASSERT_TRUE(syncer.isActive());
}
689 
// Once shutdown() has been requested on an active InitialSyncer, a further startup() is refused
// with ShutdownInProgress.
TEST_F(InitialSyncerTest, StartupReturnsShutdownInProgressIfInitialSyncerIsShuttingDown) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();
    auto* const opCtx = opCtxHolder.get();

    ASSERT_FALSE(syncer.isActive());
    ASSERT_OK(syncer.startup(opCtx, maxAttempts));
    ASSERT_TRUE(syncer.isActive());

    // The SyncSourceSelector hands back an invalid sync source, so the InitialSyncer is parked
    // waiting 'Options::syncSourceRetryWait' ms for its next selection attempt.
    ASSERT_OK(syncer.shutdown());

    const auto restartStatus = syncer.startup(opCtx, maxAttempts);
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, restartStatus);
}
701 
// With the task executor already shut down, startup() fails with ShutdownInProgress and the
// syncer never becomes active.
TEST_F(InitialSyncerTest, StartupReturnsShutdownInProgressIfExecutorIsShutdown) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();
    auto* const opCtx = opCtxHolder.get();

    getExecutor().shutdown();

    const auto firstStartup = syncer.startup(opCtx, maxAttempts);
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, firstStartup);
    ASSERT_FALSE(syncer.isActive());

    // The syncer is now in the Complete state, so starting it again is refused as well.
    const auto secondStartup = syncer.startup(opCtx, maxAttempts);
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, secondStartup);
}
712 
// Calling shutdown() before startup() succeeds, after which startup() is refused with
// ShutdownInProgress.
TEST_F(InitialSyncerTest, ShutdownTransitionsStateToCompleteIfCalledBeforeStartup) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    ASSERT_OK(syncer.shutdown());

    const auto startupStatus = syncer.startup(opCtxHolder.get(), maxAttempts);
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, startupStatus);

    // An initial syncer in the Complete state reports itself as inactive.
    ASSERT_FALSE(syncer.isActive());
}
721 
// A successful startup() flips the initial sync flag in the consistency markers from unset to set.
TEST_F(InitialSyncerTest, StartupSetsInitialSyncFlagOnSuccess) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();
    auto* const opCtx = opCtxHolder.get();

    // Reads the flag through the consistency markers each time it is invoked.
    const auto readInitialSyncFlag = [&]() {
        return _replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx);
    };

    // Before startup the flag must still be clear.
    ASSERT_FALSE(readInitialSyncFlag());

    ASSERT_OK(syncer.startup(opCtx, maxAttempts));
    ASSERT_TRUE(syncer.isActive());

    // After startup the flag must have been raised.
    ASSERT_TRUE(readInitialSyncFlag());
}
735 
// A successful startup() overwrites previously set initial-data and stable timestamps with
// Timestamp::kAllowUnstableCheckpointsSentinel and Timestamp::min() respectively.
TEST_F(InitialSyncerTest, StartupSetsInitialDataTimestampAndStableTimestampOnSuccess) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();
    auto* const opCtx = opCtxHolder.get();

    // Seed both timestamps with non-default values so that the overwrite is observable.
    auto svcCtx = opCtx->getServiceContext();
    const Timestamp seededInitialData(5, 5);
    const Timestamp seededStable(6, 6);
    _storageInterface->setInitialDataTimestamp(svcCtx, seededInitialData);
    _storageInterface->setStableTimestamp(svcCtx, seededStable);

    ASSERT_OK(syncer.startup(opCtx, maxAttempts));
    ASSERT_TRUE(syncer.isActive());

    ASSERT_EQUALS(Timestamp::kAllowUnstableCheckpointsSentinel,
                  _storageInterface->getInitialDataTimestamp());
    ASSERT_EQUALS(Timestamp::min(), _storageInterface->getStableTimestamp());
}
752 
// shutdown() issued right after startup() makes the InitialSyncer finish with CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerReturnsCallbackCanceledIfShutdownImmediatelyAfterStartup) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    const HostAndPort validSyncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(validSyncSource);
    ASSERT_OK(syncer.startup(opCtxHolder.get(), maxAttempts));

    // Cancels the _startInitialSyncAttemptCallback() task that startup() scheduled.
    ASSERT_OK(syncer.shutdown());

    // shutdown() may have interrupted either _chooseSyncSource or _rollbackCheckerResetCallback.
    // In the latter case the network interface has to deliver the cancellation signals to the
    // InitialSyncer callbacks in order for the InitialSyncer to run to completion.
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
        guard->runReadyNetworkOperations();
    }

    syncer.join();

    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
773 
// When the first sync source selection yields an invalid source, the InitialSyncer retries after
// '_options.syncSourceRetryWait' and only drops user databases once a valid source is returned.
TEST_F(InitialSyncerTest,
       InitialSyncerRetriesSyncSourceSelectionIfChooseNewSyncSourceReturnsInvalidSyncSource) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Override chooseNewSyncSource() result in SyncSourceSelectorMock before calling startup()
    // because InitialSyncer will look for a valid sync source immediately after startup.
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());

    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    // Run first sync source selection attempt.
    executor::NetworkInterfaceMock::InNetworkGuard(getNet())->runReadyNetworkOperations();

    // InitialSyncer will not drop user databases while looking for a valid sync source.
    // '_storageInterfaceWorkDone' is read under '_storageInterfaceWorkDoneMutex', matching the
    // other tests in this file. The lock is scoped to the assertion so that it is released
    // before the clock is advanced below.
    {
        LockGuard lock(_storageInterfaceWorkDoneMutex);
        ASSERT_FALSE(_storageInterfaceWorkDone.droppedUserDBs);
    }

    // First sync source selection attempt failed. Update SyncSourceSelectorMock to return valid
    // sync source next time chooseNewSyncSource() is called.
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));

    // Advance clock until the next sync source selection attempt.
    advanceClock(getNet(), _options.syncSourceRetryWait);

    // InitialSyncer drops user databases after obtaining a valid sync source.
    {
        LockGuard lock(_storageInterfaceWorkDoneMutex);
        ASSERT_TRUE(_storageInterfaceWorkDone.droppedUserDBs);
    }
}
801 
// Number of sync source selection attempts the helpers and tests below budget for.
const std::uint32_t chooseSyncSourceMaxAttempts{10U};
803 
804 /**
805  * Advances executor clock so that InitialSyncer exhausts all 'chooseSyncSourceMaxAttempts' (server
806  * parameter numInitialSyncConnectAttempts) sync source selection attempts.
807  * If SyncSourceSelectorMock keeps returning an invalid sync source, InitialSyncer will retry every
808  * '_options.syncSourceRetryWait' ms up to a maximum of 'chooseSyncSourceMaxAttempts' attempts.
809  */
_simulateChooseSyncSourceFailure(executor::NetworkInterfaceMock * net,Milliseconds syncSourceRetryWait)810 void _simulateChooseSyncSourceFailure(executor::NetworkInterfaceMock* net,
811                                       Milliseconds syncSourceRetryWait) {
812     advanceClock(net, int(chooseSyncSourceMaxAttempts - 1) * syncSourceRetryWait);
813 }
814 
// If every sync source selection attempt yields an invalid source, the InitialSyncer gives up
// with InitialSyncOplogSourceMissing.
TEST_F(
    InitialSyncerTest,
    InitialSyncerReturnsInitialSyncOplogSourceMissingIfNoValidSyncSourceCanBeFoundAfterTenFailedChooseSyncSourceAttempts) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    // The mock's chooseNewSyncSource() result has to be overridden before startup(), since the
    // InitialSyncer starts looking for a valid sync source right after it is started.
    const HostAndPort invalidSyncSource;
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(invalidSyncSource);

    ASSERT_OK(syncer.startup(opCtxHolder.get(), maxAttempts));

    _simulateChooseSyncSourceFailure(getNet(), _options.syncSourceRetryWait);

    syncer.join();

    ASSERT_EQUALS(ErrorCodes::InitialSyncOplogSourceMissing, _lastApplied);
}
833 
834 // Confirms that InitialSyncer keeps retrying initial sync.
835 // Make every initial sync attempt fail early by having the sync source selector always return an
836 // invalid sync source.
TEST_F(InitialSyncerTest,InitialSyncerRetriesInitialSyncUpToMaxAttemptsAndReturnsLastAttemptError)837 TEST_F(InitialSyncerTest,
838        InitialSyncerRetriesInitialSyncUpToMaxAttemptsAndReturnsLastAttemptError) {
839     auto initialSyncer = &getInitialSyncer();
840     auto opCtx = makeOpCtx();
841 
842     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
843 
844     const std::uint32_t initialSyncMaxAttempts = 3U;
845     ASSERT_OK(initialSyncer->startup(opCtx.get(), initialSyncMaxAttempts));
846 
847     auto net = getNet();
848     for (std::uint32_t i = 0; i < initialSyncMaxAttempts; ++i) {
849         _simulateChooseSyncSourceFailure(net, _options.syncSourceRetryWait);
850         advanceClock(net, _options.initialSyncRetryWait);
851     }
852 
853     initialSyncer->join();
854 
855     ASSERT_EQUALS(ErrorCodes::InitialSyncOplogSourceMissing, _lastApplied);
856 
857     // Check number of failed attempts in stats.
858     auto progress = initialSyncer->getInitialSyncProgress();
859     unittest::log() << "Progress after " << initialSyncMaxAttempts
860                     << " failed attempts: " << progress;
861     ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), int(initialSyncMaxAttempts))
862         << progress;
863     ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), int(initialSyncMaxAttempts))
864         << progress;
865 }
866 
// An initial sync attempt resets the last optime, even when the attempt itself fails.
TEST_F(InitialSyncerTest, InitialSyncerResetsOptimesOnNewAttempt) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());

    // Seed the last optime with an arbitrary non-zero value; which 'consistency' value is passed
    // is irrelevant here.
    const OpTime seededOptime(Timestamp(1000, 1), 1);
    _setMyLastOptime(seededOptime, ReplicationCoordinator::DataConsistency::Inconsistent);

    // Kick off initial sync with a single permitted attempt.
    const std::uint32_t initialSyncMaxAttempts = 1U;
    ASSERT_OK(syncer.startup(opCtxHolder.get(), initialSyncMaxAttempts));

    // Drive that attempt to failure.
    auto net = getNet();
    _simulateChooseSyncSourceFailure(net, _options.syncSourceRetryWait);
    advanceClock(net, _options.initialSyncRetryWait);

    syncer.join();

    // The attempt must have reset the optime to its default value.
    ASSERT_EQUALS(OpTime(), _options.getMyLastOptime());
}
893 
// shutdown() issued while the InitialSyncer waits for its next sync source selection attempt
// makes it finish with CallbackCanceled.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsCallbackCanceledIfShutdownWhileRetryingSyncSourceSelection) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
    ASSERT_OK(syncer.startup(opCtxHolder.get(), maxAttempts));

    // Advance the mock clock to the midpoint of the retry interval.
    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);
        auto halfwayToRetry = net->now() + _options.syncSourceRetryWait / 2;
        ASSERT_GREATER_THAN(halfwayToRetry, net->now());
        ASSERT_EQUALS(halfwayToRetry, net->runUntil(halfwayToRetry));
    }

    // Cancels the _chooseSyncSourceCallback() task that is scheduled for getNet()->now() +
    // '_options.syncSourceRetryWait'.
    ASSERT_OK(syncer.shutdown());

    syncer.join();

    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
918 
// With the executor proxy told to fail scheduleWorkAt() requests and an invalid sync source, the
// InitialSyncer finishes with OperationFailed.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsScheduleErrorIfTaskExecutorFailsToScheduleNextChooseSyncSourceCallback) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
    _executorProxy->shouldFailScheduleWorkAtRequest = []() { return true; };

    ASSERT_OK(syncer.startup(opCtxHolder.get(), maxAttempts));
    syncer.join();

    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
932 
// If scheduling of the next initial sync attempt fails, the InitialSyncer ends up Complete and
// reports OperationFailed.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsScheduleErrorIfTaskExecutorFailsToScheduleNextInitialSyncAttempt) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());

    ASSERT_EQUALS(InitialSyncer::State::kPreStart, syncer.getState_forTest());

    ASSERT_OK(syncer.startup(opCtxHolder.get(), 2U));
    ASSERT_EQUALS(InitialSyncer::State::kRunning, syncer.getState_forTest());

    // Move the clock so that every choose-sync-source callback except the final one has run.
    auto net = getNet();
    const int intervalsBeforeLastAttempt = int(chooseSyncSourceMaxAttempts - 2);
    advanceClock(net, intervalsBeforeLastAttempt * _options.syncSourceRetryWait);

    // The final choose-sync-source attempt is now pending. From here on scheduling requests fail;
    // advancing the clock makes that final attempt fail, which causes the next initial sync
    // attempt to be scheduled.
    _executorProxy->shouldFailScheduleWorkAtRequest = []() { return true; };
    advanceClock(net, _options.syncSourceRetryWait);

    syncer.join();

    ASSERT_EQUALS(InitialSyncer::State::kComplete, syncer.getState_forTest());
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
959 
960 // This test verifies that the initial syncer will still transition to a complete state even if
961 // the completion callback function throws an exception.
TEST_F(InitialSyncerTest,InitialSyncerTransitionsToCompleteWhenFinishCallbackThrowsException)962 TEST_F(InitialSyncerTest, InitialSyncerTransitionsToCompleteWhenFinishCallbackThrowsException) {
963     auto initialSyncer = &getInitialSyncer();
964     auto opCtx = makeOpCtx();
965 
966     _onCompletion = [this](const StatusWith<OpTimeWithHash>& lastApplied) {
967         _lastApplied = lastApplied;
968         uassert(ErrorCodes::InternalError, "", false);
969     };
970 
971     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
972     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
973 
974     ASSERT_OK(initialSyncer->shutdown());
975     initialSyncer->join();
976 
977     ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
978 }
979 
/**
 * Destruction probe: sets the caller-owned flag to true when this object is destroyed.
 *
 * The object is non-copyable so that exactly one destructor run is tied to the flag.
 */
class SharedCallbackState {
public:
    /**
     * 'sharedCallbackStateDestroyed' must be non-null and must outlive this object, because the
     * destructor writes through it unconditionally.
     */
    explicit SharedCallbackState(bool* sharedCallbackStateDestroyed)
        : _sharedCallbackStateDestroyed(sharedCallbackStateDestroyed) {}

    // Copying is disabled explicitly rather than through a macro.
    SharedCallbackState(const SharedCallbackState&) = delete;
    SharedCallbackState& operator=(const SharedCallbackState&) = delete;

    ~SharedCallbackState() {
        *_sharedCallbackStateDestroyed = true;
    }

private:
    // Not owned; points at the flag supplied by the creator of this object.
    bool* const _sharedCallbackStateDestroyed;
};
993 
// The state captured by the completion callback must be destroyed once the InitialSyncer has
// finished, i.e. the syncer must not keep the callback alive afterwards.
TEST_F(InitialSyncerTest, InitialSyncerResetsOnCompletionCallbackFunctionPointerUponCompletion) {
    bool callbackStateDestroyed = false;
    auto callbackState = std::make_shared<SharedCallbackState>(&callbackStateDestroyed);
    decltype(_lastApplied) completionResult = getDetectableErrorStatus();

    // Build a dedicated InitialSyncer whose callback holds a copy of 'callbackState'.
    auto externalState = stdx::make_unique<DataReplicatorExternalStateMock>();
    externalState->taskExecutor = &getExecutor();
    auto syncer = stdx::make_unique<InitialSyncer>(
        _options,
        std::move(externalState),
        _storageInterface.get(),
        _replicationProcess.get(),
        [&completionResult, callbackState](const StatusWith<OpTimeWithHash>& result) {
            completionResult = result;
        });
    ON_BLOCK_EXIT([this]() { getExecutor().shutdown(); });

    auto opCtxHolder = makeOpCtx();

    ASSERT_OK(syncer->startup(opCtxHolder.get(), maxAttempts));

    // Dropping the local reference must not destroy the state: the callback still holds a copy.
    callbackState.reset();
    ASSERT_FALSE(callbackStateDestroyed);

    ASSERT_OK(syncer->shutdown());

    // shutdown() may have interrupted either _chooseSyncSource or _rollbackCheckerResetCallback.
    // In the latter case the network interface has to deliver the cancellation signals to the
    // InitialSyncer callbacks in order for the InitialSyncer to run to completion.
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
        guard->runReadyNetworkOperations();
    }

    syncer->join();

    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, completionResult);

    // After invoking the callback for the last time and before going inactive, InitialSyncer is
    // expected to reset 'InitialSyncer::_onCompletion', which releases every resource tied to
    // that callback.
    ASSERT_TRUE(callbackStateDestroyed);
}
1035 
// With a valid sync source, the InitialSyncer truncates the oplog and drops user databases.
TEST_F(InitialSyncerTest, InitialSyncerTruncatesOplogAndDropsReplicatedDatabases) {
    // Nothing past the dropUserDB stage matters here, so the original hook is wrapped: it runs
    // first (setting '_storageInterfaceWorkDone.droppedUserDBs') and a failure is then reported.
    auto originalDropUserDBsFn = _storageInterface->dropUserDBsFn;
    _storageInterface->dropUserDBsFn = [originalDropUserDBsFn](OperationContext* opCtx) {
        ASSERT_OK(originalDropUserDBsFn(opCtx));
        return Status(ErrorCodes::OperationFailed, "drop userdbs failed");
    };

    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    const HostAndPort validSyncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(validSyncSource);
    ASSERT_OK(syncer.startup(opCtxHolder.get(), maxAttempts));

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    // The work-done flags are read while holding their mutex.
    LockGuard lock(_storageInterfaceWorkDoneMutex);
    ASSERT_TRUE(_storageInterfaceWorkDone.truncateCalled);
    ASSERT_TRUE(_storageInterfaceWorkDone.droppedUserDBs);
}
1058 
// A failure to schedule the replSetGetRBID command surfaces as the InitialSyncer's final status.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetRollbackIdScheduleError) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    // replSetGetRBID is the first remote command the initial syncer schedules once the oplog
    // collection exists. Every remote command is rejected, and the rejected request is kept for
    // inspection below.
    executor::RemoteCommandRequest rejectedRequest;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&rejectedRequest](const executor::RemoteCommandRequest& requestToSend) {
            rejectedRequest = requestToSend;
            return true;
        };

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtxHolder.get(), maxAttempts));

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    ASSERT_EQUALS("admin", rejectedRequest.dbname);
    assertRemoteCommandNameEquals("replSetGetRBID", rejectedRequest);
    ASSERT_EQUALS(syncSource, rejectedRequest.target);
}
1083 
// If the executor is shut down before the rollback id request can be scheduled, the InitialSyncer
// finishes with ShutdownInProgress.
TEST_F(
    InitialSyncerTest,
    InitialSyncerReturnsShutdownInProgressIfSchedulingRollbackCheckerFailedDueToExecutorShutdown) {
    // The rollback id request goes out right after the oplog is truncated. Shutting the task
    // executor down before truncate() returns makes the scheduleRemoteCommand() call for
    // replSetGetRBID fail.
    auto originalTruncateCollFn = _storageInterface->truncateCollFn;
    _storageInterface->truncateCollFn = [originalTruncateCollFn,
                                         this](OperationContext* opCtx, const NamespaceString& nss) {
        auto truncateStatus = originalTruncateCollFn(opCtx, nss);
        getExecutor().shutdown();
        return truncateStatus;
    };

    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    const HostAndPort validSyncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(validSyncSource);
    ASSERT_OK(syncer.startup(opCtxHolder.get(), maxAttempts));

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _lastApplied);

    // The work-done flag is read while holding its mutex.
    LockGuard lock(_storageInterfaceWorkDoneMutex);
    ASSERT_TRUE(_storageInterfaceWorkDone.truncateCalled);
}
1110 
// shutdown() issued while the replSetGetRBID request is outstanding moves the InitialSyncer
// through ShuttingDown to Complete, with CallbackCanceled as the result.
TEST_F(InitialSyncerTest, InitialSyncerCancelsRollbackCheckerOnShutdown) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);

    ASSERT_EQUALS(InitialSyncer::State::kPreStart, syncer.getState_forTest());

    ASSERT_OK(syncer.startup(opCtxHolder.get(), maxAttempts));
    ASSERT_EQUALS(InitialSyncer::State::kRunning, syncer.getState_forTest());

    // Check the pending replSetGetRBID request, then black-hole it so it never gets a response.
    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);
        ASSERT_TRUE(net->hasReadyRequests());
        auto pendingOp = net->getNextReadyRequest();
        const auto& rbidRequest =
            assertRemoteCommandNameEquals("replSetGetRBID", pendingOp->getRequest());
        ASSERT_EQUALS("admin", rbidRequest.dbname);
        ASSERT_EQUALS(syncSource, rbidRequest.target);
        net->blackHole(pendingOp);
    }

    ASSERT_OK(syncer.shutdown());
    // The cancellation event is only delivered once the NetworkInterfaceMock is asked to run, so
    // until then the InitialSyncer must remain in a pre-completion state (i.e. ShuttingDown).
    ASSERT_EQUALS(InitialSyncer::State::kShuttingDown, syncer.getState_forTest());

    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);
        guard->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(InitialSyncer::State::kComplete, syncer.getState_forTest());

    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
1146 
// An error response to replSetGetRBID becomes the InitialSyncer's final status.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughRollbackCheckerCallbackError) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    const HostAndPort validSyncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(validSyncSource);
    ASSERT_OK(syncer.startup(opCtxHolder.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Answer the rollback id request with an error.
        const Status rbidFailure(ErrorCodes::OperationFailed,
                                 "replSetGetRBID failed at sync source");
        assertRemoteCommandNameEquals("replSetGetRBID", net->scheduleErrorResponse(rbidFailure));
        net->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
1167 
// A failure to schedule the last-oplog-entry "find" command becomes the InitialSyncer's final
// status. The rejected request is checked for its target, database, sort and limit.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughLastOplogEntryFetcherScheduleError) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    // The last oplog entry fetcher is the first component to send a find command. Every find
    // command is rejected, and the most recently seen request is kept for inspection at the end.
    executor::RemoteCommandRequest lastSeenRequest;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&lastSeenRequest](const executor::RemoteCommandRequest& requestToSend) {
            lastSeenRequest = requestToSend;
            const auto commandName = requestToSend.cmdObj.firstElement().fieldNameStringData();
            return "find" == commandName;
        };

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtxHolder.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Supply the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    ASSERT_EQUALS(syncSource, lastSeenRequest.target);
    ASSERT_EQUALS(_options.localOplogNS.db(), lastSeenRequest.dbname);
    assertRemoteCommandNameEquals("find", lastSeenRequest);
    ASSERT_BSONOBJ_EQ(BSON("$natural" << -1), lastSeenRequest.cmdObj.getObjectField("sort"));
    ASSERT_EQUALS(1, lastSeenRequest.cmdObj.getIntField("limit"));
}
1203 
// An error response to the last-oplog-entry "find" command becomes the InitialSyncer's final
// status.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughLastOplogEntryFetcherCallbackError) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    const HostAndPort validSyncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(validSyncSource);
    ASSERT_OK(syncer.startup(opCtxHolder.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Supply the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Fail the find command that follows.
        const Status findFailure(ErrorCodes::OperationFailed,
                                 "find command failed at sync source");
        assertRemoteCommandNameEquals("find", net->scheduleErrorResponse(findFailure));
        net->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
1229 
// shutdown() issued while a request is pending after the rollback id response makes the
// InitialSyncer finish with CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerCancelsLastOplogEntryFetcherOnShutdown) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    const HostAndPort validSyncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(validSyncSource);
    ASSERT_OK(syncer.startup(opCtxHolder.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Supply the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // A follow-up request must now be waiting; it is deliberately left unanswered.
        ASSERT_TRUE(net->hasReadyRequests());
    }

    ASSERT_OK(syncer.shutdown());
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);
        guard->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
1254 
// An empty batch in the last-oplog-entry response makes the InitialSyncer finish with
// NoMatchingDocument.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsNoMatchingDocumentIfLastOplogEntryFetcherReturnsEmptyBatchOfDocuments) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    const HostAndPort validSyncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(validSyncSource);
    ASSERT_OK(syncer.startup(opCtxHolder.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Supply the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Answer the last oplog entry request with no documents at all.
        processSuccessfulLastOplogEntryFetcherResponse({});
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, _lastApplied);
}
1278 
// After a HostNotFound error on the first last-oplog-entry "find", the InitialSyncer stays active
// and a second "find" can be answered successfully.
TEST_F(InitialSyncerTest,
       InitialSyncerResendsFindCommandIfLastOplogEntryFetcherReturnsRetriableError) {
    auto& syncer = getInitialSyncer();
    auto opCtxHolder = makeOpCtx();

    const HostAndPort validSyncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(validSyncSource);
    ASSERT_OK(syncer.startup(opCtxHolder.get(), maxAttempts));

    // The network guard is held for the remainder of the test.
    auto net = getNet();
    executor::NetworkInterfaceMock::InNetworkGuard guard(net);

    // Supply the base rollback ID.
    net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
    net->runReadyNetworkOperations();

    // First attempt at the last oplog entry: reply with a retriable error.
    const Status retriableError(ErrorCodes::HostNotFound, "");
    assertRemoteCommandNameEquals("find", net->scheduleErrorResponse(retriableError));
    net->runReadyNetworkOperations();

    // The find request for the last oplog entry is resent, so the InitialSyncer is still active.
    ASSERT_TRUE(syncer.isActive());

    // Second attempt at the last oplog entry: reply successfully.
    processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
}
1305 
// Feeds an empty document to the last oplog entry fetcher and expects _lastApplied to end up
// holding NoSuchKey.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsNoSuchKeyIfLastOplogEntryFetcherReturnsEntryWithMissingHash) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Rollback checker: base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry fetcher: a single document without any fields.
        processSuccessfulLastOplogEntryFetcherResponse({BSONObj()});
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::NoSuchKey, _lastApplied);
}
1329 
// Feeds a document that only carries the hash field "h" to the last oplog entry fetcher and
// expects _lastApplied to end up holding NoSuchKey.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsNoSuchKeyIfLastOplogEntryFetcherReturnsEntryWithMissingTimestamp) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Rollback checker: base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry fetcher: the document has a hash but nothing else.
        processSuccessfulLastOplogEntryFetcherResponse({BSON("h" << 1LL)});
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::NoSuchKey, _lastApplied);
}
1353 
// Configures the external state mock with an OperationFailed replica set config result and
// expects that same error code in _lastApplied once the FCV fetcher has been answered.
TEST_F(InitialSyncerTest,
       InitialSyncerPassesThroughErrorFromDataReplicatorExternalStateGetCurrentConfig) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Canned failure handed out by the external state mock.
    getExternalState()->replSetConfigResult = Status(ErrorCodes::OperationFailed, "");

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Rollback checker: base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
1382 
// Makes scheduling of the find command on the FCV collection fail and expects OperationFailed
// in _lastApplied; the most recently recorded request must be the FCV request.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughFCVFetcherScheduleError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Every request passing through the executor proxy is recorded (the latest one overwrites
    // the previous). Only a find whose target collection is the FCV collection is rejected.
    executor::RemoteCommandRequest lastSeenRequest;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&lastSeenRequest](const executor::RemoteCommandRequest& requestToSend) {
            lastSeenRequest = requestToSend;
            const auto firstElem = requestToSend.cmdObj.firstElement();
            const bool isFind = "find" == firstElem.fieldNameStringData();
            return isFind &&
                nsToCollectionSubstring(FeatureCompatibilityVersion::kCollection) ==
                firstElem.str();
        };

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Rollback checker: base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    // The recorded request went to the sync source and has the shape of an FCV request.
    ASSERT_EQUALS(syncSource, lastSeenRequest.target);
    assertFCVRequest(lastSeenRequest);
}
1419 
// Answers the FCV find command with OperationFailed and expects that error code to show up in
// _lastApplied.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughFCVFetcherCallbackError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Rollback checker: base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Fail the next find command and check that it is the FCV request.
        const Status findFailure(ErrorCodes::OperationFailed,
                                 "find command failed at sync source");
        auto fcvRequest =
            assertRemoteCommandNameEquals("find", network->scheduleErrorResponse(findFailure));
        assertFCVRequest(fcvRequest);
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
1449 
// Shuts the initial syncer down while a request is still pending after the last oplog entry
// has been fetched, and expects CallbackCanceled in _lastApplied.
TEST_F(InitialSyncerTest, InitialSyncerCancelsFCVFetcherOnShutdown) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Rollback checker: base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // A request (presumably the FCV fetcher's find) must be waiting and is left unanswered.
        ASSERT_TRUE(network->hasReadyRequests());
    }

    ASSERT_OK(syncer.shutdown());
    {
        // Re-enter the network only long enough to run whatever became ready after shutdown.
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
1477 
// A retriable error (HostNotFound) on the first FCV find must leave the initial syncer active;
// the FCV find is then answered successfully on the second attempt.
TEST_F(InitialSyncerTest, InitialSyncerResendsFindCommandIfFCVFetcherReturnsRetriableError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

    // Rollback checker: base rollback ID.
    network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
    network->runReadyNetworkOperations();

    // Last oplog entry fetcher.
    processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

    // First FCV attempt fails with a retriable error.
    auto failedRequest = network->scheduleErrorResponse(Status(ErrorCodes::HostNotFound, ""));
    assertRemoteCommandNameEquals("find", failedRequest);
    network->runReadyNetworkOperations();

    // The syncer is expected to still be running because the FCV find request gets resent.
    ASSERT_TRUE(syncer.isActive());

    // Second FCV attempt succeeds.
    processSuccessfulFCVFetcherResponse36();
}
1506 
runInitialSyncWithBadFCVResponse(std::vector<BSONObj> docs,ErrorCodes::Error expectedError)1507 void InitialSyncerTest::runInitialSyncWithBadFCVResponse(std::vector<BSONObj> docs,
1508                                                          ErrorCodes::Error expectedError) {
1509     auto initialSyncer = &getInitialSyncer();
1510     auto opCtx = makeOpCtx();
1511 
1512     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
1513     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
1514 
1515     auto net = getNet();
1516     {
1517         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
1518 
1519         // Base rollback ID.
1520         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
1521         net->runReadyNetworkOperations();
1522 
1523         // Last oplog entry.
1524         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
1525 
1526         processSuccessfulFCVFetcherResponse(docs);
1527     }
1528 
1529     initialSyncer->join();
1530     ASSERT_EQUALS(expectedError, _lastApplied);
1531 }
1532 
// An FCV response without any documents is expected to yield IncompatibleServerVersion.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsIncompatibleServerVersionWhenFCVFetcherReturnsEmptyBatchOfDocuments) {
    const auto expectedError = ErrorCodes::IncompatibleServerVersion;
    runInitialSyncWithBadFCVResponse({}, expectedError);
}
1537 
// An FCV response holding two documents is expected to yield TooManyMatchingDocuments.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsTooManyMatchingDocumentsWhenFCVFetcherReturnsMultipleDocuments) {
    const std::vector<BSONObj> fcvDocs{
        BSON("_id" << FeatureCompatibilityVersion::kParameterName << "version"
                   << FeatureCompatibilityVersionCommandParser::kVersion36),
        BSON("_id"
             << "other")};
    runInitialSyncWithBadFCVResponse(fcvDocs, ErrorCodes::TooManyMatchingDocuments);
}
1546 
// An FCV document with version 3.4 and targetVersion 3.6 is expected to yield
// IncompatibleServerVersion.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsIncompatibleServerVersionWhenFCVFetcherReturnsUpgradeTargetVersion) {
    const std::vector<BSONObj> fcvDocs{
        BSON("_id" << FeatureCompatibilityVersion::kParameterName << "version"
                   << FeatureCompatibilityVersionCommandParser::kVersion34
                   << "targetVersion"
                   << FeatureCompatibilityVersionCommandParser::kVersion36)};
    runInitialSyncWithBadFCVResponse(fcvDocs, ErrorCodes::IncompatibleServerVersion);
}
1555 
// An FCV document with version 3.4 and targetVersion 3.4 is expected to yield
// IncompatibleServerVersion.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsIncompatibleServerVersionWhenFCVFetcherReturnsDowngradeTargetVersion) {
    const std::vector<BSONObj> fcvDocs{
        BSON("_id" << FeatureCompatibilityVersion::kParameterName << "version"
                   << FeatureCompatibilityVersionCommandParser::kVersion34
                   << "targetVersion"
                   << FeatureCompatibilityVersionCommandParser::kVersion34)};
    runInitialSyncWithBadFCVResponse(fcvDocs, ErrorCodes::IncompatibleServerVersion);
}
1564 
// An FCV document that has a targetVersion but no version field is expected to yield BadValue.
TEST_F(InitialSyncerTest, InitialSyncerReturnsBadValueWhenFCVFetcherReturnsNoVersion) {
    const std::vector<BSONObj> fcvDocs{
        BSON("_id" << FeatureCompatibilityVersion::kParameterName << "targetVersion"
                   << FeatureCompatibilityVersionCommandParser::kVersion34)};
    runInitialSyncWithBadFCVResponse(fcvDocs, ErrorCodes::BadValue);
}
1570 
// Replies to the FCV fetcher with a version 3.4 document, checks that initial sync moves on to
// further requests, and then shuts down; _lastApplied is expected to hold CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerSucceedsWhenFCVFetcherReturnsOldVersion) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Rollback checker: base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // FCV fetcher: a single document reporting version 3.4.
        const std::vector<BSONObj> fcvDocs{
            BSON("_id" << FeatureCompatibilityVersion::kParameterName << "version"
                       << FeatureCompatibilityVersionCommandParser::kVersion34)};
        processSuccessfulFCVFetcherResponse(fcvDocs);
        ASSERT_TRUE(network->hasReadyRequests());
    }

    // Shutting down here avoids having to run initial sync to completion. Had the FCV fetcher
    // produced an error, that error would be reported instead of the cancellation.
    ASSERT_OK(syncer.shutdown());
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
1603 
// Makes scheduling of the tailable find command fail and expects OperationFailed in
// _lastApplied; the rejected request is inspected afterwards.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherScheduleError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Only tailable find commands are rejected; every other request may be scheduled. The
    // rejected request is saved for the assertions at the end of the test.
    executor::RemoteCommandRequest rejectedRequest;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&rejectedRequest](const executor::RemoteCommandRequest& requestToSend) {
            const auto& cmdObj = requestToSend.cmdObj;
            const bool isFind = "find" == cmdObj.firstElement().fieldNameStringData();
            if (!isFind || !cmdObj.getBoolField("tailable")) {
                return false;
            }
            rejectedRequest = requestToSend;
            return true;
        };

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Rollback checker: base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
        network->runReadyNetworkOperations();

        // Feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // After recording its completion status, the failing OplogFetcher shuts down the
        // DatabasesCloner. One more pass over the ready network operations delivers the
        // cancellation status to _databasesClonerCallback().
        network->runReadyNetworkOperations();
    }
    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    // The rejected request is the tailable, oplogReplay find against the oplog's database on
    // the sync source.
    ASSERT_EQUALS(syncSource, rejectedRequest.target);
    ASSERT_EQUALS(_options.localOplogNS.db(), rejectedRequest.dbname);
    assertRemoteCommandNameEquals("find", rejectedRequest);
    ASSERT_TRUE(rejectedRequest.cmdObj.getBoolField("tailable"));
    ASSERT_TRUE(rejectedRequest.cmdObj.getBoolField("oplogReplay"));
}
1653 
// Answers the tailable oplog find with OperationFailed and expects that error code to show up
// in _lastApplied.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherCallbackError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Rollback checker: base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry fetcher: a closed cursor (id 0) carrying a single entry.
        const auto lastOplogEntryResponse =
            makeCursorResponse(0LL, _options.localOplogNS, {makeOplogEntryObj(1)});
        network->scheduleSuccessfulResponse(lastOplogEntryResponse);
        network->runReadyNetworkOperations();

        // Feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // listDatabases: reply with an empty list of databases.
        const auto listDatabasesResponse = makeListDatabasesResponse({});
        assertRemoteCommandNameEquals("listDatabases",
                                      network->scheduleSuccessfulResponse(listDatabasesResponse));
        network->runReadyNetworkOperations();

        // Oplog tailing query fails.
        const Status deadCursor(ErrorCodes::OperationFailed, "dead cursor");
        auto tailingRequest =
            assertRemoteCommandNameEquals("find", network->scheduleErrorResponse(deadCursor));
        ASSERT_TRUE(tailingRequest.cmdObj.getBoolField("tailable"));
        network->runReadyNetworkOperations();

        // After recording its completion status, the failing OplogFetcher shuts down the
        // DatabasesCloner. One more pass over the ready network operations delivers the
        // cancellation status to _databasesClonerCallback().
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
1697 
// The oplog tailing cursor closes after returning only entry 1, and the second last oplog entry
// fetch also reports entry 1. _lastApplied is expected to hold the optime of entry 1.
TEST_F(InitialSyncerTest,
       InitialSyncerSucceedsOnEarlyOplogFetcherCompletionIfThereAreNoOperationsToApply) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Rollback checker: base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry fetcher: must be a find with a limit of 1.
        const auto lastOplogEntryResponse =
            makeCursorResponse(0LL, _options.localOplogNS, {makeOplogEntryObj(1)});
        auto lastOplogEntryRequest = assertRemoteCommandNameEquals(
            "find", network->scheduleSuccessfulResponse(lastOplogEntryResponse));
        ASSERT_EQUALS(1, lastOplogEntryRequest.cmdObj.getIntField("limit"));
        network->runReadyNetworkOperations();

        // Feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // listDatabases: reply with an empty list of databases.
        const auto listDatabasesResponse = makeListDatabasesResponse({});
        assertRemoteCommandNameEquals("listDatabases",
                                      network->scheduleSuccessfulResponse(listDatabasesResponse));
        network->runReadyNetworkOperations();

        // Oplog tailing query: cursor id 0 simulates the cursor being closed on the sync source.
        const auto closedCursorResponse =
            makeCursorResponse(0LL, _options.localOplogNS, {makeOplogEntryObj(1)});
        auto tailingRequest = assertRemoteCommandNameEquals(
            "find", network->scheduleSuccessfulResponse(closedCursorResponse));
        ASSERT_TRUE(tailingRequest.cmdObj.getBoolField("tailable"));
        network->runReadyNetworkOperations();

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Final replSetGetRBID command from the rollback checker.
        const auto rollbackIdResponse = makeRollbackCheckerResponse(1);
        assertRemoteCommandNameEquals("replSetGetRBID",
                                      network->scheduleSuccessfulResponse(rollbackIdResponse));
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(makeOplogEntry(1).getOpTime(), unittest::assertGet(_lastApplied).opTime);
}
1750 
// The oplog tailing cursor closes after returning entries 1 through 3, and the second last oplog
// entry fetch reports entry 3. _lastApplied is expected to hold the optime of entry 3.
TEST_F(
    InitialSyncerTest,
    InitialSyncerSucceedsOnEarlyOplogFetcherCompletionIfThereAreEnoughOperationsInTheOplogBufferToReachEndTimestamp) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Rollback checker: base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // listDatabases: reply with an empty list of databases.
        const auto listDatabasesResponse = makeListDatabasesResponse({});
        assertRemoteCommandNameEquals("listDatabases",
                                      network->scheduleSuccessfulResponse(listDatabasesResponse));
        network->runReadyNetworkOperations();

        // Oplog tailing query: cursor id 0 simulates the cursor being closed on the sync source
        // after a batch of three entries.
        const auto closedCursorResponse =
            makeCursorResponse(0LL,
                               _options.localOplogNS,
                               {makeOplogEntryObj(1),
                                makeOplogEntryObj(2, OpTypeEnum::kCommand),
                                makeOplogEntryObj(3, OpTypeEnum::kCommand)});
        auto tailingRequest = assertRemoteCommandNameEquals(
            "find", network->scheduleSuccessfulResponse(closedCursorResponse));
        ASSERT_TRUE(tailingRequest.cmdObj.getBoolField("tailable"));
        network->runReadyNetworkOperations();

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(3)});

        // Final replSetGetRBID command from the rollback checker.
        const auto rollbackIdResponse = makeRollbackCheckerResponse(1);
        assertRemoteCommandNameEquals("replSetGetRBID",
                                      network->scheduleSuccessfulResponse(rollbackIdResponse));
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(makeOplogEntry(3).getOpTime(), unittest::assertGet(_lastApplied).opTime);
}
1803 
// The oplog tailing cursor closes after returning entries 1 through 3, but the second last oplog
// entry fetch reports entry 4. _lastApplied is expected to hold RemoteResultsUnavailable.
TEST_F(
    InitialSyncerTest,
    InitialSyncerReturnsRemoteResultsUnavailableOnEarlyOplogFetcherCompletionIfThereAreNotEnoughOperationsInTheOplogBufferToReachEndTimestamp) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Rollback checker: base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // listDatabases: reply with an empty list of databases.
        const auto listDatabasesResponse = makeListDatabasesResponse({});
        assertRemoteCommandNameEquals("listDatabases",
                                      network->scheduleSuccessfulResponse(listDatabasesResponse));
        network->runReadyNetworkOperations();

        // Oplog tailing query: cursor id 0 simulates the cursor being closed on the sync source
        // after a batch of three entries.
        const auto closedCursorResponse =
            makeCursorResponse(0LL,
                               _options.localOplogNS,
                               {makeOplogEntryObj(1),
                                makeOplogEntryObj(2, OpTypeEnum::kCommand),
                                makeOplogEntryObj(3, OpTypeEnum::kCommand)});
        auto tailingRequest = assertRemoteCommandNameEquals(
            "find", network->scheduleSuccessfulResponse(closedCursorResponse));
        ASSERT_TRUE(tailingRequest.cmdObj.getBoolField("tailable"));
        network->runReadyNetworkOperations();

        // Second last oplog entry fetcher: the reported entry is newer than anything the
        // already-finished OplogFetcher read from the sync source.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(4)});
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::RemoteResultsUnavailable, _lastApplied);
}
1853 
// Makes scheduling of the listDatabases command fail and expects OperationFailed in
// _lastApplied, with no network requests left pending afterwards.
TEST_F(InitialSyncerTest,
       InitialSyncerPassesThroughDatabasesClonerScheduleErrorAndCancelsOplogFetcher) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Only listDatabases is rejected; every other request may be scheduled. The rejected
    // request is saved for the assertions at the end of the test.
    executor::RemoteCommandRequest rejectedRequest;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&rejectedRequest](const executor::RemoteCommandRequest& requestToSend) {
            const auto commandName = requestToSend.cmdObj.firstElement().fieldNameStringData();
            if (!("listDatabases" == commandName)) {
                return false;
            }
            rejectedRequest = requestToSend;
            return true;
        };

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Rollback checker: base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // Failing to schedule the DatabasesCloner makes the InitialSyncer shut down the
        // OplogFetcher, so nothing should be queued on the network.
        ASSERT_FALSE(network->hasReadyRequests());

        // Even though the OplogFetcher is shutting down, the ready network operations still
        // have to be run so that 'InitialSyncer::_oplogFetcherCallback' receives the
        // cancellation status.
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    // The rejected request is the listDatabases command against "admin" on the sync source.
    ASSERT_EQUALS(syncSource, rejectedRequest.target);
    ASSERT_EQUALS("admin", rejectedRequest.dbname);
    assertRemoteCommandNameEquals("listDatabases", rejectedRequest);
}
1905 
// Answers the listDatabases command with FailedToParse and expects that error code to show up
// in _lastApplied.
TEST_F(InitialSyncerTest,
       InitialSyncerPassesThroughDatabasesClonerCallbackErrorAndCancelsOplogFetcher) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Rollback checker: base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // The DatabasesCloner starts with listDatabases, which is made to fail here.
        const Status listDatabasesFailure(ErrorCodes::FailedToParse, "listDatabases failed");
        assertRemoteCommandNameEquals("listDatabases",
                                      network->scheduleErrorResponse(listDatabasesFailure));
        network->runReadyNetworkOperations();

        // After recording its completion status, the failing DatabasesCloner shuts down the
        // OplogFetcher. One more pass over the ready network operations delivers the
        // cancellation status to _oplogFetcherCallback().
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::FailedToParse, _lastApplied);
}
1943 
TEST_F(InitialSyncerTest,InitialSyncerIgnoresLocalDatabasesWhenCloningDatabases)1944 TEST_F(InitialSyncerTest, InitialSyncerIgnoresLocalDatabasesWhenCloningDatabases) {
1945     auto initialSyncer = &getInitialSyncer();
1946     auto opCtx = makeOpCtx();
1947 
1948     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
1949     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
1950 
1951     auto net = getNet();
1952     {
1953         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
1954 
1955         // Base rollback ID.
1956         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
1957         net->runReadyNetworkOperations();
1958 
1959         // Last oplog entry.
1960         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
1961 
1962         // Feature Compatibility Version.
1963         processSuccessfulFCVFetcherResponse36();
1964 
1965         // DatabasesCloner's first remote command - listDatabases
1966         assertRemoteCommandNameEquals(
1967             "listDatabases",
1968             net->scheduleSuccessfulResponse(makeListDatabasesResponse({"a", "local", "b"})));
1969         net->runReadyNetworkOperations();
1970 
1971         // Oplog tailing query.
1972         auto noi = net->getNextReadyRequest();
1973         auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
1974         ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
1975         net->blackHole(noi);
1976 
1977         // DatabasesCloner should only send listCollections requests for databases 'a' and 'b'.
1978         request = assertRemoteCommandNameEquals(
1979             "listCollections",
1980             net->scheduleSuccessfulResponse(
1981                 makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("a"), {})));
1982         ASSERT_EQUALS("a", request.dbname);
1983 
1984         request = assertRemoteCommandNameEquals(
1985             "listCollections",
1986             net->scheduleSuccessfulResponse(
1987                 makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {})));
1988         ASSERT_EQUALS("b", request.dbname);
1989 
1990         // After processing all the database names and returning empty lists of collections for each
1991         // database, data cloning should run to completion and we should expect to see a last oplog
1992         // entry fetcher request.
1993         request = assertRemoteCommandNameEquals(
1994             "find",
1995             net->scheduleSuccessfulResponse(
1996                 makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {})));
1997         ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
1998     }
1999 
2000     getExecutor().shutdown();
2001 
2002     initialSyncer->join();
2003     ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
2004 }
2005 
TEST_F(InitialSyncerTest,InitialSyncerIgnoresDatabaseInfoDocumentWithoutNameFieldWhenCloningDatabases)2006 TEST_F(InitialSyncerTest,
2007        InitialSyncerIgnoresDatabaseInfoDocumentWithoutNameFieldWhenCloningDatabases) {
2008     auto initialSyncer = &getInitialSyncer();
2009     auto opCtx = makeOpCtx();
2010 
2011     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
2012     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
2013 
2014     auto net = getNet();
2015     {
2016         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
2017 
2018         // Base rollback ID.
2019         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
2020         net->runReadyNetworkOperations();
2021 
2022         // Last oplog entry.
2023         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
2024 
2025         // Feature Compatibility Version.
2026         processSuccessfulFCVFetcherResponse36();
2027 
2028         // DatabasesCloner's first remote command - listDatabases
2029         assertRemoteCommandNameEquals(
2030             "listDatabases",
2031             net->scheduleSuccessfulResponse(BSON("databases" << BSON_ARRAY(BSON("name"
2032                                                                                 << "a")
2033                                                                            << BSON("bad"
2034                                                                                    << "dbinfo")
2035                                                                            << BSON("name"
2036                                                                                    << "b"))
2037                                                              << "ok"
2038                                                              << 1)));
2039         net->runReadyNetworkOperations();
2040 
2041         // Oplog tailing query.
2042         auto noi = net->getNextReadyRequest();
2043         auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
2044         ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
2045         net->blackHole(noi);
2046 
2047         // DatabasesCloner should only send listCollections requests for databases 'a' and 'b'.
2048         request = assertRemoteCommandNameEquals(
2049             "listCollections",
2050             net->scheduleSuccessfulResponse(
2051                 makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("a"), {})));
2052         ASSERT_EQUALS("a", request.dbname);
2053 
2054         request = assertRemoteCommandNameEquals(
2055             "listCollections",
2056             net->scheduleSuccessfulResponse(
2057                 makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {})));
2058         ASSERT_EQUALS("b", request.dbname);
2059 
2060         // After processing all the database names and returning empty lists of collections for each
2061         // database, data cloning should run to completion and we should expect to see a last oplog
2062         // entry fetcher request.
2063         request = assertRemoteCommandNameEquals(
2064             "find",
2065             net->scheduleSuccessfulResponse(
2066                 makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {})));
2067         ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
2068     }
2069 
2070     getExecutor().shutdown();
2071 
2072     initialSyncer->join();
2073     ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
2074 }
2075 
// Shuts the InitialSyncer down right after the FCV fetcher response has been processed and
// expects the attempt to finish with CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerCancelsBothOplogFetcherAndDatabasesClonerOnShutdown) {
    auto* syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Reply to the initial rollback ID check.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Reply to the first last-oplog-entry fetch.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Reply to the feature compatibility version fetch.
        processSuccessfulFCVFetcherResponse36();
    }

    ASSERT_OK(syncer->shutdown());
    {
        // Re-enter the network just long enough to run whatever is ready after shutdown.
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);
        network->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
2104 
TEST_F(InitialSyncerTest,InitialSyncerPassesThroughSecondLastOplogEntryFetcherScheduleErrorAndCancelsOplogFetcher)2105 TEST_F(InitialSyncerTest,
2106        InitialSyncerPassesThroughSecondLastOplogEntryFetcherScheduleErrorAndCancelsOplogFetcher) {
2107     auto initialSyncer = &getInitialSyncer();
2108     auto opCtx = makeOpCtx();
2109 
2110     // Make the second last oplog entry fetcher command fail. Allow all other requests to be
2111     // scheduled.
2112     executor::RemoteCommandRequest request;
2113     bool first = true;
2114     _executorProxy->shouldFailScheduleRemoteCommandRequest =
2115         [&first, &request](const executor::RemoteCommandRequest& requestToSend) {
2116             if ("find" == requestToSend.cmdObj.firstElement().fieldNameStringData() &&
2117                 requestToSend.cmdObj.hasField("sort") &&
2118                 1 == requestToSend.cmdObj.getIntField("limit")) {
2119                 if (first) {
2120                     first = false;
2121                     return false;
2122                 }
2123                 request = requestToSend;
2124                 return true;
2125             }
2126             return false;
2127         };
2128 
2129     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
2130     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
2131 
2132     auto net = getNet();
2133     {
2134         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
2135 
2136         // Base rollback ID.
2137         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
2138         net->runReadyNetworkOperations();
2139 
2140         // Last oplog entry.
2141         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
2142 
2143         // Feature Compatibility Version.
2144         processSuccessfulFCVFetcherResponse36();
2145 
2146         // Quickest path to a successful DatabasesCloner completion is to respond to the
2147         // listDatabases with an empty list of database names.
2148         assertRemoteCommandNameEquals(
2149             "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
2150         net->runReadyNetworkOperations();
2151 
2152         // DatabasesCloner will shut down the OplogFetcher on failing to schedule the last entry
2153         // oplog fetcher after setting the completion status.
2154         // We call runReadyNetworkOperations() again to deliver the cancellation status to
2155         // _oplogFetcherCallback().
2156         net->runReadyNetworkOperations();
2157     }
2158 
2159     initialSyncer->join();
2160     ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
2161 }
2162 
// Answers the second last oplog entry fetcher's find with an OperationFailed error response and
// expects that error to be reported as the result of the attempt.
TEST_F(InitialSyncerTest,
       InitialSyncerPassesThroughSecondLastOplogEntryFetcherCallbackErrorAndCancelsOplogFetcher) {
    auto* syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Reply to the initial rollback ID check.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Reply to the first last-oplog-entry fetch.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Reply to the feature compatibility version fetch.
        processSuccessfulFCVFetcherResponse36();

        // An empty listDatabases result is the shortest route to a successful DatabasesCloner
        // completion.
        assertRemoteCommandNameEquals(
            "listDatabases", network->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        network->runReadyNetworkOperations();

        // The OplogFetcher's tailing query does not need a reply; drop it and continue with the
        // next request.
        auto tailingOp = network->getNextReadyRequest();
        auto tailingRequest = assertRemoteCommandNameEquals("find", tailingOp->getRequest());
        ASSERT_TRUE(tailingRequest.cmdObj.getBoolField("tailable"));
        network->blackHole(tailingOp);

        // Fail the second last oplog entry fetcher.
        const Status fetcherError(ErrorCodes::OperationFailed,
                                  "second last oplog entry fetcher failed");
        auto lastEntryRequest =
            assertRemoteCommandNameEquals("find", network->scheduleErrorResponse(fetcherError));
        ASSERT_TRUE(lastEntryRequest.cmdObj.hasField("sort"));
        ASSERT_EQUALS(1, lastEntryRequest.cmdObj.getIntField("limit"));
        network->runReadyNetworkOperations();

        // _lastOplogEntryFetcherCallbackAfterCloningData() shuts down the OplogFetcher once the
        // completion status is set. Run the ready operations one more time so that the
        // cancellation status reaches _oplogFetcherCallback().
        network->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
2217 
// Leaves both the oplog tailing query and the second last oplog entry fetcher's find unanswered,
// shuts the InitialSyncer down and expects the attempt to finish with CallbackCanceled.
TEST_F(InitialSyncerTest,
       InitialSyncerCancelsBothSecondLastOplogEntryFetcherAndOplogFetcherOnShutdown) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        auto request = assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
        // on to the second last oplog entry fetcher's request.
        auto noi = net->getNextReadyRequest();
        request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // Second last oplog entry fetcher. Blackholed so that it is still outstanding when the
        // InitialSyncer is shut down below.
        noi = net->getNextReadyRequest();
        request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.hasField("sort"));
        ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
        net->blackHole(noi);
    }

    // Check the shutdown status instead of discarding it, matching the other shutdown test in
    // this file.
    ASSERT_OK(initialSyncer->shutdown());
    executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
2267 
TEST_F(InitialSyncerTest,InitialSyncerCancelsSecondLastOplogEntryFetcherOnOplogFetcherCallbackError)2268 TEST_F(InitialSyncerTest,
2269        InitialSyncerCancelsSecondLastOplogEntryFetcherOnOplogFetcherCallbackError) {
2270     auto initialSyncer = &getInitialSyncer();
2271     auto opCtx = makeOpCtx();
2272 
2273     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
2274     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
2275 
2276     auto net = getNet();
2277     {
2278         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
2279 
2280         // Base rollback ID.
2281         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
2282         net->runReadyNetworkOperations();
2283 
2284         // Last oplog entry.
2285         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
2286 
2287         // Feature Compatibility Version.
2288         processSuccessfulFCVFetcherResponse36();
2289 
2290         // Quickest path to a successful DatabasesCloner completion is to respond to the
2291         // listDatabases with an empty list of database names.
2292         assertRemoteCommandNameEquals(
2293             "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
2294         net->runReadyNetworkOperations();
2295 
2296         // Save request for OplogFetcher's oplog tailing query. This request will be canceled.
2297         auto noi = net->getNextReadyRequest();
2298         auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
2299         ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
2300         ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
2301         auto oplogFetcherNetworkOperationIterator = noi;
2302 
2303         // Second last oplog entry fetcher.
2304         // Blackhole this request which will be canceled when oplog fetcher fails.
2305         noi = net->getNextReadyRequest();
2306         request = assertRemoteCommandNameEquals("find", noi->getRequest());
2307         ASSERT_TRUE(request.cmdObj.hasField("sort"));
2308         ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
2309         net->blackHole(noi);
2310 
2311         // Make oplog fetcher fail.
2312         net->scheduleErrorResponse(oplogFetcherNetworkOperationIterator,
2313                                    Status(ErrorCodes::OperationFailed, "oplog fetcher failed"));
2314         net->runReadyNetworkOperations();
2315 
2316         // _oplogFetcherCallback() will shut down the '_lastOplogEntryFetcher' after setting the
2317         // completion status.
2318         // We call runReadyNetworkOperations() again to deliver the cancellation status to
2319         // _lastOplogEntryFetcherCallbackAfterCloningData().
2320         net->runReadyNetworkOperations();
2321     }
2322 
2323     initialSyncer->join();
2324     ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
2325 }
2326 
// Answers the second last oplog entry fetcher with a document whose "h" field is a string and
// expects the attempt to finish with TypeMismatch.
TEST_F(
    InitialSyncerTest,
    InitialSyncerReturnsTypeMismatchErrorWhenSecondLastOplogEntryFetcherReturnsMalformedDocument) {
    auto* syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto firstOplogEntry = makeOplogEntryObj(1);
    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Reply to the initial rollback ID check.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Reply to the first last-oplog-entry fetch.
        processSuccessfulLastOplogEntryFetcherResponse({firstOplogEntry});

        // Reply to the feature compatibility version fetch.
        processSuccessfulFCVFetcherResponse36();

        // An empty listDatabases result is the shortest route to a successful DatabasesCloner
        // completion.
        assertRemoteCommandNameEquals(
            "listDatabases", network->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        network->runReadyNetworkOperations();

        // The OplogFetcher's tailing query does not need a reply; drop it and continue with the
        // next request.
        auto tailingOp = network->getNextReadyRequest();
        auto tailingRequest = tailingOp->getRequest();
        assertRemoteCommandNameEquals("find", tailingRequest);
        ASSERT_TRUE(tailingRequest.cmdObj.getBoolField("tailable"));
        network->blackHole(tailingOp);

        // Second last oplog entry fetcher: reply with an entry whose hash is not numeric.
        const BSONObj malformedEntry = BSON("ts" << Timestamp(1) << "t" << 1 << "h"
                                                 << "not a hash");
        processSuccessfulLastOplogEntryFetcherResponse({malformedEntry});

        // _lastOplogEntryFetcherCallbackAfterCloningData() shuts down the OplogFetcher once the
        // completion status is set. Run the ready operations one more time so that the
        // cancellation status reaches _oplogFetcherCallback().
        network->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::TypeMismatch, _lastApplied);
}
2379 
// Answers the first last-oplog-entry fetch with makeOplogEntryObj(2) and the second with
// makeOplogEntryObj(1), and expects the attempt to finish with OplogOutOfOrder.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsOplogOutOfOrderIfStopTimestampPrecedesBeginTimestamp) {
    auto* syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Reply to the initial rollback ID check.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Reply to the first last-oplog-entry fetch with the later entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // Reply to the feature compatibility version fetch.
        processSuccessfulFCVFetcherResponse36();

        // An empty listDatabases result is the shortest route to a successful DatabasesCloner
        // completion.
        assertRemoteCommandNameEquals(
            "listDatabases", network->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        network->runReadyNetworkOperations();

        // The OplogFetcher's tailing query does not need a reply; drop it and continue with the
        // next request.
        auto tailingOp = network->getNextReadyRequest();
        auto tailingRequest = assertRemoteCommandNameEquals("find", tailingOp->getRequest());
        ASSERT_TRUE(tailingRequest.cmdObj.getBoolField("tailable"));
        network->blackHole(tailingOp);

        // Second last oplog entry fetcher: reply with the earlier entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // _lastOplogEntryFetcherCallbackAfterCloningData() shuts down the OplogFetcher once the
        // completion status is set. Run the ready operations one more time so that the
        // cancellation status reaches _oplogFetcherCallback().
        network->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, _lastApplied);
}
2428 
TEST_F(InitialSyncerTest,InitialSyncerPassesThroughInsertOplogSeedDocumentErrorAfterDataCloningFinishesWithNoOperationsToApply)2429 TEST_F(
2430     InitialSyncerTest,
2431     InitialSyncerPassesThroughInsertOplogSeedDocumentErrorAfterDataCloningFinishesWithNoOperationsToApply) {
2432     auto initialSyncer = &getInitialSyncer();
2433     auto opCtx = makeOpCtx();
2434 
2435     NamespaceString insertDocumentNss;
2436     TimestampedBSONObj insertDocumentDoc;
2437     long long insertDocumentTerm;
2438     _storageInterface->insertDocumentFn =
2439         [&insertDocumentDoc, &insertDocumentNss, &insertDocumentTerm](OperationContext*,
2440                                                                       const NamespaceString& nss,
2441                                                                       const TimestampedBSONObj& doc,
2442                                                                       long long term) {
2443             insertDocumentNss = nss;
2444             insertDocumentDoc = doc;
2445             insertDocumentTerm = term;
2446             return Status(ErrorCodes::OperationFailed, "failed to insert oplog entry");
2447         };
2448 
2449     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
2450     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
2451 
2452     auto oplogEntry = makeOplogEntryObj(1);
2453     auto net = getNet();
2454     {
2455         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
2456 
2457         // Base rollback ID.
2458         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
2459         net->runReadyNetworkOperations();
2460 
2461         // Last oplog entry.
2462         processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
2463 
2464         // Feature Compatibility Version.
2465         processSuccessfulFCVFetcherResponse36();
2466 
2467         // Quickest path to a successful DatabasesCloner completion is to respond to the
2468         // listDatabases with an empty list of database names.
2469         assertRemoteCommandNameEquals(
2470             "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
2471         net->runReadyNetworkOperations();
2472 
2473         // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
2474         // on to the DatabasesCloner's request.
2475         auto noi = net->getNextReadyRequest();
2476         auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
2477         ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
2478         net->blackHole(noi);
2479 
2480         // Second last oplog entry fetcher.
2481         processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
2482 
2483         // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
2484         // setting the completion status.
2485         // We call runReadyNetworkOperations() again to deliver the cancellation status to
2486         // _oplogFetcherCallback().
2487         net->runReadyNetworkOperations();
2488     }
2489 
2490     initialSyncer->join();
2491     ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
2492     ASSERT_EQUALS(_options.localOplogNS, insertDocumentNss);
2493     ASSERT_BSONOBJ_EQ(oplogEntry, insertDocumentDoc.obj);
2494 }
2495 
TEST_F(InitialSyncerTest,InitialSyncerReturnsCallbackCanceledAndDoesNotScheduleRollbackCheckerIfShutdownAfterInsertingInsertOplogSeedDocument)2496 TEST_F(
2497     InitialSyncerTest,
2498     InitialSyncerReturnsCallbackCanceledAndDoesNotScheduleRollbackCheckerIfShutdownAfterInsertingInsertOplogSeedDocument) {
2499     auto initialSyncer = &getInitialSyncer();
2500     auto opCtx = makeOpCtx();
2501 
2502     NamespaceString insertDocumentNss;
2503     TimestampedBSONObj insertDocumentDoc;
2504     long long insertDocumentTerm;
2505     _storageInterface->insertDocumentFn = [initialSyncer,
2506                                            &insertDocumentDoc,
2507                                            &insertDocumentNss,
2508                                            &insertDocumentTerm](OperationContext*,
2509                                                                 const NamespaceString& nss,
2510                                                                 const TimestampedBSONObj& doc,
2511                                                                 long long term) {
2512         insertDocumentNss = nss;
2513         insertDocumentDoc = doc;
2514         insertDocumentTerm = term;
2515         initialSyncer->shutdown().transitional_ignore();
2516         return Status::OK();
2517     };
2518 
2519     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
2520     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
2521 
2522     auto oplogEntry = makeOplogEntryObj(1);
2523     auto net = getNet();
2524     {
2525         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
2526 
2527         // Base rollback ID.
2528         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
2529         net->runReadyNetworkOperations();
2530 
2531         // Last oplog entry.
2532         processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
2533 
2534         // Feature Compatibility Version.
2535         processSuccessfulFCVFetcherResponse36();
2536 
2537         // Quickest path to a successful DatabasesCloner completion is to respond to the
2538         // listDatabases with an empty list of database names.
2539         assertRemoteCommandNameEquals(
2540             "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
2541         net->runReadyNetworkOperations();
2542 
2543         // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
2544         // on to the DatabasesCloner's request.
2545         auto noi = net->getNextReadyRequest();
2546         auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
2547         ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
2548         net->blackHole(noi);
2549 
2550         // Second last oplog entry fetcher.
2551         processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
2552 
2553         // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
2554         // setting the completion status.
2555         // We call runReadyNetworkOperations() again to deliver the cancellation status to
2556         // _oplogFetcherCallback().
2557         net->runReadyNetworkOperations();
2558     }
2559 
2560     initialSyncer->join();
2561     ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
2562     ASSERT_EQUALS(_options.localOplogNS, insertDocumentNss);
2563     ASSERT_BSONOBJ_EQ(oplogEntry, insertDocumentDoc.obj);
2564 }
2565 
TEST_F(InitialSyncerTest,InitialSyncerPassesThroughRollbackCheckerScheduleErrorAfterCloningFinishesWithNoOperationsToApply)2566 TEST_F(
2567     InitialSyncerTest,
2568     InitialSyncerPassesThroughRollbackCheckerScheduleErrorAfterCloningFinishesWithNoOperationsToApply) {
2569     auto initialSyncer = &getInitialSyncer();
2570     auto opCtx = makeOpCtx();
2571 
2572     // Make the second replSetGetRBID command fail. Allow all other requests to be scheduled.
2573     executor::RemoteCommandRequest request;
2574     bool first = true;
2575     _executorProxy->shouldFailScheduleRemoteCommandRequest =
2576         [&first, &request](const executor::RemoteCommandRequest& requestToSend) {
2577             if ("replSetGetRBID" == requestToSend.cmdObj.firstElement().fieldNameStringData()) {
2578                 if (first) {
2579                     first = false;
2580                     return false;
2581                 }
2582                 request = requestToSend;
2583                 return true;
2584             }
2585             return false;
2586         };
2587 
2588     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
2589     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
2590 
2591     auto oplogEntry = makeOplogEntryObj(1);
2592     auto net = getNet();
2593     {
2594         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
2595 
2596         // Base rollback ID.
2597         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
2598         net->runReadyNetworkOperations();
2599 
2600         // Last oplog entry.
2601         processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
2602 
2603         // Feature Compatibility Version.
2604         processSuccessfulFCVFetcherResponse36();
2605 
2606         // Quickest path to a successful DatabasesCloner completion is to respond to the
2607         // listDatabases with an empty list of database names.
2608         assertRemoteCommandNameEquals(
2609             "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
2610         net->runReadyNetworkOperations();
2611 
2612         // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
2613         // on to the DatabasesCloner's request.
2614         auto noi = net->getNextReadyRequest();
2615         auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
2616         ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
2617         net->blackHole(noi);
2618 
2619         // Second last oplog entry fetcher.
2620         processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
2621 
2622         // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
2623         // setting the completion status.
2624         // We call runReadyNetworkOperations() again to deliver the cancellation status to
2625         // _oplogFetcherCallback().
2626         net->runReadyNetworkOperations();
2627     }
2628 
2629     initialSyncer->join();
2630     ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
2631 }
2632 
TEST_F(InitialSyncerTest,InitialSyncerPassesThroughRollbackCheckerCallbackErrorAfterCloningFinishesWithNoOperationsToApply)2633 TEST_F(
2634     InitialSyncerTest,
2635     InitialSyncerPassesThroughRollbackCheckerCallbackErrorAfterCloningFinishesWithNoOperationsToApply) {
2636     auto initialSyncer = &getInitialSyncer();
2637     auto opCtx = makeOpCtx();
2638 
2639     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
2640     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
2641 
2642     auto oplogEntry = makeOplogEntryObj(1);
2643     auto net = getNet();
2644     {
2645         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
2646 
2647         // Base rollback ID.
2648         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
2649         net->runReadyNetworkOperations();
2650 
2651         // Last oplog entry.
2652         processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
2653 
2654         // Feature Compatibility Version.
2655         processSuccessfulFCVFetcherResponse36();
2656 
2657         // Quickest path to a successful DatabasesCloner completion is to respond to the
2658         // listDatabases with an empty list of database names.
2659         assertRemoteCommandNameEquals(
2660             "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
2661         net->runReadyNetworkOperations();
2662 
2663         // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
2664         // on to the DatabasesCloner's request.
2665         auto noi = net->getNextReadyRequest();
2666         auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
2667         ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
2668         net->blackHole(noi);
2669 
2670         // Second last oplog entry fetcher.
2671         processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
2672 
2673         // Last rollback checker replSetGetRBID command.
2674         assertRemoteCommandNameEquals(
2675             "replSetGetRBID",
2676             net->scheduleErrorResponse(
2677                 Status(ErrorCodes::OperationFailed, "replSetGetRBID command failed")));
2678         net->runReadyNetworkOperations();
2679 
2680         // _rollbackCheckerCheckForRollbackCallback() will shut down the OplogFetcher after setting
2681         // the completion status.
2682         // We call runReadyNetworkOperations() again to deliver the cancellation status to
2683         // _oplogFetcherCallback().
2684         net->runReadyNetworkOperations();
2685     }
2686 
2687     initialSyncer->join();
2688     ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
2689 }
2690 
TEST_F(InitialSyncerTest,InitialSyncerCancelsLastRollbackCheckerOnShutdown)2691 TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnShutdown) {
2692     auto initialSyncer = &getInitialSyncer();
2693     auto opCtx = makeOpCtx();
2694 
2695     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
2696     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
2697 
2698     auto oplogEntry = makeOplogEntryObj(1);
2699     auto net = getNet();
2700     {
2701         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
2702 
2703         // Base rollback ID.
2704         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
2705         net->runReadyNetworkOperations();
2706 
2707         // Last oplog entry.
2708         processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
2709 
2710         // Feature Compatibility Version.
2711         processSuccessfulFCVFetcherResponse36();
2712 
2713         // Quickest path to a successful DatabasesCloner completion is to respond to the
2714         // listDatabases with an empty list of database names.
2715         assertRemoteCommandNameEquals(
2716             "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
2717         net->runReadyNetworkOperations();
2718 
2719         // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
2720         // on to the DatabasesCloner's request.
2721         auto noi = net->getNextReadyRequest();
2722         auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
2723         ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
2724         net->blackHole(noi);
2725 
2726         // Second last oplog entry fetcher.
2727         processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
2728 
2729         // Last rollback checker replSetGetRBID command.
2730         noi = net->getNextReadyRequest();
2731         assertRemoteCommandNameEquals("replSetGetRBID", noi->getRequest());
2732         net->blackHole(noi);
2733 
2734         // _rollbackCheckerCheckForRollbackCallback() will shut down the OplogFetcher after setting
2735         // the completion status.
2736         // We call runReadyNetworkOperations() again to deliver the cancellation status to
2737         // _oplogFetcherCallback().
2738         net->runReadyNetworkOperations();
2739     }
2740 
2741     ASSERT_OK(initialSyncer->shutdown());
2742     executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
2743 
2744     initialSyncer->join();
2745     ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
2746 }
2747 
TEST_F(InitialSyncerTest,InitialSyncerCancelsLastRollbackCheckerOnOplogFetcherCallbackError)2748 TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnOplogFetcherCallbackError) {
2749     auto initialSyncer = &getInitialSyncer();
2750     auto opCtx = makeOpCtx();
2751 
2752     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
2753     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
2754 
2755     auto oplogEntry = makeOplogEntryObj(1);
2756     auto net = getNet();
2757     {
2758         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
2759 
2760         // Base rollback ID.
2761         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
2762         net->runReadyNetworkOperations();
2763 
2764         // Last oplog entry.
2765         processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
2766 
2767         // Feature Compatibility Version.
2768         processSuccessfulFCVFetcherResponse36();
2769 
2770         // Quickest path to a successful DatabasesCloner completion is to respond to the
2771         // listDatabases with an empty list of database names.
2772         assertRemoteCommandNameEquals(
2773             "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
2774         net->runReadyNetworkOperations();
2775 
2776         // Save request for OplogFetcher's oplog tailing query. This request will be canceled.
2777         auto noi = net->getNextReadyRequest();
2778         auto request = assertRemoteCommandNameEquals("find", noi->getRequest());
2779         ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
2780         ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
2781         auto oplogFetcherNetworkOperationIterator = noi;
2782 
2783         // Second last oplog entry fetcher.
2784         processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
2785 
2786         // Last rollback checker replSetGetRBID command.
2787         noi = net->getNextReadyRequest();
2788         request = noi->getRequest();
2789         assertRemoteCommandNameEquals("replSetGetRBID", request);
2790         net->blackHole(noi);
2791 
2792         // Make oplog fetcher fail.
2793         net->scheduleErrorResponse(oplogFetcherNetworkOperationIterator,
2794                                    Status(ErrorCodes::OperationFailed, "oplog fetcher failed"));
2795         net->runReadyNetworkOperations();
2796 
2797         // _oplogFetcherCallback() will shut down the last rollback checker after setting the
2798         // completion status.
2799         // We call runReadyNetworkOperations() again to deliver the cancellation status to
2800         // _rollbackCheckerCheckForRollbackCallback().
2801         net->runReadyNetworkOperations();
2802     }
2803 
2804     initialSyncer->join();
2805     ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
2806 }
2807 
// Answers the final replSetGetRBID with a rollback ID one higher than the base ID and expects
// UnrecoverableRollbackError in _lastApplied.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsUnrecoverableRollbackErrorIfSyncSourceRolledBackAfterCloningData) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto lastOplogEntry = makeOplogEntryObj(1);
    auto net = getNet();
    const int initialRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(net);

        // Response carrying the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(initialRollbackId));
        net->runReadyNetworkOperations();

        // First fetch of the last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({lastOplogEntry});

        // Feature compatibility version fetch.
        processSuccessfulFCVFetcherResponse36();

        // Replying to listDatabases with no database names is the shortest route to a
        // successful DatabasesCloner completion.
        auto listDatabasesRequest =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        net->runReadyNetworkOperations();

        // The OplogFetcher's tailable find needs no reply in this test, so it is black-holed.
        auto oplogFetcherOp = net->getNextReadyRequest();
        auto oplogFindRequest = oplogFetcherOp->getRequest();
        assertRemoteCommandNameEquals("find", oplogFindRequest);
        ASSERT_TRUE(oplogFindRequest.cmdObj.getBoolField("tailable"));
        net->blackHole(oplogFetcherOp);

        // Second fetch of the last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({lastOplogEntry});

        // The final rollback check reports a rollback ID different from the base one.
        auto rbidRequest =
            net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(initialRollbackId + 1));
        net->runReadyNetworkOperations();
        assertRemoteCommandNameEquals("replSetGetRBID", rbidRequest);
        net->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, _lastApplied);
}
2859 
TEST_F(InitialSyncerTest,LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCloning)2860 TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCloning) {
2861     auto initialSyncer = &getInitialSyncer();
2862     auto opCtx = makeOpCtx();
2863 
2864     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
2865     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
2866 
2867     ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
2868 
2869     auto oplogEntry = makeOplogEntry(1);
2870     auto net = getNet();
2871     int baseRollbackId = 1;
2872     {
2873         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
2874 
2875         // Base rollback ID.
2876         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
2877         net->runReadyNetworkOperations();
2878 
2879         // Last oplog entry.
2880         processSuccessfulLastOplogEntryFetcherResponse({oplogEntry.toBSON()});
2881 
2882         // Feature Compatibility Version.
2883         processSuccessfulFCVFetcherResponse36();
2884 
2885         // Instead of fast forwarding to DatabasesCloner completion by returning an empty list of
2886         // database names, we'll simulate copying a single database with a single collection on the
2887         // sync source.
2888         NamespaceString nss("a.a");
2889         auto request =
2890             net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
2891         assertRemoteCommandNameEquals("listDatabases", request);
2892         net->runReadyNetworkOperations();
2893 
2894         // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
2895         // on to the DatabasesCloner's request.
2896         auto noi = net->getNextReadyRequest();
2897         request = noi->getRequest();
2898         assertRemoteCommandNameEquals("find", request);
2899         ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
2900         net->blackHole(noi);
2901 
2902         // listCollections for "a"
2903         request = net->scheduleSuccessfulResponse(
2904             makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())}));
2905         assertRemoteCommandNameEquals("listCollections", request);
2906 
2907         // count:a
2908         request = assertRemoteCommandNameEquals(
2909             "count", net->scheduleSuccessfulResponse(BSON("n" << 1 << "ok" << 1)));
2910         ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
2911         ASSERT_EQUALS(nss.db(), request.dbname);
2912 
2913         // listIndexes:a
2914         request = assertRemoteCommandNameEquals(
2915             "listIndexes",
2916             net->scheduleSuccessfulResponse(makeCursorResponse(
2917                 0LL,
2918                 NamespaceString(nss.getCommandNS()),
2919                 {BSON("v" << OplogEntry::kOplogVersion << "key" << BSON("_id" << 1) << "name"
2920                           << "_id_"
2921                           << "ns"
2922                           << nss.ns())})));
2923         ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
2924         ASSERT_EQUALS(nss.db(), request.dbname);
2925 
2926         // find:a
2927         request = assertRemoteCommandNameEquals("find",
2928                                                 net->scheduleSuccessfulResponse(makeCursorResponse(
2929                                                     0LL, nss, {BSON("_id" << 1 << "a" << 1)})));
2930         ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
2931         ASSERT_EQUALS(nss.db(), request.dbname);
2932 
2933         // Second last oplog entry fetcher.
2934         processSuccessfulLastOplogEntryFetcherResponse({oplogEntry.toBSON()});
2935 
2936         // Last rollback checker replSetGetRBID command.
2937         request = assertRemoteCommandNameEquals(
2938             "replSetGetRBID",
2939             net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)));
2940         net->runReadyNetworkOperations();
2941 
2942         // Deliver cancellation to OplogFetcher.
2943         net->runReadyNetworkOperations();
2944     }
2945 
2946     initialSyncer->join();
2947     ASSERT_EQUALS(oplogEntry.getOpTime(), unittest::assertGet(_lastApplied).opTime);
2948     ASSERT_EQUALS(oplogEntry.getHash(), unittest::assertGet(_lastApplied).value);
2949     ASSERT_FALSE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
2950 }
2951 
TEST_F(InitialSyncerTest,InitialSyncerPassesThroughGetNextApplierBatchScheduleError)2952 TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchScheduleError) {
2953     auto initialSyncer = &getInitialSyncer();
2954     auto opCtx = makeOpCtx();
2955 
2956     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
2957     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
2958 
2959     ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
2960 
2961     auto net = getNet();
2962     int baseRollbackId = 1;
2963     {
2964         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
2965 
2966         // Base rollback ID.
2967         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
2968         net->runReadyNetworkOperations();
2969 
2970         // Last oplog entry.
2971         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
2972 
2973         // Feature Compatibility Version.
2974         processSuccessfulFCVFetcherResponse36();
2975 
2976         // Quickest path to a successful DatabasesCloner completion is to respond to the
2977         // listDatabases with an empty list of database names.
2978         assertRemoteCommandNameEquals(
2979             "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
2980         net->runReadyNetworkOperations();
2981 
2982         // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
2983         // on to the DatabasesCloner's request.
2984         auto noi = net->getNextReadyRequest();
2985         auto request = noi->getRequest();
2986         assertRemoteCommandNameEquals("find", request);
2987         ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
2988         net->blackHole(noi);
2989 
2990         // Before processing scheduled last oplog entry fetcher response, set flag in
2991         // TaskExecutorMock so that InitialSyncer will fail to schedule
2992         // _getNextApplierBatchCallback().
2993         _executorProxy->shouldFailScheduleWorkRequest = []() { return true; };
2994 
2995         // Second last oplog entry fetcher.
2996         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
2997 
2998         // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
2999         // setting the completion status.
3000         // We call runReadyNetworkOperations() again to deliver the cancellation status to
3001         // _oplogFetcherCallback().
3002         net->runReadyNetworkOperations();
3003     }
3004 
3005     initialSyncer->join();
3006     ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
3007 }
3008 
TEST_F(InitialSyncerTest,InitialSyncerPassesThroughSecondGetNextApplierBatchScheduleError)3009 TEST_F(InitialSyncerTest, InitialSyncerPassesThroughSecondGetNextApplierBatchScheduleError) {
3010     auto initialSyncer = &getInitialSyncer();
3011     auto opCtx = makeOpCtx();
3012 
3013     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
3014     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
3015 
3016     ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
3017 
3018     auto net = getNet();
3019     int baseRollbackId = 1;
3020     {
3021         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
3022 
3023         // Base rollback ID.
3024         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
3025         net->runReadyNetworkOperations();
3026 
3027         // Last oplog entry.
3028         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
3029 
3030         // Feature Compatibility Version.
3031         processSuccessfulFCVFetcherResponse36();
3032 
3033         // Quickest path to a successful DatabasesCloner completion is to respond to the
3034         // listDatabases with an empty list of database names.
3035         assertRemoteCommandNameEquals(
3036             "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
3037         net->runReadyNetworkOperations();
3038 
3039         // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
3040         // on to the DatabasesCloner's request.
3041         auto noi = net->getNextReadyRequest();
3042         auto request = noi->getRequest();
3043         assertRemoteCommandNameEquals("find", request);
3044         ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
3045         net->blackHole(noi);
3046 
3047         // Before processing scheduled last oplog entry fetcher response, set flag in
3048         // TaskExecutorMock so that InitialSyncer will fail to schedule second
3049         // _getNextApplierBatchCallback() at (now + options.getApplierBatchCallbackRetryWait).
3050         _executorProxy->shouldFailScheduleWorkAtRequest = []() { return true; };
3051 
3052         // Second last oplog entry fetcher.
3053         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
3054 
3055         // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
3056         // setting the completion status.
3057         // We call runReadyNetworkOperations() again to deliver the cancellation status to
3058         // _oplogFetcherCallback().
3059         net->runReadyNetworkOperations();
3060     }
3061 
3062     initialSyncer->join();
3063     ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
3064 }
3065 
// Shuts the InitialSyncer down after the second last-oplog-entry fetch, while the oplog tailing
// query is black-holed, and expects CallbackCanceled in _lastApplied.
TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchOnShutdown) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));

    auto net = getNet();
    const int initialRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(net);

        // Response carrying the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(initialRollbackId));
        net->runReadyNetworkOperations();

        // First fetch of the last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature compatibility version fetch.
        processSuccessfulFCVFetcherResponse36();

        // Replying to listDatabases with no database names is the shortest route to a
        // successful DatabasesCloner completion.
        auto listDatabasesRequest =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        net->runReadyNetworkOperations();

        // The OplogFetcher's tailable find needs no reply in this test, so it is black-holed.
        auto oplogFetcherOp = net->getNextReadyRequest();
        auto oplogFindRequest = oplogFetcherOp->getRequest();
        assertRemoteCommandNameEquals("find", oplogFindRequest);
        ASSERT_TRUE(oplogFindRequest.cmdObj.getBoolField("tailable"));
        net->blackHole(oplogFetcherOp);

        // Second fetch of the last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // With the OplogFetcher's find request black-holed, _getNextApplierBatch_inlock() has no
        // operations to hand back, so _getNextApplierBatchCallback() reschedules itself at
        // net->now() + _options.getApplierBatchCallbackRetryWait.
    }

    ASSERT_OK(syncer.shutdown());
    executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
3118 
TEST_F(InitialSyncerTest,InitialSyncerPassesThroughGetNextApplierBatchInLockError)3119 TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchInLockError) {
3120     auto initialSyncer = &getInitialSyncer();
3121     auto opCtx = makeOpCtx();
3122 
3123     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
3124     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
3125 
3126     ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
3127 
3128     // _getNextApplierBatch_inlock() returns BadValue when it gets an oplog entry with an unexpected
3129     // version (not OplogEntry::kOplogVersion).
3130     auto oplogEntry = makeOplogEntryObj(1);
3131     auto oplogEntryWithInconsistentVersion =
3132         makeOplogEntryObj(2, OpTypeEnum::kInsert, OplogEntry::kOplogVersion + 100);
3133 
3134     auto net = getNet();
3135     int baseRollbackId = 1;
3136     {
3137         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
3138 
3139         // Base rollback ID.
3140         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
3141         net->runReadyNetworkOperations();
3142 
3143         // Last oplog entry.
3144         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
3145 
3146         // Feature Compatibility Version.
3147         processSuccessfulFCVFetcherResponse36();
3148 
3149         // Quickest path to a successful DatabasesCloner completion is to respond to the
3150         // listDatabases with an empty list of database names.
3151         assertRemoteCommandNameEquals(
3152             "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
3153         net->runReadyNetworkOperations();
3154 
3155         // OplogFetcher's oplog tailing query. Return bad oplog entry that will be added to the
3156         // oplog buffer and processed by _getNextApplierBatch_inlock().
3157         auto request = assertRemoteCommandNameEquals(
3158             "find",
3159             net->scheduleSuccessfulResponse(makeCursorResponse(
3160                 1LL, _options.localOplogNS, {oplogEntry, oplogEntryWithInconsistentVersion})));
3161         ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
3162         net->runReadyNetworkOperations();
3163 
3164         // Second last oplog entry fetcher.
3165         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
3166 
3167         // _getNextApplierBatchCallback() will shut down the OplogFetcher after setting the
3168         // completion status.
3169         // We call runReadyNetworkOperations() again to deliver the cancellation status to
3170         // _oplogFetcherCallback().
3171         net->runReadyNetworkOperations();
3172     }
3173 
3174     initialSyncer->join();
3175     ASSERT_EQUALS(ErrorCodes::BadValue, _lastApplied);
3176 }
3177 
TEST_F(InitialSyncerTest,InitialSyncerReturnsEmptyBatchFromGetNextApplierBatchInLockIfRsSyncApplyStopFailPointIsEnabled)3178 TEST_F(
3179     InitialSyncerTest,
3180     InitialSyncerReturnsEmptyBatchFromGetNextApplierBatchInLockIfRsSyncApplyStopFailPointIsEnabled) {
3181     auto initialSyncer = &getInitialSyncer();
3182     auto opCtx = makeOpCtx();
3183 
3184     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
3185     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
3186 
3187     ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
3188 
3189     // _getNextApplierBatch_inlock() returns BadValue when it gets an oplog entry with an unexpected
3190     // version (not OplogEntry::kOplogVersion).
3191     auto oplogEntry = makeOplogEntryObj(1);
3192     auto oplogEntryWithInconsistentVersion =
3193         makeOplogEntryObj(2, OpTypeEnum::kInsert, OplogEntry::kOplogVersion + 100);
3194 
3195     // Enable 'rsSyncApplyStop' so that _getNextApplierBatch_inlock() returns an empty batch of
3196     // operations instead of a batch containing an oplog entry with a bad version.
3197     auto failPoint = getGlobalFailPointRegistry()->getFailPoint("rsSyncApplyStop");
3198     failPoint->setMode(FailPoint::alwaysOn);
3199     ON_BLOCK_EXIT([failPoint]() { failPoint->setMode(FailPoint::off); });
3200 
3201     auto net = getNet();
3202     int baseRollbackId = 1;
3203     {
3204         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
3205 
3206         // Base rollback ID.
3207         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
3208         net->runReadyNetworkOperations();
3209 
3210         // Last oplog entry.
3211         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
3212 
3213         // Feature Compatibility Version.
3214         processSuccessfulFCVFetcherResponse36();
3215 
3216         // Quickest path to a successful DatabasesCloner completion is to respond to the
3217         // listDatabases with an empty list of database names.
3218         assertRemoteCommandNameEquals(
3219             "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
3220         net->runReadyNetworkOperations();
3221 
3222         // OplogFetcher's oplog tailing query. Return bad oplog entry that will be added to the
3223         // oplog buffer and processed by _getNextApplierBatch_inlock().
3224         auto request = net->scheduleSuccessfulResponse(makeCursorResponse(
3225             1LL, _options.localOplogNS, {oplogEntry, oplogEntryWithInconsistentVersion}));
3226         assertRemoteCommandNameEquals("find", request);
3227         ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
3228         net->runReadyNetworkOperations();
3229 
3230         // Second last oplog entry fetcher.
3231         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
3232 
3233         // Since the 'rsSyncApplyStop' fail point is enabled, InitialSyncer will get an empty
3234         // batch of operations from _getNextApplierBatch_inlock() even though the oplog buffer
3235         // is not empty.
3236     }
3237 
3238     // If the fail point is not working, the initial sync status will be set to BadValue (due to the
3239     // bad oplog entry in the oplog buffer) and shutdown() will not be able to overwrite this status
3240     // with CallbackCanceled.
3241     // Otherwise, shutdown() will cancel both the OplogFetcher and the scheduled
3242     // _getNextApplierBatchCallback() task. The final initial sync status will be CallbackCanceled.
3243     ASSERT_OK(initialSyncer->shutdown());
3244     executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
3245 
3246     initialSyncer->join();
3247     ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
3248 }
3249 
TEST_F(InitialSyncerTest,InitialSyncerPassesThroughMultiApplierScheduleError)3250 TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierScheduleError) {
3251     auto initialSyncer = &getInitialSyncer();
3252     auto opCtx = makeOpCtx();
3253 
3254     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
3255     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
3256 
3257     ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
3258 
3259     auto net = getNet();
3260     int baseRollbackId = 1;
3261     {
3262         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
3263 
3264         // Base rollback ID.
3265         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
3266         net->runReadyNetworkOperations();
3267 
3268         // Last oplog entry.
3269         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
3270 
3271         // Feature Compatibility Version.
3272         processSuccessfulFCVFetcherResponse36();
3273 
3274         // Quickest path to a successful DatabasesCloner completion is to respond to the
3275         // listDatabases with an empty list of database names.
3276         assertRemoteCommandNameEquals(
3277             "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
3278         net->runReadyNetworkOperations();
3279 
3280         // OplogFetcher's oplog tailing query. Save for later.
3281         auto noi = net->getNextReadyRequest();
3282         auto request = noi->getRequest();
3283         assertRemoteCommandNameEquals("find", request);
3284         ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
3285         auto oplogFetcherNoi = noi;
3286 
3287         // Second last oplog entry fetcher.
3288         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
3289 
3290         // _getNextApplierBatchCallback() should have rescheduled itself.
3291         // We'll insert some operations in the oplog buffer so that we'll attempt to schedule
3292         // MultiApplier next time _getNextApplierBatchCallback() runs.
3293         net->scheduleSuccessfulResponse(
3294             oplogFetcherNoi,
3295             makeCursorResponse(
3296                 1LL, _options.localOplogNS, {makeOplogEntryObj(1), makeOplogEntryObj(2)}));
3297         net->runReadyNetworkOperations();
3298 
3299         // Ignore OplogFetcher's getMore request.
3300         noi = net->getNextReadyRequest();
3301         request = noi->getRequest();
3302         assertRemoteCommandNameEquals("getMore", request);
3303 
3304         // Make MultiApplier::startup() fail.
3305         _executorProxy->shouldFailScheduleWorkRequest = []() { return true; };
3306 
3307         // Advance clock until _getNextApplierBatchCallback() runs.
3308         auto when = net->now() + _options.getApplierBatchCallbackRetryWait;
3309         ASSERT_EQUALS(when, net->runUntil(when));
3310 
3311         // _getNextApplierBatchCallback() will shut down the OplogFetcher after setting the
3312         // completion status.
3313         // We call runReadyNetworkOperations() again to deliver the cancellation status to
3314         // _oplogFetcherCallback().
3315         net->runReadyNetworkOperations();
3316     }
3317 
3318     initialSyncer->join();
3319     ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
3320 }
3321 
// Checks that an error produced while applying a batch (the overridden multiApplyFn returns
// OperationFailed) becomes the final status of the initial sync attempt.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierCallbackError) {
    auto syncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Replace the batch application function with one that always reports OperationFailed.
    getExternalState()->multiApplyFn =
        [](OperationContext*, const MultiApplier::Operations&, MultiApplier::ApplyOperationFn) {
            return Status(ErrorCodes::OperationFailed, "multiApply failed");
        };

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    const int rollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Respond to the request for the base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(rollbackId));
        network->runReadyNetworkOperations();

        // First run of the last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // Replying to listDatabases with no database names is the shortest route to a
        // successful DatabasesCloner completion.
        const auto listDatabasesRequest =
            network->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        network->runReadyNetworkOperations();

        // Answer the OplogFetcher's oplog tailing query with enough operations to trigger the
        // MultiApplier.
        const auto oplogFindRequest = network->scheduleSuccessfulResponse(makeCursorResponse(
            1LL, _options.localOplogNS, {makeOplogEntryObj(1), makeOplogEntryObj(2)}));
        assertRemoteCommandNameEquals("find", oplogFindRequest);
        ASSERT_TRUE(oplogFindRequest.cmdObj.getBoolField("oplogReplay"));
        network->runReadyNetworkOperations();

        // Second run of the last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // After recording the completion status, _multiApplierCallback() shuts down the
        // OplogFetcher. One more pass over the ready network operations delivers the
        // cancellation status to _oplogFetcherCallback().
        network->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
3374 
// Checks that an error response to the OplogFetcher's tailing query ends the initial sync
// attempt with that error (OperationFailed) as the final status.
TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchCallbackOnOplogFetcherError) {
    auto syncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    const int rollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Respond to the request for the base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(rollbackId));
        network->runReadyNetworkOperations();

        // First run of the last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // Replying to listDatabases with no database names is the shortest route to a
        // successful DatabasesCloner completion.
        const auto listDatabasesRequest =
            network->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        network->runReadyNetworkOperations();

        // Pick up the OplogFetcher's oplog tailing query but hold off on answering it.
        auto oplogFetcherNoi = network->getNextReadyRequest();
        auto oplogFindRequest = oplogFetcherNoi->getRequest();
        assertRemoteCommandNameEquals("find", oplogFindRequest);
        ASSERT_TRUE(oplogFindRequest.cmdObj.getBoolField("oplogReplay"));

        // Second run of the last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // Fail the saved tailing query so that _oplogFetcherCallback() receives an error.
        const Status fetcherError(ErrorCodes::OperationFailed, "oplog fetcher failed");
        network->scheduleErrorResponse(oplogFetcherNoi, fetcherError);

        // After recording the completion status, _oplogFetcherCallback() cancels the
        // _getNextApplierBatchCallback() task. Running the ready network operations delivers
        // the scheduled response.
        network->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
3427 
doInitialSyncWithOneBatch()3428 OplogEntry InitialSyncerTest::doInitialSyncWithOneBatch() {
3429     auto initialSyncer = &getInitialSyncer();
3430     auto opCtx = makeOpCtx();
3431 
3432     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
3433     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
3434 
3435     auto lastOp = makeOplogEntry(2);
3436 
3437     auto net = getNet();
3438     int baseRollbackId = 1;
3439     {
3440         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
3441 
3442         // Base rollback ID.
3443         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
3444         net->runReadyNetworkOperations();
3445 
3446         // Last oplog entry.
3447         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
3448 
3449         // Feature Compatibility Version.
3450         processSuccessfulFCVFetcherResponse36();
3451 
3452         // Quickest path to a successful DatabasesCloner completion is to respond to the
3453         // listDatabases with an empty list of database names.
3454         assertRemoteCommandNameEquals(
3455             "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
3456         net->runReadyNetworkOperations();
3457 
3458         // OplogFetcher's oplog tailing query. Response has enough operations to reach
3459         // end timestamp.
3460         auto request = net->scheduleSuccessfulResponse(makeCursorResponse(
3461             1LL, _options.localOplogNS, {makeOplogEntryObj(1), lastOp.toBSON()}));
3462         assertRemoteCommandNameEquals("find", request);
3463         ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
3464         net->runReadyNetworkOperations();
3465 
3466         // Second last oplog entry fetcher.
3467         processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()});
3468 
3469         // Black hole OplogFetcher's getMore request.
3470         auto noi = net->getNextReadyRequest();
3471         request = noi->getRequest();
3472         assertRemoteCommandNameEquals("getMore", request);
3473         net->blackHole(noi);
3474 
3475         // Last rollback ID.
3476         request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
3477         assertRemoteCommandNameEquals("replSetGetRBID", request);
3478         net->runReadyNetworkOperations();
3479 
3480         // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting
3481         // the completion status.
3482         // We call runReadyNetworkOperations() again to deliver the cancellation status to
3483         // _oplogFetcherCallback().
3484         net->runReadyNetworkOperations();
3485     }
3486 
3487     initialSyncer->join();
3488     return lastOp;
3489 }
3490 
doSuccessfulInitialSyncWithOneBatch()3491 void InitialSyncerTest::doSuccessfulInitialSyncWithOneBatch() {
3492     auto lastOp = doInitialSyncWithOneBatch();
3493     ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied).opTime);
3494     ASSERT_EQUALS(lastOp.getHash(), unittest::assertGet(_lastApplied).value);
3495 
3496     ASSERT_EQUALS(lastOp.getOpTime().getTimestamp(), _storageInterface->getInitialDataTimestamp());
3497 }
3498 
// Successful single-batch initial sync in which no UUID schema upgrade is expected.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsLastAppliedOnReachingStopTimestampAfterApplyingOneBatch) {
    // getCollectionUUID is not expected to return a UUID in this scenario, so
    // upgradeUUIDSchemaVersionNonReplicated should never be invoked.
    doSuccessfulInitialSyncWithOneBatch();

    // Confirm, under the work-done mutex, that the schema upgrade was not recorded.
    LockGuard workDoneLock(_storageInterfaceWorkDoneMutex);
    ASSERT_FALSE(_storageInterfaceWorkDone.schemaUpgraded);
}
3509 
TEST_F(InitialSyncerTest,InitialSyncerReturnsLastAppliedOnReachingStopTimestampAfterApplyingMultipleBatches)3510 TEST_F(InitialSyncerTest,
3511        InitialSyncerReturnsLastAppliedOnReachingStopTimestampAfterApplyingMultipleBatches) {
3512     auto initialSyncer = &getInitialSyncer();
3513     auto opCtx = makeOpCtx();
3514 
3515     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
3516     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
3517 
3518     // To make InitialSyncer apply multiple batches, we make the third and last operation a command
3519     // so that it will go into a separate batch from the second operation. First operation is the
3520     // last fetched entry before data cloning and is not applied.
3521     auto lastOp = makeOplogEntry(3, OpTypeEnum::kCommand);
3522 
3523     auto net = getNet();
3524     int baseRollbackId = 1;
3525     {
3526         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
3527 
3528         // Base rollback ID.
3529         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
3530         net->runReadyNetworkOperations();
3531 
3532         // Last oplog entry.
3533         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
3534 
3535         // Feature Compatibility Version.
3536         processSuccessfulFCVFetcherResponse36();
3537 
3538         // Instead of fast forwarding to DatabasesCloner completion by returning an empty list of
3539         // database names, we'll simulate copying a single database with a single collection on the
3540         // sync source.
3541         NamespaceString nss("a.a");
3542         auto request =
3543             net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
3544         assertRemoteCommandNameEquals("listDatabases", request);
3545         net->runReadyNetworkOperations();
3546 
3547         // OplogFetcher's oplog tailing query. Response has enough operations to reach
3548         // end timestamp.
3549         request = net->scheduleSuccessfulResponse(
3550             makeCursorResponse(1LL,
3551                                _options.localOplogNS,
3552                                {makeOplogEntryObj(1), makeOplogEntryObj(2), lastOp.toBSON()}));
3553         assertRemoteCommandNameEquals("find", request);
3554         ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
3555         net->runReadyNetworkOperations();
3556 
3557         // listCollections for "a"
3558         request = net->scheduleSuccessfulResponse(
3559             makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())}));
3560         assertRemoteCommandNameEquals("listCollections", request);
3561 
3562         // Black hole OplogFetcher's getMore request.
3563         auto noi = net->getNextReadyRequest();
3564         request = noi->getRequest();
3565         assertRemoteCommandNameEquals("getMore", request);
3566         net->blackHole(noi);
3567 
3568         // count:a
3569         request = net->scheduleSuccessfulResponse(BSON("n" << 1 << "ok" << 1));
3570         assertRemoteCommandNameEquals("count", request);
3571         ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
3572         ASSERT_EQUALS(nss.db(), request.dbname);
3573 
3574         // listIndexes:a
3575         request = net->scheduleSuccessfulResponse(makeCursorResponse(
3576             0LL,
3577             NamespaceString(nss.getCommandNS()),
3578             {BSON("v" << OplogEntry::kOplogVersion << "key" << BSON("_id" << 1) << "name"
3579                       << "_id_"
3580                       << "ns"
3581                       << nss.ns())}));
3582         assertRemoteCommandNameEquals("listIndexes", request);
3583         ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
3584         ASSERT_EQUALS(nss.db(), request.dbname);
3585 
3586         // find:a
3587         request = net->scheduleSuccessfulResponse(
3588             makeCursorResponse(0LL, nss, {BSON("_id" << 1 << "a" << 1)}));
3589         assertRemoteCommandNameEquals("find", request);
3590         ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
3591         ASSERT_EQUALS(nss.db(), request.dbname);
3592 
3593         // Second last oplog entry fetcher.
3594         processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()});
3595 
3596         // Last rollback ID.
3597         request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
3598         assertRemoteCommandNameEquals("replSetGetRBID", request);
3599         net->runReadyNetworkOperations();
3600 
3601         // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting
3602         // the completion status.
3603         // We call runReadyNetworkOperations() again to deliver the cancellation status to
3604         // _oplogFetcherCallback().
3605         net->runReadyNetworkOperations();
3606     }
3607 
3608     initialSyncer->join();
3609     ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied).opTime);
3610     ASSERT_EQUALS(lastOp.getHash(), unittest::assertGet(_lastApplied).value);
3611 }
3612 
TEST_F(InitialSyncerTest,InitialSyncerSchedulesLastOplogEntryFetcherToGetNewStopTimestampIfMissingDocumentsHaveBeenFetchedDuringMultiInitialSyncApply)3613 TEST_F(
3614     InitialSyncerTest,
3615     InitialSyncerSchedulesLastOplogEntryFetcherToGetNewStopTimestampIfMissingDocumentsHaveBeenFetchedDuringMultiInitialSyncApply) {
3616     auto initialSyncer = &getInitialSyncer();
3617     auto opCtx = makeOpCtx();
3618 
3619     // Override DataReplicatorExternalState::_multiInitialSyncApply() so that it will also fetch a
3620     // missing document.
3621     // This forces InitialSyncer to evaluate its end timestamp for applying operations after each
3622     // batch.
3623     getExternalState()->multiApplyFn = [](OperationContext*,
3624                                           const MultiApplier::Operations& ops,
3625                                           MultiApplier::ApplyOperationFn applyOperation) {
3626         // 'OperationPtr*' is ignored by our overridden _multiInitialSyncApply().
3627         applyOperation(nullptr).transitional_ignore();
3628         return ops.back().getOpTime();
3629     };
3630     bool fetchCountIncremented = false;
3631     getExternalState()->multiInitialSyncApplyFn = [&fetchCountIncremented](
3632         MultiApplier::OperationPtrs*, const HostAndPort&, AtomicUInt32* fetchCount) {
3633         if (!fetchCountIncremented) {
3634             fetchCount->addAndFetch(1);
3635             fetchCountIncremented = true;
3636         }
3637         return Status::OK();
3638     };
3639 
3640     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
3641     ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
3642 
3643     // Use command for third and last operation to ensure we have two batches to apply.
3644     auto lastOp = makeOplogEntry(3, OpTypeEnum::kCommand);
3645 
3646     auto net = getNet();
3647     int baseRollbackId = 1;
3648     {
3649         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
3650 
3651         // Base rollback ID.
3652         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
3653         net->runReadyNetworkOperations();
3654 
3655         // Last oplog entry.
3656         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
3657 
3658         // Feature Compatibility Version.
3659         processSuccessfulFCVFetcherResponse36();
3660 
3661         // Quickest path to a successful DatabasesCloner completion is to respond to the
3662         // listDatabases with an empty list of database names.
3663         assertRemoteCommandNameEquals(
3664             "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
3665         net->runReadyNetworkOperations();
3666 
3667         // OplogFetcher's oplog tailing query. Response has enough operations to reach
3668         // end timestamp.
3669         auto request = net->scheduleSuccessfulResponse(
3670             makeCursorResponse(1LL,
3671                                _options.localOplogNS,
3672                                {makeOplogEntryObj(1), makeOplogEntryObj(2), lastOp.toBSON()}));
3673         assertRemoteCommandNameEquals("find", request);
3674         ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
3675         net->runReadyNetworkOperations();
3676 
3677         // Second last oplog entry fetcher.
3678         // Send oplog entry with timestamp 2. InitialSyncer will update this end timestamp after
3679         // applying the first batch.
3680         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
3681 
3682         // Black hole OplogFetcher's getMore request.
3683         auto noi = net->getNextReadyRequest();
3684         request = noi->getRequest();
3685         assertRemoteCommandNameEquals("getMore", request);
3686         net->blackHole(noi);
3687 
3688         // Third last oplog entry fetcher.
3689         processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()});
3690 
3691         // Last rollback ID.
3692         request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
3693         assertRemoteCommandNameEquals("replSetGetRBID", request);
3694         net->runReadyNetworkOperations();
3695 
3696         // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting
3697         // the completion status.
3698         // We call runReadyNetworkOperations() again to deliver the cancellation status to
3699         // _oplogFetcherCallback().
3700         net->runReadyNetworkOperations();
3701     }
3702 
3703     initialSyncer->join();
3704     ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied).opTime);
3705     ASSERT_EQUALS(lastOp.getHash(), unittest::assertGet(_lastApplied).value);
3706 
3707     ASSERT_TRUE(fetchCountIncremented);
3708 
3709     auto progress = initialSyncer->getInitialSyncProgress();
3710     log() << "Progress after failed initial sync attempt: " << progress;
3711     ASSERT_EQUALS(1, progress.getIntField("fetchedMissingDocs")) << progress;
3712 }
3713 
// With the failInitialSyncWithBadHost fail point switched on, the initial sync attempt is
// expected to end with InvalidSyncSource.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsInvalidSyncSourceWhenFailInitialSyncWithBadHostFailpointIsEnabled) {
    auto syncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Enabling this fail point causes chooseSyncSourceCallback to fail with an
    // InvalidSyncSource error. It is switched back off when the test body exits.
    auto badHostFailPoint =
        getGlobalFailPointRegistry()->getFailPoint("failInitialSyncWithBadHost");
    badHostFailPoint->setMode(FailPoint::alwaysOn);
    ON_BLOCK_EXIT([badHostFailPoint]() { badHostFailPoint->setMode(FailPoint::off); });

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(opCtx.get(), maxAttempts));

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, _lastApplied);
}
3730 
// Feeds the OplogFetcher a getMore batch whose entries are out of order and expects the initial
// sync attempt to end with OplogOutOfOrder.
TEST_F(InitialSyncerTest, OplogOutOfOrderOnOplogFetchFinish) {
    auto syncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    const int rollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network);

        // Respond to the request for the base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(rollbackId));
        network->runReadyNetworkOperations();

        // First run of the last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // The listDatabases request is sent to a black hole.
        auto listDatabasesNoi = network->getNextReadyRequest();
        auto listDatabasesRequest = listDatabasesNoi->getRequest();
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        network->blackHole(listDatabasesNoi);

        // Answer the OplogFetcher's oplog tailing query with a single entry (timestamp 1).
        const auto oplogFindRequest = network->scheduleSuccessfulResponse(
            makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntryObj(1)}));
        assertRemoteCommandNameEquals("find", oplogFindRequest);
        ASSERT_TRUE(oplogFindRequest.cmdObj.getBoolField("oplogReplay"));
        network->runReadyNetworkOperations();

        // Make the OplogFetcher fail with OplogOutOfOrder: the most recently processed entry
        // has timestamp 1, and the getMore reply carries entries with these timestamps:
        //     (last=1), 5, 4
        const auto getMoreRequest = network->scheduleSuccessfulResponse(makeCursorResponse(
            1LL, _options.localOplogNS, {makeOplogEntryObj(5), makeOplogEntryObj(4)}, false));
        assertRemoteCommandNameEquals("getMore", getMoreRequest);
        network->runReadyNetworkOperations();

        // One more pass delivers the cancellation signal to the DatabasesCloner.
        network->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, _lastApplied);
}
3782 
TEST_F(InitialSyncerTest,GetInitialSyncProgressReturnsCorrectProgress)3783 TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
3784     auto initialSyncer = &getInitialSyncer();
3785     auto opCtx = makeOpCtx();
3786 
3787     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 27017));
3788     ASSERT_OK(initialSyncer->startup(opCtx.get(), 2U));
3789 
3790     auto net = getNet();
3791     int baseRollbackId = 1;
3792 
3793     // Play first 2 responses to ensure initial syncer has started the oplog fetcher.
3794     {
3795         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
3796 
3797         // Base rollback ID.
3798         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
3799         net->runReadyNetworkOperations();
3800 
3801         // Last oplog entry.
3802         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
3803 
3804         // Feature Compatibility Version.
3805         processSuccessfulFCVFetcherResponse36();
3806     }
3807 
3808     log() << "Done playing first failed response";
3809 
3810     auto progress = initialSyncer->getInitialSyncProgress();
3811     log() << "Progress after first failed response: " << progress;
3812     ASSERT_EQUALS(progress.nFields(), 8) << progress;
3813     ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 0) << progress;
3814     ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
3815     ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
3816     ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
3817     ASSERT_BSONOBJ_EQ(progress.getObjectField("initialSyncAttempts"), BSONObj());
3818     ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
3819     ASSERT_EQUALS(progress.getIntField("appliedOps"), 0) << progress;
3820     ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"), BSON("databasesCloned" << 0));
3821 
3822     // Play rest of the failed round of responses.
3823     {
3824         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
3825 
3826         auto request = net->scheduleErrorResponse(
3827             Status(ErrorCodes::FailedToParse, "fail on clone -- listDBs injected failure"));
3828         assertRemoteCommandNameEquals("listDatabases", request);
3829         net->runReadyNetworkOperations();
3830 
3831         // Deliver cancellation to OplogFetcher
3832         net->runReadyNetworkOperations();
3833     }
3834 
3835     log() << "Done playing failed responses";
3836 
3837     // Play the first 2 responses of the successful round of responses to ensure that the
3838     // initial syncer starts the oplog fetcher.
3839     {
3840         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
3841 
3842         auto when = net->now() + _options.initialSyncRetryWait;
3843         ASSERT_EQUALS(when, net->runUntil(when));
3844 
3845         // Base rollback ID.
3846         auto request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
3847         assertRemoteCommandNameEquals("replSetGetRBID", request);
3848         net->runReadyNetworkOperations();
3849 
3850         // Last oplog entry.
3851         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
3852 
3853         // Feature Compatibility Version.
3854         processSuccessfulFCVFetcherResponse36();
3855     }
3856 
3857     log() << "Done playing first successful response";
3858 
3859     progress = initialSyncer->getInitialSyncProgress();
3860     log() << "Progress after failure: " << progress;
3861     ASSERT_EQUALS(progress.nFields(), 8) << progress;
3862     ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
3863     ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
3864     ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
3865     ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
3866     ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
3867     ASSERT_EQUALS(progress.getIntField("appliedOps"), 0) << progress;
3868     ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"), BSON("databasesCloned" << 0));
3869 
3870     BSONObj attempts = progress["initialSyncAttempts"].Obj();
3871     ASSERT_EQUALS(attempts.nFields(), 1) << attempts;
3872     BSONObj attempt0 = attempts["0"].Obj();
3873     ASSERT_EQUALS(attempt0.nFields(), 3) << attempt0;
3874     ASSERT_EQUALS(attempt0.getStringField("status"),
3875                   std::string("FailedToParse: error cloning databases :: caused by :: fail on "
3876                               "clone -- listDBs injected failure"))
3877         << attempt0;
3878     ASSERT_EQUALS(attempt0["durationMillis"].type(), NumberInt) << attempt0;
3879     ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017"))
3880         << attempt0;
3881 
3882     // Play all but last of the successful round of responses.
3883     {
3884         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
3885 
3886         // listDatabases
3887         NamespaceString nss("a.a");
3888         auto request =
3889             net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
3890         assertRemoteCommandNameEquals("listDatabases", request);
3891         net->runReadyNetworkOperations();
3892 
3893         // Ignore oplog tailing query.
3894         request = net->scheduleSuccessfulResponse(makeCursorResponse(1LL,
3895                                                                      _options.localOplogNS,
3896                                                                      {makeOplogEntryObj(1),
3897                                                                       makeOplogEntryObj(2),
3898                                                                       makeOplogEntryObj(3),
3899                                                                       makeOplogEntryObj(4),
3900                                                                       makeOplogEntryObj(5),
3901                                                                       makeOplogEntryObj(6),
3902                                                                       makeOplogEntryObj(7)}));
3903         assertRemoteCommandNameEquals("find", request);
3904         ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
3905         net->runReadyNetworkOperations();
3906 
3907         // listCollections for "a"
3908         request = net->scheduleSuccessfulResponse(
3909             makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())}));
3910         assertRemoteCommandNameEquals("listCollections", request);
3911 
3912         auto noi = net->getNextReadyRequest();
3913         request = noi->getRequest();
3914         assertRemoteCommandNameEquals("getMore", request);
3915         net->blackHole(noi);
3916 
3917         // count:a
3918         request = net->scheduleSuccessfulResponse(BSON("n" << 5 << "ok" << 1));
3919         assertRemoteCommandNameEquals("count", request);
3920         ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
3921         ASSERT_EQUALS(nss.db(), request.dbname);
3922 
3923         // listIndexes:a
3924         request = net->scheduleSuccessfulResponse(makeCursorResponse(
3925             0LL,
3926             NamespaceString(nss.getCommandNS()),
3927             {BSON("v" << OplogEntry::kOplogVersion << "key" << BSON("_id" << 1) << "name"
3928                       << "_id_"
3929                       << "ns"
3930                       << nss.ns())}));
3931         assertRemoteCommandNameEquals("listIndexes", request);
3932         ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
3933         ASSERT_EQUALS(nss.db(), request.dbname);
3934 
3935         // find:a - 5 batches
3936         for (int i = 1; i <= 5; ++i) {
3937             request = net->scheduleSuccessfulResponse(
3938                 makeCursorResponse(i < 5 ? 2LL : 0LL, nss, {BSON("_id" << i << "a" << i)}, i == 1));
3939             ASSERT_EQUALS(i == 1 ? "find" : "getMore",
3940                           request.cmdObj.firstElement().fieldNameStringData());
3941             net->runReadyNetworkOperations();
3942         }
3943 
3944         // Second last oplog entry fetcher.
        // Send oplog entry with timestamp 7. InitialSyncer will update this end timestamp after
        // applying the first batch.
3947         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(7)});
3948     }
3949     log() << "Done playing all but last successful response";
3950 
3951     progress = initialSyncer->getInitialSyncProgress();
3952     log() << "Progress after all but last successful response: " << progress;
3953     ASSERT_EQUALS(progress.nFields(), 9) << progress;
3954     ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
3955     ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
3956     ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
3957     ASSERT_EQUALS(progress["initialSyncOplogEnd"].timestamp(), Timestamp(7, 1)) << progress;
3958     ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
3959     ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
3960     // Expected applied ops to be a superset of this range: Timestamp(2,1) ... Timestamp(7,1).
3961     ASSERT_GREATER_THAN_OR_EQUALS(progress.getIntField("appliedOps"), 6) << progress;
3962     auto databasesProgress = progress.getObjectField("databases");
3963     ASSERT_EQUALS(1, databasesProgress.getIntField("databasesCloned")) << databasesProgress;
3964     auto dbProgress = databasesProgress.getObjectField("a");
3965     ASSERT_EQUALS(1, dbProgress.getIntField("collections")) << dbProgress;
3966     ASSERT_EQUALS(1, dbProgress.getIntField("clonedCollections")) << dbProgress;
3967     auto collectionProgress = dbProgress.getObjectField("a.a");
3968     ASSERT_EQUALS(
3969         5, collectionProgress.getIntField(CollectionCloner::Stats::kDocumentsToCopyFieldName))
3970         << collectionProgress;
3971     ASSERT_EQUALS(
3972         5, collectionProgress.getIntField(CollectionCloner::Stats::kDocumentsCopiedFieldName))
3973         << collectionProgress;
3974     ASSERT_EQUALS(1, collectionProgress.getIntField("indexes")) << collectionProgress;
3975     ASSERT_EQUALS(5, collectionProgress.getIntField("fetchedBatches")) << collectionProgress;
3976 
3977     attempts = progress["initialSyncAttempts"].Obj();
3978     ASSERT_EQUALS(attempts.nFields(), 1) << progress;
3979     attempt0 = attempts["0"].Obj();
3980     ASSERT_EQUALS(attempt0.nFields(), 3) << attempt0;
3981     ASSERT_EQUALS(attempt0.getStringField("status"),
3982                   std::string("FailedToParse: error cloning databases :: caused by :: fail on "
3983                               "clone -- listDBs injected failure"))
3984         << attempt0;
3985     ASSERT_EQUALS(attempt0["durationMillis"].type(), NumberInt) << attempt0;
3986     ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017"))
3987         << attempt0;
3988 
3989     // Play last successful response.
3990     {
3991         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
3992 
3993         // Last rollback ID.
3994         assertRemoteCommandNameEquals(
3995             "replSetGetRBID",
3996             net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)));
3997         net->runReadyNetworkOperations();
3998 
3999         // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting
4000         // the completion status.
4001         // We call runReadyNetworkOperations() again to deliver the cancellation status to
4002         // _oplogFetcherCallback().
4003         net->runReadyNetworkOperations();
4004     }
4005 
4006     log() << "waiting for initial sync to verify it completed OK";
4007     initialSyncer->join();
4008     ASSERT_EQUALS(makeOplogEntry(7).getOpTime(), unittest::assertGet(_lastApplied).opTime);
4009 
4010     progress = initialSyncer->getInitialSyncProgress();
4011     log() << "Progress at end: " << progress;
4012     ASSERT_EQUALS(progress.nFields(), 11) << progress;
4013     ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
4014     ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
4015     ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
4016     ASSERT_EQUALS(progress["initialSyncEnd"].type(), Date) << progress;
4017     ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
4018     ASSERT_EQUALS(progress["initialSyncOplogEnd"].timestamp(), Timestamp(7, 1)) << progress;
4019     ASSERT_EQUALS(progress["initialSyncElapsedMillis"].type(), NumberInt) << progress;
4020     ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
4021     // Expected applied ops to be a superset of this range: Timestamp(2,1) ... Timestamp(7,1).
4022     ASSERT_GREATER_THAN_OR_EQUALS(progress.getIntField("appliedOps"), 6) << progress;
4023 
4024     attempts = progress["initialSyncAttempts"].Obj();
4025     ASSERT_EQUALS(attempts.nFields(), 2) << attempts;
4026 
4027     attempt0 = attempts["0"].Obj();
4028     ASSERT_EQUALS(attempt0.nFields(), 3) << attempt0;
4029     ASSERT_EQUALS(attempt0.getStringField("status"),
4030                   std::string("FailedToParse: error cloning databases :: caused by :: fail on "
4031                               "clone -- listDBs injected failure"))
4032         << attempt0;
4033     ASSERT_EQUALS(attempt0["durationMillis"].type(), NumberInt) << attempt0;
4034     ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017"))
4035         << attempt0;
4036 
4037     BSONObj attempt1 = attempts["1"].Obj();
4038     ASSERT_EQUALS(attempt1.nFields(), 3) << attempt1;
4039     ASSERT_EQUALS(attempt1.getStringField("status"), std::string("OK")) << attempt1;
4040     ASSERT_EQUALS(attempt1["durationMillis"].type(), NumberInt) << attempt1;
4041     ASSERT_EQUALS(attempt1.getStringField("syncSource"), std::string("localhost:27017"))
4042         << attempt1;
4043 }
4044 
TEST_F(InitialSyncerTest,GetInitialSyncProgressOmitsClonerStatsIfClonerStatsExceedBsonLimit)4045 TEST_F(InitialSyncerTest, GetInitialSyncProgressOmitsClonerStatsIfClonerStatsExceedBsonLimit) {
4046     auto initialSyncer = &getInitialSyncer();
4047     auto opCtx = makeOpCtx();
4048 
4049     _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 27017));
4050     ASSERT_OK(initialSyncer->startup(opCtx.get(), 2U));
4051 
4052     const std::size_t numCollections = 200000U;
4053 
4054     auto net = getNet();
4055     int baseRollbackId = 1;
4056     {
4057         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
4058 
4059         // Base rollback ID.
4060         net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
4061         net->runReadyNetworkOperations();
4062 
4063         // Last oplog entry.
4064         processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
4065 
4066         // Feature Compatibility Version.
4067         processSuccessfulFCVFetcherResponse36();
4068 
4069         // listDatabases
4070         NamespaceString nss("a.a");
4071         auto request =
4072             net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
4073         assertRemoteCommandNameEquals("listDatabases", request);
4074         net->runReadyNetworkOperations();
4075 
4076         // Ignore oplog tailing query.
4077         request = net->scheduleSuccessfulResponse(
4078             makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntryObj(1)}));
4079         assertRemoteCommandNameEquals("find", request);
4080         ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
4081         net->runReadyNetworkOperations();
4082 
4083         // listCollections for "a"
4084         std::vector<BSONObj> collectionInfos;
4085         for (std::size_t i = 0; i < numCollections; ++i) {
4086             const std::string collName = str::stream() << "coll-" << i;
4087             collectionInfos.push_back(BSON("name" << collName << "options" << BSONObj()));
4088         }
4089         request = net->scheduleSuccessfulResponse(
4090             makeCursorResponse(0LL, nss.getCommandNS(), collectionInfos));
4091         assertRemoteCommandNameEquals("listCollections", request);
4092         net->runReadyNetworkOperations();
4093     }
4094 
4095     // This returns a valid document because we omit the cloner stats when they do not fit in a
4096     // BSON document.
4097     auto progress = initialSyncer->getInitialSyncProgress();
4098     ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
4099     ASSERT_FALSE(progress.hasField("databases")) << progress;
4100 
4101     // Initial sync will attempt to log stats again at shutdown in a callback, where it should not
4102     // terminate because we now return a valid stats document.
4103     ASSERT_OK(initialSyncer->shutdown());
4104 
4105     // Deliver cancellation signal to callbacks.
4106     executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();
4107 
4108     initialSyncer->join();
4109 }
4110 
TEST_F(InitialSyncerTest, InitialSyncerUpdatesCollectionUUIDsIfgetCollectionUUIDReturnsUUID) {
    // Make getCollectionUUID hand back a UUID; this is expected to result in a call to
    // upgradeUUIDSchemaVersionNonReplicated.
    {
        LockGuard lk(_storageInterfaceWorkDoneMutex);
        _storageInterfaceWorkDone.uuid = UUID::gen();
    }

    doSuccessfulInitialSyncWithOneBatch();

    // Verify, under the mutex, that upgradeUUIDSchemaVersionNonReplicated was invoked.
    LockGuard lk(_storageInterfaceWorkDoneMutex);
    ASSERT_TRUE(_storageInterfaceWorkDone.schemaUpgraded);
}
4124 
TEST_F(InitialSyncerTest, InitialSyncerCapturesGetCollectionUUIDError) {
    // Make getCollectionUUID report a bad status, which should be handed to the initial syncer.
    {
        LockGuard lk(_storageInterfaceWorkDoneMutex);
        _storageInterfaceWorkDone.getCollectionUUIDShouldFail = true;
    }

    doInitialSyncWithOneBatch();

    // The status from getCollectionUUID must be what initial sync finished with.
    ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, _lastApplied);

    // Verify, under the mutex, that upgradeUUIDSchemaVersionNonReplicated was never invoked.
    LockGuard lk(_storageInterfaceWorkDoneMutex);
    ASSERT_FALSE(_storageInterfaceWorkDone.schemaUpgraded);
}
4140 
TEST_F(InitialSyncerTest, InitialSyncerCapturesUpgradeUUIDSchemaVersionError) {
    // Configure the storage interface hooks in one critical section:
    //  - getCollectionUUID returns a UUID, which should trigger a call to
    //    upgradeUUIDSchemaVersionNonReplicated;
    //  - upgradeUUIDSchemaVersionNonReplicated returns a bad status, which should be passed on to
    //    the initial syncer.
    {
        LockGuard lk(_storageInterfaceWorkDoneMutex);
        _storageInterfaceWorkDone.uuid = UUID::gen();
        _storageInterfaceWorkDone.upgradeUUIDSchemaVersionNonReplicatedShouldFail = true;
    }

    doInitialSyncWithOneBatch();

    // The status from upgradeUUIDSchemaVersionNonReplicated must be what initial sync finished
    // with.
    ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, _lastApplied);
}
4160 
4161 }  // namespace
4162