1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #include "mongo/platform/basic.h"
32
33 #include <iosfwd>
34 #include <memory>
35 #include <ostream>
36
37 #include "mongo/client/fetcher.h"
38 #include "mongo/db/client.h"
39 #include "mongo/db/commands/feature_compatibility_version.h"
40 #include "mongo/db/json.h"
41 #include "mongo/db/query/getmore_request.h"
42 #include "mongo/db/repl/base_cloner_test_fixture.h"
43 #include "mongo/db/repl/data_replicator_external_state_mock.h"
44 #include "mongo/db/repl/initial_syncer.h"
45 #include "mongo/db/repl/member_state.h"
46 #include "mongo/db/repl/oplog_entry.h"
47 #include "mongo/db/repl/oplog_fetcher.h"
48 #include "mongo/db/repl/optime.h"
49 #include "mongo/db/repl/replication_consistency_markers_mock.h"
50 #include "mongo/db/repl/replication_process.h"
51 #include "mongo/db/repl/replication_recovery_mock.h"
52 #include "mongo/db/repl/reporter.h"
53 #include "mongo/db/repl/storage_interface.h"
54 #include "mongo/db/repl/storage_interface_mock.h"
55 #include "mongo/db/repl/sync_source_resolver.h"
56 #include "mongo/db/repl/sync_source_selector.h"
57 #include "mongo/db/repl/sync_source_selector_mock.h"
58 #include "mongo/db/repl/task_executor_mock.h"
59 #include "mongo/db/repl/update_position_args.h"
60 #include "mongo/executor/network_interface_mock.h"
61 #include "mongo/executor/thread_pool_task_executor_test_fixture.h"
62 #include "mongo/stdx/mutex.h"
63 #include "mongo/util/concurrency/old_thread_pool.h"
64 #include "mongo/util/concurrency/thread_name.h"
65 #include "mongo/util/fail_point_service.h"
66 #include "mongo/util/mongoutils/str.h"
67 #include "mongo/util/scopeguard.h"
68
69 #include "mongo/unittest/barrier.h"
70 #include "mongo/unittest/unittest.h"
71
72 namespace mongo {
73 namespace repl {
74
75 /**
76 * Insertion operator for InitialSyncer::State. Formats initial syncer state for output stream.
77 */
operator <<(std::ostream & os,const InitialSyncer::State & state)78 std::ostream& operator<<(std::ostream& os, const InitialSyncer::State& state) {
79 switch (state) {
80 case InitialSyncer::State::kPreStart:
81 return os << "PreStart";
82 case InitialSyncer::State::kRunning:
83 return os << "Running";
84 case InitialSyncer::State::kShuttingDown:
85 return os << "ShuttingDown";
86 case InitialSyncer::State::kComplete:
87 return os << "Complete";
88 }
89 MONGO_UNREACHABLE;
90 }
91
92 } // namespace repl
93 } // namespace mongo
94
95
96 namespace {
97
98 using namespace mongo;
99 using namespace mongo::repl;
100
101 using executor::NetworkInterfaceMock;
102 using executor::RemoteCommandRequest;
103 using executor::RemoteCommandResponse;
104 using unittest::log;
105
106 using LockGuard = stdx::lock_guard<stdx::mutex>;
107 using NetworkGuard = executor::NetworkInterfaceMock::InNetworkGuard;
108 using UniqueLock = stdx::unique_lock<stdx::mutex>;
109
110 struct CollectionCloneInfo {
111 CollectionMockStats stats;
112 CollectionBulkLoaderMock* loader = nullptr;
113 Status status{ErrorCodes::NotYetInitialized, ""};
114 };
115
116 class InitialSyncerTest : public executor::ThreadPoolExecutorTest, public SyncSourceSelector {
117 public:
InitialSyncerTest()118 InitialSyncerTest() {}
119
120 executor::ThreadPoolMock::Options makeThreadPoolMockOptions() const override;
121
122 /**
123 * clear/reset state
124 */
reset()125 void reset() {
126 _setMyLastOptime = [this](const OpTime& opTime,
127 ReplicationCoordinator::DataConsistency consistency) {
128 _myLastOpTime = opTime;
129 };
130 _myLastOpTime = OpTime();
131 _syncSourceSelector = stdx::make_unique<SyncSourceSelectorMock>();
132 }
133
134 // SyncSourceSelector
clearSyncSourceBlacklist()135 void clearSyncSourceBlacklist() override {
136 _syncSourceSelector->clearSyncSourceBlacklist();
137 }
chooseNewSyncSource(const OpTime & ot)138 HostAndPort chooseNewSyncSource(const OpTime& ot) override {
139 return _syncSourceSelector->chooseNewSyncSource(ot);
140 }
blacklistSyncSource(const HostAndPort & host,Date_t until)141 void blacklistSyncSource(const HostAndPort& host, Date_t until) override {
142 _syncSourceSelector->blacklistSyncSource(host, until);
143 }
shouldChangeSyncSource(const HostAndPort & currentSource,const rpc::ReplSetMetadata & replMetadata,boost::optional<rpc::OplogQueryMetadata> oqMetadata)144 bool shouldChangeSyncSource(const HostAndPort& currentSource,
145 const rpc::ReplSetMetadata& replMetadata,
146 boost::optional<rpc::OplogQueryMetadata> oqMetadata) override {
147 return _syncSourceSelector->shouldChangeSyncSource(currentSource, replMetadata, oqMetadata);
148 }
149
scheduleNetworkResponse(std::string cmdName,const BSONObj & obj)150 void scheduleNetworkResponse(std::string cmdName, const BSONObj& obj) {
151 NetworkInterfaceMock* net = getNet();
152 if (!net->hasReadyRequests()) {
153 log() << "The network doesn't have a request to process for this response: " << obj;
154 }
155 verifyNextRequestCommandName(cmdName);
156 scheduleNetworkResponse(net->getNextReadyRequest(), obj);
157 }
158
scheduleNetworkResponse(NetworkInterfaceMock::NetworkOperationIterator noi,const BSONObj & obj)159 void scheduleNetworkResponse(NetworkInterfaceMock::NetworkOperationIterator noi,
160 const BSONObj& obj) {
161 NetworkInterfaceMock* net = getNet();
162 Milliseconds millis(0);
163 RemoteCommandResponse response(obj, BSONObj(), millis);
164 log() << "Sending response for network request:";
165 log() << " req: " << noi->getRequest().dbname << "." << noi->getRequest().cmdObj;
166 log() << " resp:" << response;
167
168 net->scheduleResponse(noi, net->now(), response);
169 }
170
scheduleNetworkResponse(std::string cmdName,Status errorStatus)171 void scheduleNetworkResponse(std::string cmdName, Status errorStatus) {
172 NetworkInterfaceMock* net = getNet();
173 if (!getNet()->hasReadyRequests()) {
174 log() << "The network doesn't have a request to process for the error: " << errorStatus;
175 }
176 verifyNextRequestCommandName(cmdName);
177 net->scheduleResponse(net->getNextReadyRequest(), net->now(), errorStatus);
178 }
179
processNetworkResponse(std::string cmdName,const BSONObj & obj)180 void processNetworkResponse(std::string cmdName, const BSONObj& obj) {
181 scheduleNetworkResponse(cmdName, obj);
182 finishProcessingNetworkResponse();
183 }
184
processNetworkResponse(std::string cmdName,Status errorStatus)185 void processNetworkResponse(std::string cmdName, Status errorStatus) {
186 scheduleNetworkResponse(cmdName, errorStatus);
187 finishProcessingNetworkResponse();
188 }
189
190 /**
191 * Schedules and processes a successful response to the network request sent by InitialSyncer's
192 * last oplog entry fetcher. Also validates the find command arguments in the request.
193 */
194 void processSuccessfulLastOplogEntryFetcherResponse(std::vector<BSONObj> docs);
195
196 /**
197 * Schedules and processes a successful response to the network request sent by InitialSyncer's
198 * feature compatibility version fetcher. Includes the 'docs' provided in the response.
199 */
200 void processSuccessfulFCVFetcherResponse(std::vector<BSONObj> docs);
201
202 /**
203 * Schedules and processes a successful response to the network request sent by InitialSyncer's
204 * feature compatibility version fetcher. Always includes a valid fCV=3.6 document in the
205 * response.
206 */
207 void processSuccessfulFCVFetcherResponse36();
208
finishProcessingNetworkResponse()209 void finishProcessingNetworkResponse() {
210 getNet()->runReadyNetworkOperations();
211 if (getNet()->hasReadyRequests()) {
212 log() << "The network has unexpected requests to process, next req:";
213 NetworkInterfaceMock::NetworkOperation req = *getNet()->getNextReadyRequest();
214 log() << req.getDiagnosticString();
215 }
216 ASSERT_FALSE(getNet()->hasReadyRequests());
217 }
218
getInitialSyncer()219 InitialSyncer& getInitialSyncer() {
220 return *_initialSyncer;
221 }
222
getExternalState()223 DataReplicatorExternalStateMock* getExternalState() {
224 return _externalState;
225 }
226
getStorage()227 StorageInterface& getStorage() {
228 return *_storageInterface;
229 }
230
getDbWorkThreadPool()231 OldThreadPool& getDbWorkThreadPool() {
232 return *_dbWorkThreadPool;
233 }
234
235 protected:
236 struct StorageInterfaceResults {
237 bool createOplogCalled = false;
238 bool truncateCalled = false;
239 bool insertedOplogEntries = false;
240 int oplogEntriesInserted = 0;
241 bool droppedUserDBs = false;
242 std::vector<std::string> droppedCollections;
243 int documentsInsertedCount = 0;
244 bool schemaUpgraded = false;
245 OptionalCollectionUUID uuid;
246 bool getCollectionUUIDShouldFail = false;
247 bool upgradeUUIDSchemaVersionNonReplicatedShouldFail = false;
248 };
249
250 stdx::mutex _storageInterfaceWorkDoneMutex; // protects _storageInterfaceWorkDone.
251 StorageInterfaceResults _storageInterfaceWorkDone;
252
setUp()253 void setUp() override {
254 executor::ThreadPoolExecutorTest::setUp();
255 _storageInterface = stdx::make_unique<StorageInterfaceMock>();
256 _storageInterface->createOplogFn = [this](OperationContext* opCtx,
257 const NamespaceString& nss) {
258 LockGuard lock(_storageInterfaceWorkDoneMutex);
259 _storageInterfaceWorkDone.createOplogCalled = true;
260 _storageInterfaceWorkDone.schemaUpgraded = false;
261 _storageInterfaceWorkDone.uuid = boost::none;
262 _storageInterfaceWorkDone.getCollectionUUIDShouldFail = false;
263 _storageInterfaceWorkDone.upgradeUUIDSchemaVersionNonReplicatedShouldFail = false;
264 return Status::OK();
265 };
266 _storageInterface->truncateCollFn = [this](OperationContext* opCtx,
267 const NamespaceString& nss) {
268 LockGuard lock(_storageInterfaceWorkDoneMutex);
269 _storageInterfaceWorkDone.truncateCalled = true;
270 return Status::OK();
271 };
272 _storageInterface->insertDocumentFn = [this](OperationContext* opCtx,
273 const NamespaceString& nss,
274 const TimestampedBSONObj& doc,
275 long long term) {
276 LockGuard lock(_storageInterfaceWorkDoneMutex);
277 ++_storageInterfaceWorkDone.documentsInsertedCount;
278 return Status::OK();
279 };
280 _storageInterface->insertDocumentsFn = [this](OperationContext* opCtx,
281 const NamespaceString& nss,
282 const std::vector<InsertStatement>& ops) {
283 LockGuard lock(_storageInterfaceWorkDoneMutex);
284 _storageInterfaceWorkDone.insertedOplogEntries = true;
285 ++_storageInterfaceWorkDone.oplogEntriesInserted;
286 return Status::OK();
287 };
288 _storageInterface->dropCollFn = [this](OperationContext* opCtx,
289 const NamespaceString& nss) {
290 LockGuard lock(_storageInterfaceWorkDoneMutex);
291 _storageInterfaceWorkDone.droppedCollections.push_back(nss.ns());
292 return Status::OK();
293 };
294 _storageInterface->dropUserDBsFn = [this](OperationContext* opCtx) {
295 LockGuard lock(_storageInterfaceWorkDoneMutex);
296 _storageInterfaceWorkDone.droppedUserDBs = true;
297 return Status::OK();
298 };
299 _storageInterface->createCollectionForBulkFn =
300 [this](const NamespaceString& nss,
301 const CollectionOptions& options,
302 const BSONObj idIndexSpec,
303 const std::vector<BSONObj>& secondaryIndexSpecs) {
304 // Get collection info from map.
305 const auto collInfo = &_collections[nss];
306 if (collInfo->stats.initCalled) {
307 log() << "reusing collection during test which may cause problems, ns:" << nss;
308 }
309 (collInfo->loader = new CollectionBulkLoaderMock(&collInfo->stats))
310 ->init(secondaryIndexSpecs)
311 .transitional_ignore();
312
313 return StatusWith<std::unique_ptr<CollectionBulkLoader>>(
314 std::unique_ptr<CollectionBulkLoader>(collInfo->loader));
315 };
316 _storageInterface->getCollectionUUIDFn = [this](OperationContext* opCtx,
317 const NamespaceString& nss) {
318 LockGuard lock(_storageInterfaceWorkDoneMutex);
319 if (_storageInterfaceWorkDone.getCollectionUUIDShouldFail) {
320 // getCollectionUUID returns NamespaceNotFound if either the db or the collection is
321 // missing.
322 return StatusWith<OptionalCollectionUUID>(Status(
323 ErrorCodes::NamespaceNotFound,
324 str::stream() << "getCollectionUUID failed because namespace " << nss.ns()
325 << " not found."));
326 } else {
327 return StatusWith<OptionalCollectionUUID>(_storageInterfaceWorkDone.uuid);
328 }
329 };
330
331 _storageInterface->upgradeUUIDSchemaVersionNonReplicatedFn =
332 [this](OperationContext* opCtx) {
333 LockGuard lock(_storageInterfaceWorkDoneMutex);
334 if (_storageInterfaceWorkDone.upgradeUUIDSchemaVersionNonReplicatedShouldFail) {
335 // One of the status codes a failed upgradeUUIDSchemaVersionNonReplicated call
336 // can return is NamespaceNotFound.
337 return Status(ErrorCodes::NamespaceNotFound,
338 "upgradeUUIDSchemaVersionNonReplicated failed because the "
339 "desired ns was not found.");
340 } else {
341 _storageInterfaceWorkDone.schemaUpgraded = true;
342 return Status::OK();
343 }
344 };
345
346 _dbWorkThreadPool = stdx::make_unique<OldThreadPool>(1);
347
348 Client::initThreadIfNotAlready();
349 reset();
350
351 launchExecutorThread();
352
353 _replicationProcess = stdx::make_unique<ReplicationProcess>(
354 _storageInterface.get(),
355 stdx::make_unique<ReplicationConsistencyMarkersMock>(),
356 stdx::make_unique<ReplicationRecoveryMock>());
357
358 _executorProxy = stdx::make_unique<TaskExecutorMock>(&getExecutor());
359
360 _myLastOpTime = OpTime({3, 0}, 1);
361
362 InitialSyncerOptions options;
363 options.initialSyncRetryWait = Milliseconds(1);
364 options.batchLimits.bytes = 512 * 1024 * 1024U;
365 options.batchLimits.ops = 5000U;
366 options.getMyLastOptime = [this]() { return _myLastOpTime; };
367 options.setMyLastOptime = [this](const OpTime& opTime,
368 ReplicationCoordinator::DataConsistency consistency) {
369 _setMyLastOptime(opTime, consistency);
370 };
371 options.resetOptimes = [this]() { _myLastOpTime = OpTime(); };
372 options.getSlaveDelay = []() { return Seconds(0); };
373 options.syncSourceSelector = this;
374
375 _options = options;
376
377 ThreadPool::Options threadPoolOptions;
378 threadPoolOptions.poolName = "replication";
379 threadPoolOptions.minThreads = 1U;
380 threadPoolOptions.maxThreads = 1U;
381 threadPoolOptions.onCreateThread = [](const std::string& threadName) {
382 Client::initThread(threadName.c_str());
383 };
384
385 auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
386 dataReplicatorExternalState->taskExecutor = _executorProxy.get();
387 dataReplicatorExternalState->dbWorkThreadPool = &getDbWorkThreadPool();
388 dataReplicatorExternalState->currentTerm = 1LL;
389 dataReplicatorExternalState->lastCommittedOpTime = _myLastOpTime;
390 {
391 ReplSetConfig config;
392 ASSERT_OK(config.initialize(BSON("_id"
393 << "myset"
394 << "version"
395 << 1
396 << "protocolVersion"
397 << 1
398 << "members"
399 << BSON_ARRAY(BSON("_id" << 0 << "host"
400 << "localhost:12345"))
401 << "settings"
402 << BSON("electionTimeoutMillis" << 10000))));
403 dataReplicatorExternalState->replSetConfigResult = config;
404 }
405 _externalState = dataReplicatorExternalState.get();
406
407 _lastApplied = getDetectableErrorStatus();
408 _onCompletion = [this](const StatusWith<OpTimeWithHash>& lastApplied) {
409 _lastApplied = lastApplied;
410 };
411
412 try {
413 // When creating InitialSyncer, we wrap _onCompletion so that we can override the
414 // InitialSyncer's callback behavior post-construction.
415 // See InitialSyncerTransitionsToCompleteWhenFinishCallbackThrowsException.
416 _initialSyncer = stdx::make_unique<InitialSyncer>(
417 options,
418 std::move(dataReplicatorExternalState),
419 _storageInterface.get(),
420 _replicationProcess.get(),
421 [this](const StatusWith<OpTimeWithHash>& lastApplied) {
422 _onCompletion(lastApplied);
423 });
424 _initialSyncer->setScheduleDbWorkFn_forTest(
425 [this](const executor::TaskExecutor::CallbackFn& work) {
426 return getExecutor().scheduleWork(work);
427 });
428
429 } catch (...) {
430 ASSERT_OK(exceptionToStatus());
431 }
432 }
433
tearDownExecutorThread()434 void tearDownExecutorThread() {
435 if (_executorThreadShutdownComplete) {
436 return;
437 }
438 getExecutor().shutdown();
439 getExecutor().join();
440 _executorThreadShutdownComplete = true;
441 }
442
tearDown()443 void tearDown() override {
444 tearDownExecutorThread();
445 _initialSyncer.reset();
446 _dbWorkThreadPool->join();
447 _dbWorkThreadPool.reset();
448 _replicationProcess.reset();
449 _storageInterface.reset();
450 }
451
452 /**
453 * Note: An empty cmdName will skip validation.
454 */
verifyNextRequestCommandName(std::string cmdName)455 void verifyNextRequestCommandName(std::string cmdName) {
456 const auto net = getNet();
457 ASSERT_TRUE(net->hasReadyRequests());
458
459 if (cmdName != "") {
460 const NetworkInterfaceMock::NetworkOperationIterator req =
461 net->getFrontOfUnscheduledQueue();
462 const BSONObj reqBSON = req->getRequest().cmdObj;
463 const BSONElement cmdElem = reqBSON.firstElement();
464 auto reqCmdName = cmdElem.fieldNameStringData();
465 ASSERT_EQ(cmdName, reqCmdName);
466 }
467 }
468
469 void runInitialSyncWithBadFCVResponse(std::vector<BSONObj> docs,
470 ErrorCodes::Error expectedError);
471 void doSuccessfulInitialSyncWithOneBatch();
472 OplogEntry doInitialSyncWithOneBatch();
473
474 std::unique_ptr<TaskExecutorMock> _executorProxy;
475
476 InitialSyncerOptions _options;
477 InitialSyncerOptions::SetMyLastOptimeFn _setMyLastOptime;
478 OpTime _myLastOpTime;
479 std::unique_ptr<SyncSourceSelectorMock> _syncSourceSelector;
480 std::unique_ptr<StorageInterfaceMock> _storageInterface;
481 std::unique_ptr<ReplicationProcess> _replicationProcess;
482 std::unique_ptr<OldThreadPool> _dbWorkThreadPool;
483 std::map<NamespaceString, CollectionMockStats> _collectionStats;
484 std::map<NamespaceString, CollectionCloneInfo> _collections;
485
486 StatusWith<OpTimeWithHash> _lastApplied = Status(ErrorCodes::NotYetInitialized, "");
487 InitialSyncer::OnCompletionFn _onCompletion;
488
489 private:
490 DataReplicatorExternalStateMock* _externalState;
491 std::unique_ptr<InitialSyncer> _initialSyncer;
492 bool _executorThreadShutdownComplete = false;
493 };
494
makeThreadPoolMockOptions() const495 executor::ThreadPoolMock::Options InitialSyncerTest::makeThreadPoolMockOptions() const {
496 executor::ThreadPoolMock::Options options;
497 options.onCreateThread = []() { Client::initThread("InitialSyncerTest"); };
498 return options;
499 }
500
advanceClock(NetworkInterfaceMock * net,Milliseconds duration)501 void advanceClock(NetworkInterfaceMock* net, Milliseconds duration) {
502 executor::NetworkInterfaceMock::InNetworkGuard guard(net);
503 auto when = net->now() + duration;
504 ASSERT_EQUALS(when, net->runUntil(when));
505 }
506
makeOpCtx()507 ServiceContext::UniqueOperationContext makeOpCtx() {
508 return cc().makeOperationContext();
509 }
510
511 /**
512 * Generates a replSetGetRBID response.
513 */
makeRollbackCheckerResponse(int rollbackId)514 BSONObj makeRollbackCheckerResponse(int rollbackId) {
515 return BSON("ok" << 1 << "rbid" << rollbackId);
516 }
517
518 /**
519 * Generates a cursor response for a Fetcher to consume.
520 */
makeCursorResponse(CursorId cursorId,const NamespaceString & nss,std::vector<BSONObj> docs,bool isFirstBatch=true,int rbid=1)521 RemoteCommandResponse makeCursorResponse(CursorId cursorId,
522 const NamespaceString& nss,
523 std::vector<BSONObj> docs,
524 bool isFirstBatch = true,
525 int rbid = 1) {
526 OpTime futureOpTime(Timestamp(1000, 1000), 1000);
527 rpc::OplogQueryMetadata oqMetadata(futureOpTime, futureOpTime, rbid, 0, 0);
528 BSONObjBuilder metadataBob;
529 ASSERT_OK(oqMetadata.writeToMetadata(&metadataBob));
530 auto metadataObj = metadataBob.obj();
531
532 BSONObjBuilder bob;
533 {
534 BSONObjBuilder cursorBob(bob.subobjStart("cursor"));
535 cursorBob.append("id", cursorId);
536 cursorBob.append("ns", nss.toString());
537 {
538 BSONArrayBuilder batchBob(
539 cursorBob.subarrayStart(isFirstBatch ? "firstBatch" : "nextBatch"));
540 for (const auto& doc : docs) {
541 batchBob.append(doc);
542 }
543 }
544 }
545 bob.append("ok", 1);
546 return {bob.obj(), metadataObj, Milliseconds(0)};
547 }
548
549 /**
550 * Generates a listDatabases response for a DatabasesCloner to consume.
551 */
makeListDatabasesResponse(std::vector<std::string> databaseNames)552 BSONObj makeListDatabasesResponse(std::vector<std::string> databaseNames) {
553 BSONObjBuilder bob;
554 {
555 BSONArrayBuilder databasesBob(bob.subarrayStart("databases"));
556 for (const auto& name : databaseNames) {
557 BSONObjBuilder nameBob(databasesBob.subobjStart());
558 nameBob.append("name", name);
559 }
560 }
561 bob.append("ok", 1);
562 return bob.obj();
563 }
564
565 /**
566 * Generates oplog entries with the given number used for the timestamp.
567 */
makeOplogEntry(int t,OpTypeEnum opType=OpTypeEnum::kInsert,int version=OplogEntry::kOplogVersion)568 OplogEntry makeOplogEntry(int t,
569 OpTypeEnum opType = OpTypeEnum::kInsert,
570 int version = OplogEntry::kOplogVersion) {
571 BSONObj oField = BSON("_id" << t << "a" << t);
572 if (opType == OpTypeEnum::kCommand) {
573 // Insert an arbitrary command name so that the oplog entry is valid.
574 oField = BSON("dropIndexes"
575 << "a_1");
576 }
577 return OplogEntry(OpTime(Timestamp(t, 1), 1), // optime
578 static_cast<long long>(t), // hash
579 opType, // op type
580 NamespaceString("a.a"), // namespace
581 boost::none, // uuid
582 boost::none, // fromMigrate
583 version, // version
584 oField, // o
585 boost::none, // o2
586 {}, // sessionInfo
587 boost::none, // upsert
588 boost::none, // wall clock time
589 boost::none, // statement id
590 boost::none, // optime of previous write within same transaction
591 boost::none, // pre-image optime
592 boost::none); // post-image optime
593 }
594
makeOplogEntryObj(int t,OpTypeEnum opType=OpTypeEnum::kInsert,int version=OplogEntry::kOplogVersion)595 BSONObj makeOplogEntryObj(int t,
596 OpTypeEnum opType = OpTypeEnum::kInsert,
597 int version = OplogEntry::kOplogVersion) {
598 return makeOplogEntry(t, opType, version).toBSON();
599 }
600
processSuccessfulLastOplogEntryFetcherResponse(std::vector<BSONObj> docs)601 void InitialSyncerTest::processSuccessfulLastOplogEntryFetcherResponse(std::vector<BSONObj> docs) {
602 auto net = getNet();
603 auto request = assertRemoteCommandNameEquals(
604 "find",
605 net->scheduleSuccessfulResponse(makeCursorResponse(0LL, _options.localOplogNS, docs)));
606 ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
607 ASSERT_TRUE(request.cmdObj.hasField("sort"));
608 ASSERT_EQUALS(mongo::BSONType::Object, request.cmdObj["sort"].type());
609 ASSERT_BSONOBJ_EQ(BSON("$natural" << -1), request.cmdObj.getObjectField("sort"));
610 net->runReadyNetworkOperations();
611 }
612
assertFCVRequest(RemoteCommandRequest request)613 void assertFCVRequest(RemoteCommandRequest request) {
614 ASSERT_EQUALS(nsToDatabaseSubstring(FeatureCompatibilityVersion::kCollection), request.dbname)
615 << request.toString();
616 ASSERT_EQUALS(nsToCollectionSubstring(FeatureCompatibilityVersion::kCollection),
617 request.cmdObj.getStringField("find"));
618 ASSERT_BSONOBJ_EQ(BSON("_id" << FeatureCompatibilityVersion::kParameterName),
619 request.cmdObj.getObjectField("filter"));
620 }
621
processSuccessfulFCVFetcherResponse36()622 void InitialSyncerTest::processSuccessfulFCVFetcherResponse36() {
623 auto docs = {BSON("_id" << FeatureCompatibilityVersion::kParameterName << "version"
624 << FeatureCompatibilityVersionCommandParser::kVersion36)};
625 processSuccessfulFCVFetcherResponse(docs);
626 }
627
processSuccessfulFCVFetcherResponse(std::vector<BSONObj> docs)628 void InitialSyncerTest::processSuccessfulFCVFetcherResponse(std::vector<BSONObj> docs) {
629 auto net = getNet();
630 auto request = assertRemoteCommandNameEquals(
631 "find",
632 net->scheduleSuccessfulResponse(makeCursorResponse(
633 0LL, NamespaceString(FeatureCompatibilityVersion::kCollection), docs)));
634 assertFCVRequest(request);
635 net->runReadyNetworkOperations();
636 }
637
// Constructing an InitialSyncer must throw BadValue when the external state has no task executor
// or when the completion callback is empty.
TEST_F(InitialSyncerTest, InvalidConstruction) {
    InitialSyncerOptions syncerOptions;
    syncerOptions.getMyLastOptime = [] { return OpTime(); };
    syncerOptions.setMyLastOptime = [](const OpTime&,
                                       ReplicationCoordinator::DataConsistency consistency) {};
    syncerOptions.resetOptimes = [] {};
    syncerOptions.getSlaveDelay = [] { return Seconds(0); };
    syncerOptions.syncSourceSelector = this;
    auto onCompletion = [](const StatusWith<OpTimeWithHash>&) {};

    // Case 1: the external state's task executor is left unset.
    {
        auto externalState = stdx::make_unique<DataReplicatorExternalStateMock>();
        ASSERT_THROWS_CODE_AND_WHAT(InitialSyncer(syncerOptions,
                                                  std::move(externalState),
                                                  _storageInterface.get(),
                                                  _replicationProcess.get(),
                                                  onCompletion),
                                    AssertionException,
                                    ErrorCodes::BadValue,
                                    "task executor cannot be null");
    }

    // Case 2: valid task executor, but an empty completion callback.
    {
        auto externalState = stdx::make_unique<DataReplicatorExternalStateMock>();
        externalState->taskExecutor = &getExecutor();
        ASSERT_THROWS_CODE_AND_WHAT(InitialSyncer(syncerOptions,
                                                  std::move(externalState),
                                                  _storageInterface.get(),
                                                  _replicationProcess.get(),
                                                  InitialSyncer::OnCompletionFn()),
                                    AssertionException,
                                    ErrorCodes::BadValue,
                                    "callback function cannot be null");
    }
}
675
// Exercises only the fixture's setUp() and tearDown().
TEST_F(InitialSyncerTest, CreateDestroy) {
}
677
// Value passed as the second argument of InitialSyncer::startup() by the tests in this file.
const std::uint32_t maxAttempts{1U};
679
// A second startup() on an active InitialSyncer fails with IllegalOperation and leaves it active.
TEST_F(InitialSyncerTest, StartupReturnsIllegalOperationIfAlreadyActive) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    ASSERT_FALSE(syncer.isActive());
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(syncer.isActive());

    ASSERT_EQUALS(ErrorCodes::IllegalOperation, syncer.startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(syncer.isActive());
}
689
// startup() after shutdown() on a running InitialSyncer fails with ShutdownInProgress.
TEST_F(InitialSyncerTest, StartupReturnsShutdownInProgressIfInitialSyncerIsShuttingDown) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    ASSERT_FALSE(syncer.isActive());
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(syncer.isActive());

    // SyncSourceSelector returns an invalid sync source, so the InitialSyncer is stuck waiting
    // 'Options::syncSourceRetryWait' ms for another sync source when shutdown() is called.
    ASSERT_OK(syncer.shutdown());
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, syncer.startup(opCtx.get(), maxAttempts));
}
701
// startup() fails with ShutdownInProgress when the executor has already been shut down, and
// keeps failing the same way on a retry.
TEST_F(InitialSyncerTest, StartupReturnsShutdownInProgressIfExecutorIsShutdown) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    getExecutor().shutdown();

    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, syncer.startup(opCtx.get(), maxAttempts));
    ASSERT_FALSE(syncer.isActive());

    // The initial syncer is now in the Complete state, so it cannot be started again.
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, syncer.startup(opCtx.get(), maxAttempts));
}
712
// shutdown() before startup() succeeds; a later startup() fails with ShutdownInProgress.
TEST_F(InitialSyncerTest, ShutdownTransitionsStateToCompleteIfCalledBeforeStartup) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    ASSERT_OK(syncer.shutdown());
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, syncer.startup(opCtx.get(), maxAttempts));

    // An initial syncer in the Complete state is inactive.
    ASSERT_FALSE(syncer.isActive());
}
721
// The initial sync flag in the consistency markers is unset before startup() and set after it.
TEST_F(InitialSyncerTest, StartupSetsInitialSyncFlagOnSuccess) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Before starting: flag must not be set.
    ASSERT_FALSE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));

    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(syncer.isActive());

    // After starting: flag must be set.
    ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
}
735
// startup() resets the storage interface's initial data timestamp to
// Timestamp::kAllowUnstableCheckpointsSentinel and its stable timestamp to Timestamp::min().
TEST_F(InitialSyncerTest, StartupSetsInitialDataTimestampAndStableTimestampOnSuccess) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Move both timestamps forward first so the reset is observable.
    auto serviceCtx = opCtx->getServiceContext();
    _storageInterface->setInitialDataTimestamp(serviceCtx, Timestamp(5, 5));
    _storageInterface->setStableTimestamp(serviceCtx, Timestamp(6, 6));

    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));
    ASSERT_TRUE(syncer.isActive());

    ASSERT_EQUALS(Timestamp::kAllowUnstableCheckpointsSentinel,
                  _storageInterface->getInitialDataTimestamp());
    ASSERT_EQUALS(Timestamp::min(), _storageInterface->getStableTimestamp());
}
752
// Shutting down right after startup() makes the completion callback report CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerReturnsCallbackCanceledIfShutdownImmediatelyAfterStartup) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    // This will cancel the _startInitialSyncAttemptCallback() task scheduled by startup().
    ASSERT_OK(syncer.shutdown());

    // Depending on which InitialSyncer stage (_chooseSyncSource or _rollbackCheckerResetCallback)
    // was interrupted by shutdown(), we may have to ask the network interface to deliver
    // cancellation signals to the InitialSyncer callbacks in order for the InitialSyncer to run
    // to completion.
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(getNet());
        inNetwork->runReadyNetworkOperations();
    }

    syncer.join();

    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
773
// When the first sync source selection yields an invalid host, the InitialSyncer retries after
// '_options.syncSourceRetryWait' and only drops user databases once a valid source is returned.
TEST_F(InitialSyncerTest,
       InitialSyncerRetriesSyncSourceSelectionIfChooseNewSyncSourceReturnsInvalidSyncSource) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    // The InitialSyncer looks for a sync source immediately after startup, so the mock's
    // chooseNewSyncSource() result has to be overridden before calling startup().
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());

    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    // Run the first sync source selection attempt.
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(getNet());
        inNetwork->runReadyNetworkOperations();
    }

    // While still looking for a valid sync source, user databases must not have been dropped.
    ASSERT_FALSE(_storageInterfaceWorkDone.droppedUserDBs);

    // The first attempt failed. Make the mock return a valid sync source the next time
    // chooseNewSyncSource() is called.
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));

    // Advance the clock to the next sync source selection attempt.
    advanceClock(getNet(), _options.syncSourceRetryWait);

    // The InitialSyncer drops user databases after obtaining a valid sync source.
    ASSERT_TRUE(_storageInterfaceWorkDone.droppedUserDBs);
}
801
// Number of sync source selection attempts the tests below assume for one initial sync attempt.
constexpr std::uint32_t chooseSyncSourceMaxAttempts{10U};
803
804 /**
805 * Advances executor clock so that InitialSyncer exhausts all 'chooseSyncSourceMaxAttempts' (server
806 * parameter numInitialSyncConnectAttempts) sync source selection attempts.
807 * If SyncSourceSelectorMock keeps returning an invalid sync source, InitialSyncer will retry every
808 * '_options.syncSourceRetryWait' ms up to a maximum of 'chooseSyncSourceMaxAttempts' attempts.
809 */
_simulateChooseSyncSourceFailure(executor::NetworkInterfaceMock * net,Milliseconds syncSourceRetryWait)810 void _simulateChooseSyncSourceFailure(executor::NetworkInterfaceMock* net,
811 Milliseconds syncSourceRetryWait) {
812 advanceClock(net, int(chooseSyncSourceMaxAttempts - 1) * syncSourceRetryWait);
813 }
814
// With the sync source selector always returning an invalid host, initial sync must finish with
// InitialSyncOplogSourceMissing once the sync source selection attempts have been exhausted.
TEST_F(
    InitialSyncerTest,
    InitialSyncerReturnsInitialSyncOplogSourceMissingIfNoValidSyncSourceCanBeFoundAfterTenFailedChooseSyncSourceAttempts) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    // The mock selector has to be primed before startup() because InitialSyncer starts looking
    // for a valid sync source immediately after startup.
    HostAndPort invalidSyncSource;
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(invalidSyncSource);

    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    _simulateChooseSyncSourceFailure(network, _options.syncSourceRetryWait);

    syncer.join();

    ASSERT_EQUALS(ErrorCodes::InitialSyncOplogSourceMissing, _lastApplied);
}
833
834 // Confirms that InitialSyncer keeps retrying initial sync.
835 // Make every initial sync attempt fail early by having the sync source selector always return an
836 // invalid sync source.
TEST_F(InitialSyncerTest,InitialSyncerRetriesInitialSyncUpToMaxAttemptsAndReturnsLastAttemptError)837 TEST_F(InitialSyncerTest,
838 InitialSyncerRetriesInitialSyncUpToMaxAttemptsAndReturnsLastAttemptError) {
839 auto initialSyncer = &getInitialSyncer();
840 auto opCtx = makeOpCtx();
841
842 _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
843
844 const std::uint32_t initialSyncMaxAttempts = 3U;
845 ASSERT_OK(initialSyncer->startup(opCtx.get(), initialSyncMaxAttempts));
846
847 auto net = getNet();
848 for (std::uint32_t i = 0; i < initialSyncMaxAttempts; ++i) {
849 _simulateChooseSyncSourceFailure(net, _options.syncSourceRetryWait);
850 advanceClock(net, _options.initialSyncRetryWait);
851 }
852
853 initialSyncer->join();
854
855 ASSERT_EQUALS(ErrorCodes::InitialSyncOplogSourceMissing, _lastApplied);
856
857 // Check number of failed attempts in stats.
858 auto progress = initialSyncer->getInitialSyncProgress();
859 unittest::log() << "Progress after " << initialSyncMaxAttempts
860 << " failed attempts: " << progress;
861 ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), int(initialSyncMaxAttempts))
862 << progress;
863 ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), int(initialSyncMaxAttempts))
864 << progress;
865 }
866
// Checks that an initial sync attempt resets the node's last optime to a default-constructed
// OpTime even though a non-zero optime was set before startup().
TEST_F(InitialSyncerTest, InitialSyncerResetsOptimesOnNewAttempt) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort invalidSyncSource;
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(invalidSyncSource);

    // Start from an arbitrary non-zero optime. Which 'consistency' value is passed is irrelevant
    // for this test.
    OpTime originalOpTime(Timestamp(1000, 1), 1);
    _setMyLastOptime(originalOpTime, ReplicationCoordinator::DataConsistency::Inconsistent);

    // Start initial sync with one allowed attempt.
    const std::uint32_t initialSyncMaxAttempts = 1U;
    ASSERT_OK(syncer.startup(opCtx.get(), initialSyncMaxAttempts));

    // Simulate a failed initial sync attempt.
    auto network = getNet();
    _simulateChooseSyncSourceFailure(network, _options.syncSourceRetryWait);
    advanceClock(network, _options.initialSyncRetryWait);

    syncer.join();

    // The attempt must have reset the optime.
    const OpTime resetOpTime;
    ASSERT_EQUALS(resetOpTime, _options.getMyLastOptime());
}
893
// Shutting down while InitialSyncer is waiting to retry sync source selection must yield
// CallbackCanceled.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsCallbackCanceledIfShutdownWhileRetryingSyncSourceSelection) {
    using NetGuard = executor::NetworkInterfaceMock::InNetworkGuard;

    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort invalidSyncSource;
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(invalidSyncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    // Advance the mock clock to the middle of the retry interval.
    auto network = getNet();
    {
        NetGuard inNetwork(network);
        auto halfwayToRetry = network->now() + _options.syncSourceRetryWait / 2;
        ASSERT_GREATER_THAN(halfwayToRetry, network->now());
        ASSERT_EQUALS(halfwayToRetry, network->runUntil(halfwayToRetry));
    }

    // shutdown() cancels the _chooseSyncSourceCallback() task scheduled at getNet()->now() +
    // '_options.syncSourceRetryWait'.
    ASSERT_OK(syncer.shutdown());

    syncer.join();

    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
918
// When the executor proxy is told to fail scheduleWorkAt() requests and the selector returns an
// invalid sync source, initial sync must finish with OperationFailed.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsScheduleErrorIfTaskExecutorFailsToScheduleNextChooseSyncSourceCallback) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort invalidSyncSource;
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(invalidSyncSource);

    // Every scheduleWorkAt() request is rejected from here on.
    _executorProxy->shouldFailScheduleWorkAtRequest = [] { return true; };

    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    syncer.join();

    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
932
// With two initial sync attempts allowed, makes scheduling fail right when the first attempt's
// last sync source selection fails, and expects OperationFailed plus a transition to kComplete.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsScheduleErrorIfTaskExecutorFailsToScheduleNextInitialSyncAttempt) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort invalidSyncSource;
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(invalidSyncSource);

    ASSERT_EQUALS(InitialSyncer::State::kPreStart, syncer.getState_forTest());

    ASSERT_OK(syncer.startup(opCtx.get(), 2U));
    ASSERT_EQUALS(InitialSyncer::State::kRunning, syncer.getState_forTest());

    // Advance clock so that we run all but the last sync source callback.
    auto network = getNet();
    const auto intervalsBeforeLast = static_cast<int>(chooseSyncSourceMaxAttempts - 2);
    advanceClock(network, intervalsBeforeLast * _options.syncSourceRetryWait);

    // The last choose sync source attempt should now be scheduled. Reject scheduling from here on
    // and advance the clock so that the last attempt fails, which causes the next initial sync
    // attempt to be scheduled.
    _executorProxy->shouldFailScheduleWorkAtRequest = [] { return true; };
    advanceClock(network, _options.syncSourceRetryWait);

    syncer.join();

    ASSERT_EQUALS(InitialSyncer::State::kComplete, syncer.getState_forTest());
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
959
960 // This test verifies that the initial syncer will still transition to a complete state even if
961 // the completion callback function throws an exception.
TEST_F(InitialSyncerTest,InitialSyncerTransitionsToCompleteWhenFinishCallbackThrowsException)962 TEST_F(InitialSyncerTest, InitialSyncerTransitionsToCompleteWhenFinishCallbackThrowsException) {
963 auto initialSyncer = &getInitialSyncer();
964 auto opCtx = makeOpCtx();
965
966 _onCompletion = [this](const StatusWith<OpTimeWithHash>& lastApplied) {
967 _lastApplied = lastApplied;
968 uassert(ErrorCodes::InternalError, "", false);
969 };
970
971 _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort());
972 ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
973
974 ASSERT_OK(initialSyncer->shutdown());
975 initialSyncer->join();
976
977 ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
978 }
979
/**
 * Non-copyable helper that reports its own destruction.
 *
 * The constructor stores a pointer to a caller-owned flag; the destructor sets that flag to true.
 * The pointer is dereferenced unconditionally on destruction, so it must be non-null and must
 * outlive this object.
 */
class SharedCallbackState {
public:
    explicit SharedCallbackState(bool* sharedCallbackStateDestroyed)
        : _sharedCallbackStateDestroyed(sharedCallbackStateDestroyed) {}

    // Copying is disallowed; spelled out directly instead of via MONGO_DISALLOW_COPYING.
    SharedCallbackState(const SharedCallbackState&) = delete;
    SharedCallbackState& operator=(const SharedCallbackState&) = delete;

    ~SharedCallbackState() {
        *_sharedCallbackStateDestroyed = true;
    }

private:
    // Caller-owned flag that is set when this object is destroyed.
    bool* _sharedCallbackStateDestroyed;
};
993
// Verifies that InitialSyncer releases its completion callback (and everything the callback
// captured) once it has finished. A SharedCallbackState captured by the callback flips a flag on
// destruction, which makes the release observable.
TEST_F(InitialSyncerTest, InitialSyncerResetsOnCompletionCallbackFunctionPointerUponCompletion) {
    bool callbackStateDestroyed = false;
    auto callbackState = std::make_shared<SharedCallbackState>(&callbackStateDestroyed);
    decltype(_lastApplied) completionResult = getDetectableErrorStatus();

    // Build a dedicated InitialSyncer whose completion callback holds a copy of 'callbackState'.
    auto externalState = stdx::make_unique<DataReplicatorExternalStateMock>();
    externalState->taskExecutor = &getExecutor();
    auto syncer = stdx::make_unique<InitialSyncer>(
        _options,
        std::move(externalState),
        _storageInterface.get(),
        _replicationProcess.get(),
        [&completionResult, callbackState](const StatusWith<OpTimeWithHash>& status) {
            completionResult = status;
        });
    ON_BLOCK_EXIT([this]() { getExecutor().shutdown(); });

    auto opCtx = makeOpCtx();

    ASSERT_OK(syncer->startup(opCtx.get(), maxAttempts));

    // Dropping the test's own reference must not destroy the state: the callback still owns one.
    callbackState.reset();
    ASSERT_FALSE(callbackStateDestroyed);

    ASSERT_OK(syncer->shutdown());

    // Depending on which InitialSyncer stage (_chooseSyncSource or _rollbackCheckerResetCallback)
    // was interrupted by shutdown(), the network interface may have to deliver cancellation
    // signals to the InitialSyncer callbacks in order for InitialSyncer to run to completion.
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(getNet());
        inNetwork->runReadyNetworkOperations();
    }

    syncer->join();

    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, completionResult);

    // InitialSyncer should reset 'InitialSyncer::_onCompletion' after running the callback for
    // the last time before becoming inactive, so that the resources associated with
    // 'InitialSyncer::_onCompletion' are released.
    ASSERT_TRUE(callbackStateDestroyed);
}
1035
// Checks that initial sync calls truncate and drops user databases, by inspecting the flags
// recorded in '_storageInterfaceWorkDone'.
TEST_F(InitialSyncerTest, InitialSyncerTruncatesOplogAndDropsReplicatedDatabases) {
    // Nothing past the dropUserDB stage is of interest here, so the wrapped function reports a
    // failure after the original one has set '_storageInterfaceWorkDone.droppedUserDBs' to true.
    auto originalDropFn = _storageInterface->dropUserDBsFn;
    _storageInterface->dropUserDBsFn = [originalDropFn](OperationContext* ctx) {
        ASSERT_OK(originalDropFn(ctx));
        return Status(ErrorCodes::OperationFailed, "drop userdbs failed");
    };

    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    LockGuard workDoneLock(_storageInterfaceWorkDoneMutex);
    ASSERT_TRUE(_storageInterfaceWorkDone.truncateCalled);
    ASSERT_TRUE(_storageInterfaceWorkDone.droppedUserDBs);
}
1058
// A failure to schedule the replSetGetRBID remote command must be reported as the initial sync
// result; the rejected request is captured and inspected afterwards.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetRollbackIdScheduleError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    // replSetGetRBID is the first remote command to be scheduled by the initial syncer after
    // creating the oplog collection, so rejecting every remote command rejects it.
    executor::RemoteCommandRequest rejectedRequest;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&rejectedRequest](const executor::RemoteCommandRequest& outgoing) {
            rejectedRequest = outgoing;
            return true;
        };

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    ASSERT_EQUALS("admin", rejectedRequest.dbname);
    assertRemoteCommandNameEquals("replSetGetRBID", rejectedRequest);
    ASSERT_EQUALS(syncSource, rejectedRequest.target);
}
1083
// If the task executor is shut down before the rollback id request can be scheduled, initial sync
// must finish with ShutdownInProgress.
TEST_F(
    InitialSyncerTest,
    InitialSyncerReturnsShutdownInProgressIfSchedulingRollbackCheckerFailedDueToExecutorShutdown) {
    // The rollback id request is sent immediately after oplog truncation. Shutting the task
    // executor down before truncate() returns makes the scheduleRemoteCommand() call for
    // replSetGetRBID fail.
    auto originalTruncateFn = _storageInterface->truncateCollFn;
    _storageInterface->truncateCollFn = [originalTruncateFn, this](OperationContext* ctx,
                                                                   const NamespaceString& ns) {
        auto truncateStatus = originalTruncateFn(ctx, ns);
        getExecutor().shutdown();
        return truncateStatus;
    };

    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _lastApplied);

    LockGuard workDoneLock(_storageInterfaceWorkDoneMutex);
    ASSERT_TRUE(_storageInterfaceWorkDone.truncateCalled);
}
1110
// Shutting down while the replSetGetRBID request is outstanding must cancel it: the syncer goes
// through kShuttingDown to kComplete and reports CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerCancelsRollbackCheckerOnShutdown) {
    using NetGuard = executor::NetworkInterfaceMock::InNetworkGuard;

    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);

    ASSERT_EQUALS(InitialSyncer::State::kPreStart, syncer.getState_forTest());

    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));
    ASSERT_EQUALS(InitialSyncer::State::kRunning, syncer.getState_forTest());

    // Check the pending replSetGetRBID request, then black-hole it so it never gets a response.
    auto network = getNet();
    {
        NetGuard inNetwork(network);
        ASSERT_TRUE(network->hasReadyRequests());
        auto readyRequest = network->getNextReadyRequest();
        const auto& rbidRequest =
            assertRemoteCommandNameEquals("replSetGetRBID", readyRequest->getRequest());
        ASSERT_EQUALS("admin", rbidRequest.dbname);
        ASSERT_EQUALS(syncSource, rbidRequest.target);
        network->blackHole(readyRequest);
    }

    ASSERT_OK(syncer.shutdown());
    // Since we need to request the NetworkInterfaceMock to deliver the cancellation event,
    // the InitialSyncer has to be in a pre-completion state (i.e. ShuttingDown).
    ASSERT_EQUALS(InitialSyncer::State::kShuttingDown, syncer.getState_forTest());

    {
        NetGuard inNetwork(network);
        inNetwork->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(InitialSyncer::State::kComplete, syncer.getState_forTest());

    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
1146
// An error response to the replSetGetRBID command must become the initial sync result.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughRollbackCheckerCallbackError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Answer the next request with an error and check that it was the rollback id command.
        const Status rbidFailure(ErrorCodes::OperationFailed,
                                 "replSetGetRBID failed at sync source");
        assertRemoteCommandNameEquals("replSetGetRBID",
                                      network->scheduleErrorResponse(rbidFailure));
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
1167
// A failure to schedule the last oplog entry fetcher's find command must become the initial sync
// result; the rejected find request is captured and its shape is verified.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughLastOplogEntryFetcherScheduleError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    // The last oplog entry fetcher is the first component that sends a find command, so every
    // find command is rejected and the most recent request is saved for inspection at the end of
    // this test case.
    executor::RemoteCommandRequest lastRequest;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&lastRequest](const executor::RemoteCommandRequest& outgoing) {
            lastRequest = outgoing;
            const auto commandName = outgoing.cmdObj.firstElement().fieldNameStringData();
            return "find" == commandName;
        };

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    // The rejected request must be a find on the oplog, sorted by reverse natural order and
    // limited to one document.
    ASSERT_EQUALS(syncSource, lastRequest.target);
    ASSERT_EQUALS(_options.localOplogNS.db(), lastRequest.dbname);
    assertRemoteCommandNameEquals("find", lastRequest);
    ASSERT_BSONOBJ_EQ(BSON("$natural" << -1), lastRequest.cmdObj.getObjectField("sort"));
    ASSERT_EQUALS(1, lastRequest.cmdObj.getIntField("limit"));
}
1203
// An error response to the last oplog entry fetcher's find command must become the initial sync
// result.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughLastOplogEntryFetcherCallbackError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Fail the find command that follows.
        const Status findFailure(ErrorCodes::OperationFailed, "find command failed at sync source");
        assertRemoteCommandNameEquals("find", network->scheduleErrorResponse(findFailure));
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
1229
// Shutting down while a request is pending after the rollback id response must result in
// CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerCancelsLastOplogEntryFetcherOnShutdown) {
    using NetGuard = executor::NetworkInterfaceMock::InNetworkGuard;

    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        NetGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // A follow-up request must now be waiting for a response.
        ASSERT_TRUE(network->hasReadyRequests());
    }

    ASSERT_OK(syncer.shutdown());
    {
        NetGuard inNetwork(network);
        inNetwork->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
1254
// An empty batch from the last oplog entry fetcher must make initial sync finish with
// NoMatchingDocument.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsNoMatchingDocumentIfLastOplogEntryFetcherReturnsEmptyBatchOfDocuments) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry: respond with no documents at all.
        processSuccessfulLastOplogEntryFetcherResponse({});
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, _lastApplied);
}
1278
// After a HostNotFound error response to the last oplog entry find command, InitialSyncer must
// stay active and a second find request must be available to answer.
TEST_F(InitialSyncerTest,
       InitialSyncerResendsFindCommandIfLastOplogEntryFetcherReturnsRetriableError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    // The network guard is held for the remainder of the test.
    auto network = getNet();
    executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

    // Base rollback ID.
    network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
    network->runReadyNetworkOperations();

    // Last oplog entry first attempt - retriable error.
    const Status retriableError(ErrorCodes::HostNotFound, "");
    assertRemoteCommandNameEquals("find", network->scheduleErrorResponse(retriableError));
    network->runReadyNetworkOperations();

    // InitialSyncer stays active because it resends the find request for the last oplog entry.
    ASSERT_TRUE(syncer.isActive());

    // Last oplog entry second attempt.
    processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
}
1305
// An empty oplog entry document (so no hash field) must make initial sync finish with NoSuchKey.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsNoSuchKeyIfLastOplogEntryFetcherReturnsEntryWithMissingHash) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry: a single empty document.
        processSuccessfulLastOplogEntryFetcherResponse({BSONObj()});
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::NoSuchKey, _lastApplied);
}
1329
// An oplog entry that carries only the "h" field (no timestamp) must make initial sync finish
// with NoSuchKey.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsNoSuchKeyIfLastOplogEntryFetcherReturnsEntryWithMissingTimestamp) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry: hash present, timestamp absent.
        processSuccessfulLastOplogEntryFetcherResponse({BSON("h" << 1LL)});
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::NoSuchKey, _lastApplied);
}
1353
// With the external state mock's 'replSetConfigResult' set to an error status, initial sync must
// finish with that error once the rollback id, last oplog entry and FCV responses are delivered.
TEST_F(InitialSyncerTest,
       InitialSyncerPassesThroughErrorFromDataReplicatorExternalStateGetCurrentConfig) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    getExternalState()->replSetConfigResult = Status(ErrorCodes::OperationFailed, "");

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
1382
// A failure to schedule the find command on the feature compatibility version collection must
// become the initial sync result; the rejected request is captured and verified.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughFCVFetcherScheduleError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Reject the first find command that targets the FCV collection. Every request seen is
    // saved, so the last one saved is the rejected one.
    executor::RemoteCommandRequest lastRequest;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&lastRequest](const executor::RemoteCommandRequest& outgoing) {
            lastRequest = outgoing;
            const bool isFind = "find" == outgoing.cmdObj.firstElement().fieldNameStringData();
            return isFind &&
                nsToCollectionSubstring(FeatureCompatibilityVersion::kCollection) ==
                outgoing.cmdObj.firstElement().str();
        };

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    ASSERT_EQUALS(syncSource, lastRequest.target);
    assertFCVRequest(lastRequest);
}
1419
// An error response to the FCV fetcher's find command must become the initial sync result.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughFCVFetcherCallbackError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Fail the next find command and confirm that it was the FCV request.
        const Status findFailure(ErrorCodes::OperationFailed, "find command failed at sync source");
        auto fcvRequest =
            assertRemoteCommandNameEquals("find", network->scheduleErrorResponse(findFailure));
        assertFCVRequest(fcvRequest);
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
1449
// Shutting down while a request is pending after the last oplog entry response must result in
// CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerCancelsFCVFetcherOnShutdown) {
    using NetGuard = executor::NetworkInterfaceMock::InNetworkGuard;

    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto network = getNet();
    {
        NetGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // A follow-up request must now be waiting for a response.
        ASSERT_TRUE(network->hasReadyRequests());
    }

    ASSERT_OK(syncer.shutdown());
    {
        NetGuard inNetwork(network);
        inNetwork->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
1477
// After a HostNotFound error response to the FCV find command, InitialSyncer must stay active and
// a second FCV request must be available to answer.
TEST_F(InitialSyncerTest, InitialSyncerResendsFindCommandIfFCVFetcherReturnsRetriableError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    // The network guard is held for the remainder of the test.
    auto network = getNet();
    executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

    // Base rollback ID.
    network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
    network->runReadyNetworkOperations();

    // Last oplog entry.
    processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

    // FCV first attempt - retriable error.
    const Status retriableError(ErrorCodes::HostNotFound, "");
    assertRemoteCommandNameEquals("find", network->scheduleErrorResponse(retriableError));
    network->runReadyNetworkOperations();

    // InitialSyncer stays active because it resends the find request for the FCV.
    ASSERT_TRUE(syncer.isActive());

    // FCV second attempt.
    processSuccessfulFCVFetcherResponse36();
}
1506
runInitialSyncWithBadFCVResponse(std::vector<BSONObj> docs,ErrorCodes::Error expectedError)1507 void InitialSyncerTest::runInitialSyncWithBadFCVResponse(std::vector<BSONObj> docs,
1508 ErrorCodes::Error expectedError) {
1509 auto initialSyncer = &getInitialSyncer();
1510 auto opCtx = makeOpCtx();
1511
1512 _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
1513 ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
1514
1515 auto net = getNet();
1516 {
1517 executor::NetworkInterfaceMock::InNetworkGuard guard(net);
1518
1519 // Base rollback ID.
1520 net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
1521 net->runReadyNetworkOperations();
1522
1523 // Last oplog entry.
1524 processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
1525
1526 processSuccessfulFCVFetcherResponse(docs);
1527 }
1528
1529 initialSyncer->join();
1530 ASSERT_EQUALS(expectedError, _lastApplied);
1531 }
1532
// An FCV response that contains no documents is expected to end initial sync with
// IncompatibleServerVersion.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsIncompatibleServerVersionWhenFCVFetcherReturnsEmptyBatchOfDocuments) {
    const std::vector<BSONObj> emptyBatch;
    runInitialSyncWithBadFCVResponse(emptyBatch, ErrorCodes::IncompatibleServerVersion);
}
1537
// An FCV response that contains more than one document is expected to end initial sync with
// TooManyMatchingDocuments.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsTooManyMatchingDocumentsWhenFCVFetcherReturnsMultipleDocuments) {
    const std::vector<BSONObj> twoDocs{
        BSON("_id" << FeatureCompatibilityVersion::kParameterName << "version"
                   << FeatureCompatibilityVersionCommandParser::kVersion36),
        BSON("_id"
             << "other")};
    runInitialSyncWithBadFCVResponse(twoDocs, ErrorCodes::TooManyMatchingDocuments);
}
1546
// An FCV document with version 3.4 and a 'targetVersion' of 3.6 is expected to end initial sync
// with IncompatibleServerVersion.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsIncompatibleServerVersionWhenFCVFetcherReturnsUpgradeTargetVersion) {
    const std::vector<BSONObj> upgradingDocs{
        BSON("_id" << FeatureCompatibilityVersion::kParameterName << "version"
                   << FeatureCompatibilityVersionCommandParser::kVersion34
                   << "targetVersion"
                   << FeatureCompatibilityVersionCommandParser::kVersion36)};
    runInitialSyncWithBadFCVResponse(upgradingDocs, ErrorCodes::IncompatibleServerVersion);
}
1555
// An FCV document with version 3.4 and a 'targetVersion' of 3.4 is expected to end initial sync
// with IncompatibleServerVersion.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsIncompatibleServerVersionWhenFCVFetcherReturnsDowngradeTargetVersion) {
    const std::vector<BSONObj> downgradingDocs{
        BSON("_id" << FeatureCompatibilityVersion::kParameterName << "version"
                   << FeatureCompatibilityVersionCommandParser::kVersion34
                   << "targetVersion"
                   << FeatureCompatibilityVersionCommandParser::kVersion34)};
    runInitialSyncWithBadFCVResponse(downgradingDocs, ErrorCodes::IncompatibleServerVersion);
}
1564
// An FCV document that has a 'targetVersion' but no 'version' field is expected to end initial
// sync with BadValue.
TEST_F(InitialSyncerTest, InitialSyncerReturnsBadValueWhenFCVFetcherReturnsNoVersion) {
    const std::vector<BSONObj> versionlessDocs{
        BSON("_id" << FeatureCompatibilityVersion::kParameterName << "targetVersion"
                   << FeatureCompatibilityVersionCommandParser::kVersion34)};
    runInitialSyncWithBadFCVResponse(versionlessDocs, ErrorCodes::BadValue);
}
1570
// Checks that an FCV document reporting version 3.4 does not fail initial sync: after the FCV
// response is processed there are further ready network requests, and shutting the syncer down
// yields CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerSucceedsWhenFCVFetcherReturnsOldVersion) {
    auto syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // FCV document carrying the older (3.4) version.
        const std::vector<BSONObj> oldVersionDocs{
            BSON("_id" << FeatureCompatibilityVersion::kParameterName << "version"
                       << FeatureCompatibilityVersionCommandParser::kVersion34)};
        processSuccessfulFCVFetcherResponse(oldVersionDocs);

        // Initial sync moved on and issued more requests.
        ASSERT_TRUE(network->hasReadyRequests());
    }

    // Shut down instead of driving initial sync to completion. Had the FCV fetcher reported an
    // error, that error would be the final status rather than CallbackCanceled.
    ASSERT_OK(syncer->shutdown());
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);
        inNetwork->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
1603
// Makes scheduling of the tailable oplog 'find' fail and checks that initial sync finishes with
// OperationFailed. Also inspects the request that was rejected.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherScheduleError) {
    auto syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    // Reject only the tailable 'find' (the oplog query) and keep a copy of it; every other
    // request is allowed to be scheduled.
    executor::RemoteCommandRequest rejectedRequest;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&rejectedRequest](const executor::RemoteCommandRequest& requestToSend) {
            const auto& cmdObj = requestToSend.cmdObj;
            const bool isFind = ("find" == cmdObj.firstElement().fieldNameStringData());
            if (!isFind || !cmdObj.getBoolField("tailable")) {
                return false;
            }
            rejectedRequest = requestToSend;
            return true;
        };

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
        network->runReadyNetworkOperations();

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // After recording the completion status, the OplogFetcher failure shuts down the
        // DatabasesCloner. One more pass over the ready operations delivers the cancellation
        // status to _databasesClonerCallback().
        network->runReadyNetworkOperations();
    }
    syncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    // The rejected request must be the tailable oplog query sent to the chosen sync source.
    ASSERT_EQUALS(syncSource, rejectedRequest.target);
    ASSERT_EQUALS(_options.localOplogNS.db(), rejectedRequest.dbname);
    assertRemoteCommandNameEquals("find", rejectedRequest);
    ASSERT_TRUE(rejectedRequest.cmdObj.getBoolField("tailable"));
    ASSERT_TRUE(rejectedRequest.cmdObj.getBoolField("oplogReplay"));
}
1653
// Answers the tailable oplog 'find' with an OperationFailed error and checks that initial sync
// finishes with that same error code.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherCallbackError) {
    auto syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        const auto lastOplogEntryResponse =
            makeCursorResponse(0LL, _options.localOplogNS, {makeOplogEntryObj(1)});
        network->scheduleSuccessfulResponse(lastOplogEntryResponse);
        network->runReadyNetworkOperations();

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Empty database list for the DatabasesCloner.
        assertRemoteCommandNameEquals(
            "listDatabases", network->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        network->runReadyNetworkOperations();

        // Oplog tailing query: respond with an error.
        const Status deadCursor(ErrorCodes::OperationFailed, "dead cursor");
        auto tailingRequest =
            assertRemoteCommandNameEquals("find", network->scheduleErrorResponse(deadCursor));
        ASSERT_TRUE(tailingRequest.cmdObj.getBoolField("tailable"));
        network->runReadyNetworkOperations();

        // After recording the completion status, the OplogFetcher failure shuts down the
        // DatabasesCloner. One more pass over the ready operations delivers the cancellation
        // status to _databasesClonerCallback().
        network->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
1697
// The oplog tailing cursor is closed by the sync source (cursor id 0) after returning only the
// entry initial sync started from. Both last-oplog-entry fetches report entry 1, so the test
// expects initial sync to finish successfully at the optime of entry 1.
TEST_F(InitialSyncerTest,
       InitialSyncerSucceedsOnEarlyOplogFetcherCompletionIfThereAreNoOperationsToApply) {
    auto syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry: a 'find' limited to a single document.
        const auto firstLastEntryResponse =
            makeCursorResponse(0LL, _options.localOplogNS, {makeOplogEntryObj(1)});
        auto observedRequest = assertRemoteCommandNameEquals(
            "find", network->scheduleSuccessfulResponse(firstLastEntryResponse));
        ASSERT_EQUALS(1, observedRequest.cmdObj.getIntField("limit"));
        network->runReadyNetworkOperations();

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Empty database list for the DatabasesCloner.
        assertRemoteCommandNameEquals(
            "listDatabases", network->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        network->runReadyNetworkOperations();

        // Oplog tailing query. A cursor id of 0 simulates the cursor closing on the sync source.
        const auto tailingResponse =
            makeCursorResponse(0LL, _options.localOplogNS, {makeOplogEntryObj(1)});
        observedRequest = assertRemoteCommandNameEquals(
            "find", network->scheduleSuccessfulResponse(tailingResponse));
        ASSERT_TRUE(observedRequest.cmdObj.getBoolField("tailable"));
        network->runReadyNetworkOperations();

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Final rollback checker replSetGetRBID command.
        assertRemoteCommandNameEquals(
            "replSetGetRBID",
            network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)));
        network->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(makeOplogEntry(1).getOpTime(), unittest::assertGet(_lastApplied).opTime);
}
1750
// The oplog tailing cursor is closed by the sync source (cursor id 0) after returning entries 1-3.
// The second last-oplog-entry fetch reports entry 3, which the tailing query already returned, so
// the test expects initial sync to finish successfully at the optime of entry 3.
TEST_F(
    InitialSyncerTest,
    InitialSyncerSucceedsOnEarlyOplogFetcherCompletionIfThereAreEnoughOperationsInTheOplogBufferToReachEndTimestamp) {
    auto syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Empty database list for the DatabasesCloner.
        assertRemoteCommandNameEquals(
            "listDatabases", network->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        network->runReadyNetworkOperations();

        // Oplog tailing query. A cursor id of 0 simulates the cursor closing on the sync source.
        const auto tailingResponse = makeCursorResponse(0LL,
                                                        _options.localOplogNS,
                                                        {makeOplogEntryObj(1),
                                                         makeOplogEntryObj(2, OpTypeEnum::kCommand),
                                                         makeOplogEntryObj(3, OpTypeEnum::kCommand)});
        auto tailingRequest = assertRemoteCommandNameEquals(
            "find", network->scheduleSuccessfulResponse(tailingResponse));
        ASSERT_TRUE(tailingRequest.cmdObj.getBoolField("tailable"));
        network->runReadyNetworkOperations();

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(3)});

        // Final rollback checker replSetGetRBID command.
        assertRemoteCommandNameEquals(
            "replSetGetRBID",
            network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)));
        network->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(makeOplogEntry(3).getOpTime(), unittest::assertGet(_lastApplied).opTime);
}
1803
// The oplog tailing cursor is closed by the sync source (cursor id 0) after returning entries 1-3,
// but the second last-oplog-entry fetch reports entry 4. The test expects initial sync to finish
// with RemoteResultsUnavailable.
TEST_F(
    InitialSyncerTest,
    InitialSyncerReturnsRemoteResultsUnavailableOnEarlyOplogFetcherCompletionIfThereAreNotEnoughOperationsInTheOplogBufferToReachEndTimestamp) {
    auto syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Empty database list for the DatabasesCloner.
        assertRemoteCommandNameEquals(
            "listDatabases", network->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        network->runReadyNetworkOperations();

        // Oplog tailing query. A cursor id of 0 simulates the cursor closing on the sync source.
        const auto tailingResponse = makeCursorResponse(0LL,
                                                        _options.localOplogNS,
                                                        {makeOplogEntryObj(1),
                                                         makeOplogEntryObj(2, OpTypeEnum::kCommand),
                                                         makeOplogEntryObj(3, OpTypeEnum::kCommand)});
        auto tailingRequest = assertRemoteCommandNameEquals(
            "find", network->scheduleSuccessfulResponse(tailingResponse));
        ASSERT_TRUE(tailingRequest.cmdObj.getBoolField("tailable"));
        network->runReadyNetworkOperations();

        // Second last oplog entry fetcher: report an entry whose optime is newer than anything
        // the already-completed OplogFetcher read from the sync source.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(4)});
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::RemoteResultsUnavailable, _lastApplied);
}
1853
// Makes scheduling of the 'listDatabases' command fail and checks that initial sync finishes with
// OperationFailed and that no network requests are left ready. Also inspects the rejected request.
TEST_F(InitialSyncerTest,
       InitialSyncerPassesThroughDatabasesClonerScheduleErrorAndCancelsOplogFetcher) {
    auto syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    // Reject only 'listDatabases' and keep a copy of it; every other request is allowed to be
    // scheduled.
    executor::RemoteCommandRequest rejectedRequest;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&rejectedRequest](const executor::RemoteCommandRequest& requestToSend) {
            const bool isListDatabases =
                ("listDatabases" == requestToSend.cmdObj.firstElement().fieldNameStringData());
            if (!isListDatabases) {
                return false;
            }
            rejectedRequest = requestToSend;
            return true;
        };

    HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Failing to schedule the DatabasesCloner makes the InitialSyncer shut down the
        // OplogFetcher, so the request queue is expected to be empty.
        ASSERT_FALSE(network->hasReadyRequests());

        // Even though the OplogFetcher is shutting down, the cancellation status still has to be
        // delivered to the 'InitialSyncer::_oplogFetcherCallback' callback.
        network->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    // The rejected request must be 'listDatabases' against 'admin' on the chosen sync source.
    ASSERT_EQUALS(syncSource, rejectedRequest.target);
    ASSERT_EQUALS("admin", rejectedRequest.dbname);
    assertRemoteCommandNameEquals("listDatabases", rejectedRequest);
}
1905
// Answers the 'listDatabases' command with a FailedToParse error and checks that initial sync
// finishes with that same error code.
TEST_F(InitialSyncerTest,
       InitialSyncerPassesThroughDatabasesClonerCallbackErrorAndCancelsOplogFetcher) {
    auto syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // The DatabasesCloner's first remote command, listDatabases, is answered with an error.
        const Status listDatabasesError(ErrorCodes::FailedToParse, "listDatabases failed");
        assertRemoteCommandNameEquals("listDatabases",
                                      network->scheduleErrorResponse(listDatabasesError));
        network->runReadyNetworkOperations();

        // After recording the completion status, the DatabasesCloner failure shuts down the
        // OplogFetcher. One more pass over the ready operations delivers the cancellation status
        // to _oplogFetcherCallback().
        network->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::FailedToParse, _lastApplied);
}
1943
// Returns databases "a", "local" and "b" from listDatabases and checks that listCollections
// requests are seen only for "a" and "b", followed by a last-oplog-entry 'find'. The executor is
// then shut down, so the expected final status is CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerIgnoresLocalDatabasesWhenCloningDatabases) {
    auto syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // The DatabasesCloner's first remote command, listDatabases.
        assertRemoteCommandNameEquals(
            "listDatabases",
            network->scheduleSuccessfulResponse(makeListDatabasesResponse({"a", "local", "b"})));
        network->runReadyNetworkOperations();

        // Oplog tailing query: verified and then dropped without a response.
        auto readyOp = network->getNextReadyRequest();
        auto observedRequest = assertRemoteCommandNameEquals("find", readyOp->getRequest());
        ASSERT_TRUE(observedRequest.cmdObj.getBoolField("tailable"));
        network->blackHole(readyOp);

        // Only databases 'a' and 'b' should produce listCollections requests.
        const auto emptyCollectionsForA =
            makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("a"), {});
        observedRequest = assertRemoteCommandNameEquals(
            "listCollections", network->scheduleSuccessfulResponse(emptyCollectionsForA));
        ASSERT_EQUALS("a", observedRequest.dbname);

        const auto emptyCollectionsForB =
            makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {});
        observedRequest = assertRemoteCommandNameEquals(
            "listCollections", network->scheduleSuccessfulResponse(emptyCollectionsForB));
        ASSERT_EQUALS("b", observedRequest.dbname);

        // With every database reporting no collections, data cloning should complete and the
        // next request should come from the last oplog entry fetcher.
        // NOTE(review): this response reuses the listCollections namespace for "b" although the
        // request is a 'find'; it is never run here because the executor is shut down next.
        const auto lastEntryResponse =
            makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {});
        observedRequest = assertRemoteCommandNameEquals(
            "find", network->scheduleSuccessfulResponse(lastEntryResponse));
        ASSERT_EQUALS(1, observedRequest.cmdObj.getIntField("limit"));
    }

    getExecutor().shutdown();

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
2005
// Returns a listDatabases reply whose middle entry has no 'name' field and checks that
// listCollections requests are seen only for "a" and "b", followed by a last-oplog-entry 'find'.
// The executor is then shut down, so the expected final status is CallbackCanceled.
TEST_F(InitialSyncerTest,
       InitialSyncerIgnoresDatabaseInfoDocumentWithoutNameFieldWhenCloningDatabases) {
    auto syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // The DatabasesCloner's first remote command, listDatabases. The second array element
        // deliberately lacks a 'name' field.
        const auto listDatabasesReply = BSON("databases" << BSON_ARRAY(BSON("name"
                                                                            << "a")
                                                                       << BSON("bad"
                                                                               << "dbinfo")
                                                                       << BSON("name"
                                                                               << "b"))
                                                         << "ok"
                                                         << 1);
        assertRemoteCommandNameEquals("listDatabases",
                                      network->scheduleSuccessfulResponse(listDatabasesReply));
        network->runReadyNetworkOperations();

        // Oplog tailing query: verified and then dropped without a response.
        auto readyOp = network->getNextReadyRequest();
        auto observedRequest = assertRemoteCommandNameEquals("find", readyOp->getRequest());
        ASSERT_TRUE(observedRequest.cmdObj.getBoolField("tailable"));
        network->blackHole(readyOp);

        // Only databases 'a' and 'b' should produce listCollections requests.
        const auto emptyCollectionsForA =
            makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("a"), {});
        observedRequest = assertRemoteCommandNameEquals(
            "listCollections", network->scheduleSuccessfulResponse(emptyCollectionsForA));
        ASSERT_EQUALS("a", observedRequest.dbname);

        const auto emptyCollectionsForB =
            makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {});
        observedRequest = assertRemoteCommandNameEquals(
            "listCollections", network->scheduleSuccessfulResponse(emptyCollectionsForB));
        ASSERT_EQUALS("b", observedRequest.dbname);

        // With every database reporting no collections, data cloning should complete and the
        // next request should come from the last oplog entry fetcher.
        // NOTE(review): this response reuses the listCollections namespace for "b" although the
        // request is a 'find'; it is never run here because the executor is shut down next.
        const auto lastEntryResponse =
            makeCursorResponse(0LL, NamespaceString::makeListCollectionsNSS("b"), {});
        observedRequest = assertRemoteCommandNameEquals(
            "find", network->scheduleSuccessfulResponse(lastEntryResponse));
        ASSERT_EQUALS(1, observedRequest.cmdObj.getIntField("limit"));
    }

    getExecutor().shutdown();

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
2075
// Shuts the syncer down right after the FCV response has been processed and checks that initial
// sync finishes with CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerCancelsBothOplogFetcherAndDatabasesClonerOnShutdown) {
    auto syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();
    }

    ASSERT_OK(syncer->shutdown());
    {
        // Run whatever became ready as a result of the shutdown.
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);
        inNetwork->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
2104
// Lets the first last-oplog-entry 'find' be scheduled but makes scheduling of the second one
// fail, then checks that initial sync finishes with OperationFailed.
TEST_F(InitialSyncerTest,
       InitialSyncerPassesThroughSecondLastOplogEntryFetcherScheduleErrorAndCancelsOplogFetcher) {
    auto syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    // A last-oplog-entry request is recognized as a 'find' with a 'sort' field and a limit of 1.
    // The first such request passes; every later one is rejected and copied into
    // 'rejectedRequest'. All other requests are allowed to be scheduled.
    executor::RemoteCommandRequest rejectedRequest;
    bool isFirstMatch = true;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&isFirstMatch, &rejectedRequest](const executor::RemoteCommandRequest& requestToSend) {
            const auto& cmdObj = requestToSend.cmdObj;
            const bool isLastOplogEntryFind =
                "find" == cmdObj.firstElement().fieldNameStringData() &&
                cmdObj.hasField("sort") && 1 == cmdObj.getIntField("limit");
            if (!isLastOplogEntryFind) {
                return false;
            }
            if (isFirstMatch) {
                isFirstMatch = false;
                return false;
            }
            rejectedRequest = requestToSend;
            return true;
        };

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // An empty list of database names is the quickest way to let the DatabasesCloner
        // complete successfully.
        assertRemoteCommandNameEquals(
            "listDatabases", network->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        network->runReadyNetworkOperations();

        // When the last oplog entry fetcher cannot be scheduled, the completion status is set and
        // the OplogFetcher is shut down. One more pass over the ready operations delivers the
        // cancellation status to _oplogFetcherCallback().
        network->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
2162
// Answers the second last-oplog-entry 'find' with an OperationFailed error and checks that
// initial sync finishes with that same error code.
TEST_F(InitialSyncerTest,
       InitialSyncerPassesThroughSecondLastOplogEntryFetcherCallbackErrorAndCancelsOplogFetcher) {
    auto syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // An empty list of database names is the quickest way to let the DatabasesCloner
        // complete successfully.
        assertRemoteCommandNameEquals(
            "listDatabases", network->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        network->runReadyNetworkOperations();

        // The OplogFetcher's tailing query needs no answer here: verify it, drop it, and move on
        // to the next request.
        auto readyOp = network->getNextReadyRequest();
        auto observedRequest = assertRemoteCommandNameEquals("find", readyOp->getRequest());
        ASSERT_TRUE(observedRequest.cmdObj.getBoolField("tailable"));
        network->blackHole(readyOp);

        // Second last oplog entry fetcher: respond with an error.
        const Status fetcherError(ErrorCodes::OperationFailed,
                                  "second last oplog entry fetcher failed");
        observedRequest =
            assertRemoteCommandNameEquals("find", network->scheduleErrorResponse(fetcherError));
        ASSERT_TRUE(observedRequest.cmdObj.hasField("sort"));
        ASSERT_EQUALS(1, observedRequest.cmdObj.getIntField("limit"));
        network->runReadyNetworkOperations();

        // _lastOplogEntryFetcherCallbackAfterCloningData() sets the completion status and then
        // shuts down the OplogFetcher. One more pass over the ready operations delivers the
        // cancellation status to _oplogFetcherCallback().
        network->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
2217
// Drops both the oplog tailing query and the second last-oplog-entry 'find' without answering
// them, shuts the syncer down, and checks that initial sync finishes with CallbackCanceled.
TEST_F(InitialSyncerTest,
       InitialSyncerCancelsBothSecondLastOplogEntryFetcherAndOplogFetcherOnShutdown) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        auto request = assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
        // on to the DatabasesCloner's request.
        auto noi = net->getNextReadyRequest();
        request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // Second last oplog entry fetcher. Also blackholed so that it is still outstanding when
        // the syncer is shut down.
        noi = net->getNextReadyRequest();
        request = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(request.cmdObj.hasField("sort"));
        ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
        net->blackHole(noi);
    }

    // Check the shutdown status instead of discarding it, matching the other shutdown tests in
    // this file.
    ASSERT_OK(initialSyncer->shutdown());
    executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
2267
// Holds the oplog tailing query, drops the second last-oplog-entry 'find', then fails the tailing
// query with OperationFailed and checks that initial sync finishes with that same error code.
TEST_F(InitialSyncerTest,
       InitialSyncerCancelsSecondLastOplogEntryFetcherOnOplogFetcherCallbackError) {
    auto syncer = &getInitialSyncer();
    auto operationContext = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(syncer->startup(operationContext.get(), maxAttempts));

    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(network);

        // Base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        network->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // An empty list of database names is the quickest way to let the DatabasesCloner
        // complete successfully.
        assertRemoteCommandNameEquals(
            "listDatabases", network->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        network->runReadyNetworkOperations();

        // Hold on to the OplogFetcher's tailing query; it is answered with an error further down.
        auto readyOp = network->getNextReadyRequest();
        auto observedRequest = assertRemoteCommandNameEquals("find", readyOp->getRequest());
        ASSERT_TRUE(observedRequest.cmdObj.getBoolField("oplogReplay"));
        ASSERT_TRUE(observedRequest.cmdObj.getBoolField("tailable"));
        auto tailingQueryOp = readyOp;

        // Second last oplog entry fetcher. This request is dropped; it is expected to be canceled
        // once the oplog fetcher fails.
        readyOp = network->getNextReadyRequest();
        observedRequest = assertRemoteCommandNameEquals("find", readyOp->getRequest());
        ASSERT_TRUE(observedRequest.cmdObj.hasField("sort"));
        ASSERT_EQUALS(1, observedRequest.cmdObj.getIntField("limit"));
        network->blackHole(readyOp);

        // Fail the saved tailing query.
        network->scheduleErrorResponse(
            tailingQueryOp, Status(ErrorCodes::OperationFailed, "oplog fetcher failed"));
        network->runReadyNetworkOperations();

        // _oplogFetcherCallback() sets the completion status and then shuts down the
        // '_lastOplogEntryFetcher'. One more pass over the ready operations delivers the
        // cancellation status to _lastOplogEntryFetcherCallbackAfterCloningData().
        network->runReadyNetworkOperations();
    }

    syncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
2326
// Checks that a malformed document (non-numeric "h" field) returned to the second last oplog entry
// fetcher makes initial sync finish with TypeMismatch.
TEST_F(
    InitialSyncerTest,
    InitialSyncerReturnsTypeMismatchErrorWhenSecondLastOplogEntryFetcherReturnsMalformedDocument) {
    auto* initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    const auto oplogEntry = makeOplogEntryObj(1);
    const auto malformedOplogEntry = BSON("ts" << Timestamp(1) << "t" << 1 << "h"
                                               << "not a hash");
    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(net);

        // Supply the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Supply the last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Supply the feature compatibility version.
        processSuccessfulFCVFetcherResponse36();

        // An empty list of database names is the quickest way to get the DatabasesCloner to
        // complete successfully.
        const auto listDatabasesRequest =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        net->runReadyNetworkOperations();

        // The OplogFetcher's oplog tailing query needs no response. Blackhole it and continue
        // with the second last oplog entry fetcher.
        auto oplogFetcherNoi = net->getNextReadyRequest();
        const auto oplogFetcherRequest =
            assertRemoteCommandNameEquals("find", oplogFetcherNoi->getRequest());
        ASSERT_TRUE(oplogFetcherRequest.cmdObj.getBoolField("tailable"));
        net->blackHole(oplogFetcherNoi);

        // Second last oplog entry fetcher receives the malformed document.
        processSuccessfulLastOplogEntryFetcherResponse({malformedOplogEntry});

        // _lastOplogEntryFetcherCallbackAfterCloningData() shuts down the OplogFetcher after it
        // has set the completion status. One more runReadyNetworkOperations() call delivers the
        // cancellation status to _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::TypeMismatch, _lastApplied);
}
2379
// Checks that initial sync finishes with OplogOutOfOrder when the oplog entry fetched after
// cloning (made with 1) precedes the one fetched before cloning (made with 2).
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsOplogOutOfOrderIfStopTimestampPrecedesBeginTimestamp) {
    auto* initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(net);

        // Supply the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Supply the last oplog entry (begin timestamp).
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // Supply the feature compatibility version.
        processSuccessfulFCVFetcherResponse36();

        // An empty list of database names is the quickest way to get the DatabasesCloner to
        // complete successfully.
        const auto listDatabasesRequest =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        net->runReadyNetworkOperations();

        // The OplogFetcher's oplog tailing query needs no response. Blackhole it and continue
        // with the second last oplog entry fetcher.
        auto oplogFetcherNoi = net->getNextReadyRequest();
        const auto oplogFetcherRequest =
            assertRemoteCommandNameEquals("find", oplogFetcherNoi->getRequest());
        ASSERT_TRUE(oplogFetcherRequest.cmdObj.getBoolField("tailable"));
        net->blackHole(oplogFetcherNoi);

        // Second last oplog entry fetcher (stop timestamp).
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // _lastOplogEntryFetcherCallbackAfterCloningData() shuts down the OplogFetcher after it
        // has set the completion status. One more runReadyNetworkOperations() call delivers the
        // cancellation status to _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, _lastApplied);
}
2428
// Checks that an error returned by the storage interface's insertDocumentFn is reported as the
// final status, and that the insert targeted the local oplog namespace with the fetched entry.
TEST_F(
    InitialSyncerTest,
    InitialSyncerPassesThroughInsertOplogSeedDocumentErrorAfterDataCloningFinishesWithNoOperationsToApply) {
    auto* initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Record the arguments of the insert and make it fail.
    NamespaceString seedNss;
    TimestampedBSONObj seedDoc;
    long long seedTerm;
    _storageInterface->insertDocumentFn = [&seedNss, &seedDoc, &seedTerm](
        OperationContext*,
        const NamespaceString& nss,
        const TimestampedBSONObj& doc,
        long long term) {
        seedNss = nss;
        seedDoc = doc;
        seedTerm = term;
        return Status(ErrorCodes::OperationFailed, "failed to insert oplog entry");
    };

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    const auto oplogEntry = makeOplogEntryObj(1);
    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(net);

        // Supply the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Supply the last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Supply the feature compatibility version.
        processSuccessfulFCVFetcherResponse36();

        // An empty list of database names is the quickest way to get the DatabasesCloner to
        // complete successfully.
        const auto listDatabasesRequest =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        net->runReadyNetworkOperations();

        // The OplogFetcher's oplog tailing query needs no response. Blackhole it and continue
        // with the second last oplog entry fetcher.
        auto oplogFetcherNoi = net->getNextReadyRequest();
        const auto oplogFetcherRequest =
            assertRemoteCommandNameEquals("find", oplogFetcherNoi->getRequest());
        ASSERT_TRUE(oplogFetcherRequest.cmdObj.getBoolField("tailable"));
        net->blackHole(oplogFetcherNoi);

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // _lastOplogEntryFetcherCallbackAfterCloningData() shuts down the OplogFetcher after it
        // has set the completion status. One more runReadyNetworkOperations() call delivers the
        // cancellation status to _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
    ASSERT_EQUALS(_options.localOplogNS, seedNss);
    ASSERT_BSONOBJ_EQ(oplogEntry, seedDoc.obj);
}
2495
// Checks that shutting down the initial syncer from inside insertDocumentFn (which itself
// succeeds) makes initial sync finish with CallbackCanceled, and that the insert targeted the
// local oplog namespace with the fetched entry.
TEST_F(
    InitialSyncerTest,
    InitialSyncerReturnsCallbackCanceledAndDoesNotScheduleRollbackCheckerIfShutdownAfterInsertingInsertOplogSeedDocument) {
    auto* initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Record the arguments of the insert and shut the initial syncer down before reporting
    // success.
    NamespaceString seedNss;
    TimestampedBSONObj seedDoc;
    long long seedTerm;
    _storageInterface->insertDocumentFn = [&seedNss, &seedDoc, &seedTerm, initialSyncer](
        OperationContext*,
        const NamespaceString& nss,
        const TimestampedBSONObj& doc,
        long long term) {
        seedNss = nss;
        seedDoc = doc;
        seedTerm = term;
        initialSyncer->shutdown().transitional_ignore();
        return Status::OK();
    };

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    const auto oplogEntry = makeOplogEntryObj(1);
    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(net);

        // Supply the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Supply the last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Supply the feature compatibility version.
        processSuccessfulFCVFetcherResponse36();

        // An empty list of database names is the quickest way to get the DatabasesCloner to
        // complete successfully.
        const auto listDatabasesRequest =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        net->runReadyNetworkOperations();

        // The OplogFetcher's oplog tailing query needs no response. Blackhole it and continue
        // with the second last oplog entry fetcher.
        auto oplogFetcherNoi = net->getNextReadyRequest();
        const auto oplogFetcherRequest =
            assertRemoteCommandNameEquals("find", oplogFetcherNoi->getRequest());
        ASSERT_TRUE(oplogFetcherRequest.cmdObj.getBoolField("tailable"));
        net->blackHole(oplogFetcherNoi);

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // _lastOplogEntryFetcherCallbackAfterCloningData() shuts down the OplogFetcher after it
        // has set the completion status. One more runReadyNetworkOperations() call delivers the
        // cancellation status to _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
    ASSERT_EQUALS(_options.localOplogNS, seedNss);
    ASSERT_BSONOBJ_EQ(oplogEntry, seedDoc.obj);
}
2565
// Checks that a failure to schedule the second replSetGetRBID command (the rollback check issued
// after cloning) is reported as the final status of initial sync.
TEST_F(
    InitialSyncerTest,
    InitialSyncerPassesThroughRollbackCheckerScheduleErrorAfterCloningFinishesWithNoOperationsToApply) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Make the second replSetGetRBID command fail. Allow all other requests to be scheduled.
    // The rejected request is recorded so that it can be verified once initial sync has finished.
    executor::RemoteCommandRequest rejectedRequest;
    bool first = true;
    _executorProxy->shouldFailScheduleRemoteCommandRequest =
        [&first, &rejectedRequest](const executor::RemoteCommandRequest& requestToSend) {
            if ("replSetGetRBID" == requestToSend.cmdObj.firstElement().fieldNameStringData()) {
                if (first) {
                    first = false;
                    return false;
                }
                rejectedRequest = requestToSend;
                return true;
            }
            return false;
        };

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    auto oplogEntry = makeOplogEntryObj(1);
    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // Quickest path to a successful DatabasesCloner completion is to respond to the
        // listDatabases with an empty list of database names.
        assertRemoteCommandNameEquals(
            "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
        net->runReadyNetworkOperations();

        // We do not have to respond to the OplogFetcher's oplog tailing query. Blackhole and move
        // on to the second last oplog entry fetcher. The local is named distinctly so that it
        // does not shadow the request recorded by the schedule-failure hook above.
        auto noi = net->getNextReadyRequest();
        auto oplogFetcherRequest = assertRemoteCommandNameEquals("find", noi->getRequest());
        ASSERT_TRUE(oplogFetcherRequest.cmdObj.getBoolField("tailable"));
        net->blackHole(noi);

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
        // setting the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);

    // The hook must have rejected a replSetGetRBID request; a default-constructed
    // 'rejectedRequest' here would mean the schedule failure path was never exercised.
    assertRemoteCommandNameEquals("replSetGetRBID", rejectedRequest);
}
2632
// Checks that an error response to the last replSetGetRBID command (issued after cloning) is
// reported as the final status of initial sync.
TEST_F(
    InitialSyncerTest,
    InitialSyncerPassesThroughRollbackCheckerCallbackErrorAfterCloningFinishesWithNoOperationsToApply) {
    auto* initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    const auto oplogEntry = makeOplogEntryObj(1);
    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(net);

        // Supply the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Supply the last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Supply the feature compatibility version.
        processSuccessfulFCVFetcherResponse36();

        // An empty list of database names is the quickest way to get the DatabasesCloner to
        // complete successfully.
        const auto listDatabasesRequest =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        net->runReadyNetworkOperations();

        // The OplogFetcher's oplog tailing query needs no response. Blackhole it and continue
        // with the second last oplog entry fetcher.
        auto oplogFetcherNoi = net->getNextReadyRequest();
        const auto oplogFetcherRequest =
            assertRemoteCommandNameEquals("find", oplogFetcherNoi->getRequest());
        ASSERT_TRUE(oplogFetcherRequest.cmdObj.getBoolField("tailable"));
        net->blackHole(oplogFetcherNoi);

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Fail the last rollback checker replSetGetRBID command.
        const auto rollbackCheckerRequest = net->scheduleErrorResponse(
            Status(ErrorCodes::OperationFailed, "replSetGetRBID command failed"));
        assertRemoteCommandNameEquals("replSetGetRBID", rollbackCheckerRequest);
        net->runReadyNetworkOperations();

        // _rollbackCheckerCheckForRollbackCallback() shuts down the OplogFetcher after it has set
        // the completion status. One more runReadyNetworkOperations() call delivers the
        // cancellation status to _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
2690
// Checks that shutting down the initial syncer while the last replSetGetRBID command is still
// outstanding makes initial sync finish with CallbackCanceled.
TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnShutdown) {
    auto* initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    const auto oplogEntry = makeOplogEntryObj(1);
    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(net);

        // Supply the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Supply the last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Supply the feature compatibility version.
        processSuccessfulFCVFetcherResponse36();

        // An empty list of database names is the quickest way to get the DatabasesCloner to
        // complete successfully.
        const auto listDatabasesRequest =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        net->runReadyNetworkOperations();

        // The OplogFetcher's oplog tailing query needs no response. Blackhole it and continue
        // with the second last oplog entry fetcher.
        auto oplogFetcherNoi = net->getNextReadyRequest();
        const auto oplogFetcherRequest =
            assertRemoteCommandNameEquals("find", oplogFetcherNoi->getRequest());
        ASSERT_TRUE(oplogFetcherRequest.cmdObj.getBoolField("tailable"));
        net->blackHole(oplogFetcherNoi);

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Blackhole the last rollback checker replSetGetRBID command so that it is still
        // outstanding when the initial syncer is shut down.
        auto rollbackCheckerNoi = net->getNextReadyRequest();
        assertRemoteCommandNameEquals("replSetGetRBID", rollbackCheckerNoi->getRequest());
        net->blackHole(rollbackCheckerNoi);

        // Process any network operations that are ready before leaving the network.
        net->runReadyNetworkOperations();
    }

    ASSERT_OK(initialSyncer->shutdown());
    {
        // Re-enter the network to process whatever became ready as a result of the shutdown.
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(net);
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
2747
// Checks that an error delivered to the OplogFetcher while the last replSetGetRBID command is
// still outstanding ends initial sync with the OplogFetcher's error.
TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnOplogFetcherCallbackError) {
    auto* initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    const auto oplogEntry = makeOplogEntryObj(1);
    auto net = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(net);

        // Supply the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1));
        net->runReadyNetworkOperations();

        // Supply the last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Supply the feature compatibility version.
        processSuccessfulFCVFetcherResponse36();

        // An empty list of database names is the quickest way to get the DatabasesCloner to
        // complete successfully.
        const auto listDatabasesRequest =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        net->runReadyNetworkOperations();

        // Keep a handle on the OplogFetcher's oplog tailing query; it is failed further down.
        auto oplogFetcherNoi = net->getNextReadyRequest();
        const auto oplogFetcherRequest =
            assertRemoteCommandNameEquals("find", oplogFetcherNoi->getRequest());
        ASSERT_TRUE(oplogFetcherRequest.cmdObj.getBoolField("oplogReplay"));
        ASSERT_TRUE(oplogFetcherRequest.cmdObj.getBoolField("tailable"));

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Blackhole the last rollback checker replSetGetRBID command.
        auto rollbackCheckerNoi = net->getNextReadyRequest();
        assertRemoteCommandNameEquals("replSetGetRBID", rollbackCheckerNoi->getRequest());
        net->blackHole(rollbackCheckerNoi);

        // Fail the oplog fetcher.
        net->scheduleErrorResponse(oplogFetcherNoi,
                                   Status(ErrorCodes::OperationFailed, "oplog fetcher failed"));
        net->runReadyNetworkOperations();

        // _oplogFetcherCallback() shuts down the last rollback checker after it has set the
        // completion status. One more runReadyNetworkOperations() call delivers the cancellation
        // status to _rollbackCheckerCheckForRollbackCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
2807
// Checks that initial sync finishes with UnrecoverableRollbackError when the rollback ID returned
// after cloning differs from the base rollback ID.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsUnrecoverableRollbackErrorIfSyncSourceRolledBackAfterCloningData) {
    auto* initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    const auto oplogEntry = makeOplogEntryObj(1);
    auto net = getNet();
    const int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(net);

        // Supply the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Supply the last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Supply the feature compatibility version.
        processSuccessfulFCVFetcherResponse36();

        // An empty list of database names is the quickest way to get the DatabasesCloner to
        // complete successfully.
        const auto listDatabasesRequest =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        net->runReadyNetworkOperations();

        // The OplogFetcher's oplog tailing query needs no response. Blackhole it and continue
        // with the second last oplog entry fetcher.
        auto oplogFetcherNoi = net->getNextReadyRequest();
        const auto oplogFetcherRequest =
            assertRemoteCommandNameEquals("find", oplogFetcherNoi->getRequest());
        ASSERT_TRUE(oplogFetcherRequest.cmdObj.getBoolField("tailable"));
        net->blackHole(oplogFetcherNoi);

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});

        // Answer the last rollback checker replSetGetRBID command with a rollback ID that is one
        // higher than the base rollback ID.
        const auto rollbackCheckerRequest =
            net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId + 1));
        net->runReadyNetworkOperations();
        assertRemoteCommandNameEquals("replSetGetRBID", rollbackCheckerRequest);
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, _lastApplied);
}
2859
// Clones a single database containing a single collection, with the same oplog entry returned
// before and after cloning, and checks that the final result carries that entry's optime and hash
// and that the initial sync flag has been cleared.
TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCloning) {
    auto* initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));

    auto oplogEntry = makeOplogEntry(1);
    auto net = getNet();
    const int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(net);

        // Supply the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Supply the last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry.toBSON()});

        // Supply the feature compatibility version.
        processSuccessfulFCVFetcherResponse36();

        // Rather than fast forwarding to DatabasesCloner completion with an empty list of
        // database names, simulate copying a single database with a single collection from the
        // sync source.
        NamespaceString nss("a.a");
        const auto listDatabasesRequest =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        net->runReadyNetworkOperations();

        // The OplogFetcher's oplog tailing query needs no response. Blackhole it and continue
        // with the DatabasesCloner's requests.
        auto oplogFetcherNoi = net->getNextReadyRequest();
        const auto oplogFetcherRequest =
            assertRemoteCommandNameEquals("find", oplogFetcherNoi->getRequest());
        ASSERT_TRUE(oplogFetcherRequest.cmdObj.getBoolField("tailable"));
        net->blackHole(oplogFetcherNoi);

        // listCollections for "a"
        const auto collectionInfo = BSON("name" << nss.coll() << "options" << BSONObj());
        const auto listCollectionsRequest =
            net->scheduleSuccessfulResponse(makeCursorResponse(0LL, nss, {collectionInfo}));
        assertRemoteCommandNameEquals("listCollections", listCollectionsRequest);

        // count:a
        const auto countRequest = assertRemoteCommandNameEquals(
            "count", net->scheduleSuccessfulResponse(BSON("n" << 1 << "ok" << 1)));
        ASSERT_EQUALS(nss.coll(), countRequest.cmdObj.firstElement().String());
        ASSERT_EQUALS(nss.db(), countRequest.dbname);

        // listIndexes:a
        const auto idIndexSpec =
            BSON("v" << OplogEntry::kOplogVersion << "key" << BSON("_id" << 1) << "name"
                     << "_id_"
                     << "ns"
                     << nss.ns());
        const auto listIndexesRequest = assertRemoteCommandNameEquals(
            "listIndexes",
            net->scheduleSuccessfulResponse(
                makeCursorResponse(0LL, NamespaceString(nss.getCommandNS()), {idIndexSpec})));
        ASSERT_EQUALS(nss.coll(), listIndexesRequest.cmdObj.firstElement().String());
        ASSERT_EQUALS(nss.db(), listIndexesRequest.dbname);

        // find:a
        const auto collectionDoc = BSON("_id" << 1 << "a" << 1);
        const auto findRequest = assertRemoteCommandNameEquals(
            "find", net->scheduleSuccessfulResponse(makeCursorResponse(0LL, nss, {collectionDoc})));
        ASSERT_EQUALS(nss.coll(), findRequest.cmdObj.firstElement().String());
        ASSERT_EQUALS(nss.db(), findRequest.dbname);

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({oplogEntry.toBSON()});

        // Answer the last rollback checker replSetGetRBID command with the unchanged rollback ID.
        assertRemoteCommandNameEquals(
            "replSetGetRBID",
            net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)));
        net->runReadyNetworkOperations();

        // Deliver cancellation to OplogFetcher.
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    const auto& lastApplied = unittest::assertGet(_lastApplied);
    ASSERT_EQUALS(oplogEntry.getOpTime(), lastApplied.opTime);
    ASSERT_EQUALS(oplogEntry.getHash(), lastApplied.value);
    ASSERT_FALSE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
}
2951
// Checks that a failure to schedule work on the executor, enabled just before the second last
// oplog entry fetcher response is processed, is reported as the final status of initial sync.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchScheduleError) {
    auto* initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));

    auto net = getNet();
    const int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(net);

        // Supply the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Supply the last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Supply the feature compatibility version.
        processSuccessfulFCVFetcherResponse36();

        // An empty list of database names is the quickest way to get the DatabasesCloner to
        // complete successfully.
        const auto listDatabasesRequest =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        net->runReadyNetworkOperations();

        // The OplogFetcher's oplog tailing query needs no response. Blackhole it and continue
        // with the second last oplog entry fetcher.
        auto oplogFetcherNoi = net->getNextReadyRequest();
        const auto oplogFetcherRequest =
            assertRemoteCommandNameEquals("find", oplogFetcherNoi->getRequest());
        ASSERT_TRUE(oplogFetcherRequest.cmdObj.getBoolField("tailable"));
        net->blackHole(oplogFetcherNoi);

        // Set the flag in TaskExecutorMock before the scheduled last oplog entry fetcher response
        // is processed, so that InitialSyncer fails to schedule _getNextApplierBatchCallback().
        _executorProxy->shouldFailScheduleWorkRequest = [] { return true; };

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // _lastOplogEntryFetcherCallbackAfterCloningData() shuts down the OplogFetcher after it
        // has set the completion status. One more runReadyNetworkOperations() call delivers the
        // cancellation status to _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
3008
// Checks that a failure to schedule work at a future time on the executor, enabled just before the
// second last oplog entry fetcher response is processed, is reported as the final status of
// initial sync.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughSecondGetNextApplierBatchScheduleError) {
    auto* initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    const HostAndPort syncSource("localhost", 12345);
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(syncSource);
    ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));

    ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));

    auto net = getNet();
    const int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard inNetwork(net);

        // Supply the base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Supply the last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Supply the feature compatibility version.
        processSuccessfulFCVFetcherResponse36();

        // An empty list of database names is the quickest way to get the DatabasesCloner to
        // complete successfully.
        const auto listDatabasesRequest =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        net->runReadyNetworkOperations();

        // The OplogFetcher's oplog tailing query needs no response. Blackhole it and continue
        // with the second last oplog entry fetcher.
        auto oplogFetcherNoi = net->getNextReadyRequest();
        const auto oplogFetcherRequest =
            assertRemoteCommandNameEquals("find", oplogFetcherNoi->getRequest());
        ASSERT_TRUE(oplogFetcherRequest.cmdObj.getBoolField("tailable"));
        net->blackHole(oplogFetcherNoi);

        // Set the flag in TaskExecutorMock before the scheduled last oplog entry fetcher response
        // is processed, so that InitialSyncer fails to schedule the second
        // _getNextApplierBatchCallback() at (now + options.getApplierBatchCallbackRetryWait).
        _executorProxy->shouldFailScheduleWorkAtRequest = [] { return true; };

        // Second last oplog entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // _lastOplogEntryFetcherCallbackAfterCloningData() shuts down the OplogFetcher after it
        // has set the completion status. One more runReadyNetworkOperations() call delivers the
        // cancellation status to _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    initialSyncer->join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
3065
// Checks that shutting down the InitialSyncer while _getNextApplierBatchCallback() is waiting to
// be rerun produces a CallbackCanceled final status.
TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchOnShutdown) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto consistencyMarkers = _replicationProcess->getConsistencyMarkers();
    ASSERT_TRUE(consistencyMarkers->getInitialSyncFlag(opCtx.get()));

    const int baseRollbackId = 1;
    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(network);

        // Answer the request for the base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        network->runReadyNetworkOperations();

        // Answer the first last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Answer the feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // Replying to listDatabases with no database names is the quickest way to get the
        // DatabasesCloner to complete successfully.
        auto listDatabasesRequest =
            network->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        network->runReadyNetworkOperations();

        // The OplogFetcher's tailing oplog query needs no reply in this test, so it is
        // black holed once we have confirmed what it is.
        auto oplogQueryIter = network->getNextReadyRequest();
        auto oplogQuery = oplogQueryIter->getRequest();
        assertRemoteCommandNameEquals("find", oplogQuery);
        ASSERT_TRUE(oplogQuery.cmdObj.getBoolField("tailable"));
        network->blackHole(oplogQueryIter);

        // Answer the second last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // With the OplogFetcher's find request black holed, _getNextApplierBatch_inlock() has
        // no operations to hand back, so _getNextApplierBatchCallback() reschedules itself at
        // net->now() + _options.getApplierBatchCallbackRetryWait.
    }

    ASSERT_OK(syncer.shutdown());
    {
        // Let the network deliver whatever became ready as a result of the shutdown.
        executor::NetworkInterfaceMock::InNetworkGuard guard(network);
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
3118
// Checks that a BadValue error coming out of _getNextApplierBatch_inlock() becomes the final
// status reported by the InitialSyncer.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchInLockError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto consistencyMarkers = _replicationProcess->getConsistencyMarkers();
    ASSERT_TRUE(consistencyMarkers->getInitialSyncFlag(opCtx.get()));

    // An oplog entry whose version is not OplogEntry::kOplogVersion makes
    // _getNextApplierBatch_inlock() return BadValue.
    auto goodEntry = makeOplogEntryObj(1);
    auto badVersionEntry =
        makeOplogEntryObj(2, OpTypeEnum::kInsert, OplogEntry::kOplogVersion + 100);

    const int baseRollbackId = 1;
    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(network);

        // Answer the request for the base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        network->runReadyNetworkOperations();

        // Answer the first last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Answer the feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // Replying to listDatabases with no database names is the quickest way to get the
        // DatabasesCloner to complete successfully.
        auto listDatabasesRequest =
            network->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        network->runReadyNetworkOperations();

        // Reply to the OplogFetcher's tailing oplog query with the bad entry. It ends up in the
        // oplog buffer, where _getNextApplierBatch_inlock() will process it.
        auto scheduledFind = network->scheduleSuccessfulResponse(
            makeCursorResponse(1LL, _options.localOplogNS, {goodEntry, badVersionEntry}));
        auto findRequest = assertRemoteCommandNameEquals("find", scheduledFind);
        ASSERT_TRUE(findRequest.cmdObj.getBoolField("oplogReplay"));
        network->runReadyNetworkOperations();

        // Answer the second last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // _getNextApplierBatchCallback() sets the completion status and then shuts down the
        // OplogFetcher. One more pass over the ready network operations delivers the
        // cancellation status to _oplogFetcherCallback().
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::BadValue, _lastApplied);
}
3177
// Checks that, while the 'rsSyncApplyStop' fail point is on, a bad oplog entry sitting in the
// oplog buffer is not turned into a BadValue status; shutdown() yields CallbackCanceled instead.
TEST_F(
    InitialSyncerTest,
    InitialSyncerReturnsEmptyBatchFromGetNextApplierBatchInLockIfRsSyncApplyStopFailPointIsEnabled) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto consistencyMarkers = _replicationProcess->getConsistencyMarkers();
    ASSERT_TRUE(consistencyMarkers->getInitialSyncFlag(opCtx.get()));

    // An oplog entry whose version is not OplogEntry::kOplogVersion makes
    // _getNextApplierBatch_inlock() return BadValue.
    auto goodEntry = makeOplogEntryObj(1);
    auto badVersionEntry =
        makeOplogEntryObj(2, OpTypeEnum::kInsert, OplogEntry::kOplogVersion + 100);

    // Turn on 'rsSyncApplyStop' so _getNextApplierBatch_inlock() hands back an empty batch
    // rather than one that contains the bad-version entry. The fail point is switched off again
    // when this test's scope exits.
    auto rsSyncApplyStop = getGlobalFailPointRegistry()->getFailPoint("rsSyncApplyStop");
    rsSyncApplyStop->setMode(FailPoint::alwaysOn);
    ON_BLOCK_EXIT([rsSyncApplyStop]() { rsSyncApplyStop->setMode(FailPoint::off); });

    const int baseRollbackId = 1;
    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(network);

        // Answer the request for the base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        network->runReadyNetworkOperations();

        // Answer the first last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Answer the feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // Replying to listDatabases with no database names is the quickest way to get the
        // DatabasesCloner to complete successfully.
        auto listDatabasesRequest =
            network->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        network->runReadyNetworkOperations();

        // Reply to the OplogFetcher's tailing oplog query with the bad entry. It ends up in the
        // oplog buffer, where _getNextApplierBatch_inlock() will process it.
        auto findRequest = network->scheduleSuccessfulResponse(
            makeCursorResponse(1LL, _options.localOplogNS, {goodEntry, badVersionEntry}));
        assertRemoteCommandNameEquals("find", findRequest);
        ASSERT_TRUE(findRequest.cmdObj.getBoolField("oplogReplay"));
        network->runReadyNetworkOperations();

        // Answer the second last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // Because 'rsSyncApplyStop' is enabled, _getNextApplierBatch_inlock() gives the
        // InitialSyncer an empty batch even though the oplog buffer still holds entries.
    }

    // Were the fail point not working, the bad entry in the oplog buffer would set the initial
    // sync status to BadValue, and shutdown() could not replace that with CallbackCanceled.
    // With it working, shutdown() cancels the OplogFetcher as well as the scheduled
    // _getNextApplierBatchCallback() task, so the final status is CallbackCanceled.
    ASSERT_OK(syncer.shutdown());
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(network);
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
}
3249
// Checks that a failure to start the MultiApplier becomes the final status reported by the
// InitialSyncer.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierScheduleError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    auto consistencyMarkers = _replicationProcess->getConsistencyMarkers();
    ASSERT_TRUE(consistencyMarkers->getInitialSyncFlag(opCtx.get()));

    const int baseRollbackId = 1;
    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(network);

        // Answer the request for the base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        network->runReadyNetworkOperations();

        // Answer the first last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Answer the feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // Replying to listDatabases with no database names is the quickest way to get the
        // DatabasesCloner to complete successfully.
        auto listDatabasesRequest =
            network->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        network->runReadyNetworkOperations();

        // Hold on to the OplogFetcher's tailing oplog query; it is answered further down.
        auto oplogFetcherNoi = network->getNextReadyRequest();
        auto oplogQuery = oplogFetcherNoi->getRequest();
        assertRemoteCommandNameEquals("find", oplogQuery);
        ASSERT_TRUE(oplogQuery.cmdObj.getBoolField("oplogReplay"));

        // Answer the second last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // By now _getNextApplierBatchCallback() should have rescheduled itself. Put some
        // operations into the oplog buffer so that its next run attempts to schedule the
        // MultiApplier.
        network->scheduleSuccessfulResponse(
            oplogFetcherNoi,
            makeCursorResponse(
                1LL, _options.localOplogNS, {makeOplogEntryObj(1), makeOplogEntryObj(2)}));
        network->runReadyNetworkOperations();

        // The OplogFetcher's getMore request is only identified here; it gets no reply.
        auto getMoreIter = network->getNextReadyRequest();
        auto getMoreRequest = getMoreIter->getRequest();
        assertRemoteCommandNameEquals("getMore", getMoreRequest);

        // Force MultiApplier::startup() to fail.
        _executorProxy->shouldFailScheduleWorkRequest = [] { return true; };

        // Move the clock forward to the point where _getNextApplierBatchCallback() runs.
        auto retryTime = network->now() + _options.getApplierBatchCallbackRetryWait;
        ASSERT_EQUALS(retryTime, network->runUntil(retryTime));

        // _getNextApplierBatchCallback() sets the completion status and then shuts down the
        // OplogFetcher. One more pass over the ready network operations delivers the
        // cancellation status to _oplogFetcherCallback().
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
3321
// Checks that an error returned by the multi-apply function becomes the final status reported
// by the InitialSyncer.
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierCallbackError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Replace the external state's multi-apply function with one that always fails.
    getExternalState()->multiApplyFn =
        [](OperationContext*, const MultiApplier::Operations&, MultiApplier::ApplyOperationFn) {
            return Status(ErrorCodes::OperationFailed, "multiApply failed");
        };
    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    const int baseRollbackId = 1;
    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(network);

        // Answer the request for the base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        network->runReadyNetworkOperations();

        // Answer the first last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Answer the feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // Replying to listDatabases with no database names is the quickest way to get the
        // DatabasesCloner to complete successfully.
        auto listDatabasesRequest =
            network->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        network->runReadyNetworkOperations();

        // Reply to the OplogFetcher's tailing oplog query with enough operations to trigger
        // the MultiApplier.
        auto findRequest = network->scheduleSuccessfulResponse(makeCursorResponse(
            1LL, _options.localOplogNS, {makeOplogEntryObj(1), makeOplogEntryObj(2)}));
        assertRemoteCommandNameEquals("find", findRequest);
        ASSERT_TRUE(findRequest.cmdObj.getBoolField("oplogReplay"));
        network->runReadyNetworkOperations();

        // Answer the second last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // _multiApplierCallback() sets the completion status and then shuts down the
        // OplogFetcher. One more pass over the ready network operations delivers the
        // cancellation status to _oplogFetcherCallback().
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
3374
// Checks that an error response to the OplogFetcher's query becomes the final status reported
// by the InitialSyncer.
TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchCallbackOnOplogFetcherError) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    const int baseRollbackId = 1;
    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(network);

        // Answer the request for the base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        network->runReadyNetworkOperations();

        // Answer the first last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Answer the feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // Replying to listDatabases with no database names is the quickest way to get the
        // DatabasesCloner to complete successfully.
        auto listDatabasesRequest =
            network->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        network->runReadyNetworkOperations();

        // Hold on to the OplogFetcher's tailing oplog query; it is answered further down.
        auto oplogFetcherNoi = network->getNextReadyRequest();
        auto oplogQuery = oplogFetcherNoi->getRequest();
        assertRemoteCommandNameEquals("find", oplogQuery);
        ASSERT_TRUE(oplogQuery.cmdObj.getBoolField("oplogReplay"));

        // Answer the second last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // Deliver an error to _oplogFetcherCallback().
        network->scheduleErrorResponse(
            oplogFetcherNoi, Status(ErrorCodes::OperationFailed, "oplog fetcher failed"));

        // _oplogFetcherCallback() sets the completion status and then cancels the
        // _getNextApplierBatchCallback() task. Running the ready network operations delivers
        // the scheduled error and the resulting cancellation.
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
}
3427
doInitialSyncWithOneBatch()3428 OplogEntry InitialSyncerTest::doInitialSyncWithOneBatch() {
3429 auto initialSyncer = &getInitialSyncer();
3430 auto opCtx = makeOpCtx();
3431
3432 _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
3433 ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
3434
3435 auto lastOp = makeOplogEntry(2);
3436
3437 auto net = getNet();
3438 int baseRollbackId = 1;
3439 {
3440 executor::NetworkInterfaceMock::InNetworkGuard guard(net);
3441
3442 // Base rollback ID.
3443 net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
3444 net->runReadyNetworkOperations();
3445
3446 // Last oplog entry.
3447 processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
3448
3449 // Feature Compatibility Version.
3450 processSuccessfulFCVFetcherResponse36();
3451
3452 // Quickest path to a successful DatabasesCloner completion is to respond to the
3453 // listDatabases with an empty list of database names.
3454 assertRemoteCommandNameEquals(
3455 "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
3456 net->runReadyNetworkOperations();
3457
3458 // OplogFetcher's oplog tailing query. Response has enough operations to reach
3459 // end timestamp.
3460 auto request = net->scheduleSuccessfulResponse(makeCursorResponse(
3461 1LL, _options.localOplogNS, {makeOplogEntryObj(1), lastOp.toBSON()}));
3462 assertRemoteCommandNameEquals("find", request);
3463 ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
3464 net->runReadyNetworkOperations();
3465
3466 // Second last oplog entry fetcher.
3467 processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()});
3468
3469 // Black hole OplogFetcher's getMore request.
3470 auto noi = net->getNextReadyRequest();
3471 request = noi->getRequest();
3472 assertRemoteCommandNameEquals("getMore", request);
3473 net->blackHole(noi);
3474
3475 // Last rollback ID.
3476 request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
3477 assertRemoteCommandNameEquals("replSetGetRBID", request);
3478 net->runReadyNetworkOperations();
3479
3480 // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting
3481 // the completion status.
3482 // We call runReadyNetworkOperations() again to deliver the cancellation status to
3483 // _oplogFetcherCallback().
3484 net->runReadyNetworkOperations();
3485 }
3486
3487 initialSyncer->join();
3488 return lastOp;
3489 }
3490
doSuccessfulInitialSyncWithOneBatch()3491 void InitialSyncerTest::doSuccessfulInitialSyncWithOneBatch() {
3492 auto lastOp = doInitialSyncWithOneBatch();
3493 ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied).opTime);
3494 ASSERT_EQUALS(lastOp.getHash(), unittest::assertGet(_lastApplied).value);
3495
3496 ASSERT_EQUALS(lastOp.getOpTime().getTimestamp(), _storageInterface->getInitialDataTimestamp());
3497 }
3498
// Runs a successful one-batch initial sync and confirms that the schema upgrade was not recorded
// as done.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsLastAppliedOnReachingStopTimestampAfterApplyingOneBatch) {
    // getCollectionUUID is not expected to return a UUID in this test, which means
    // upgradeUUIDSchemaVersionNonReplicated should never be invoked.
    doSuccessfulInitialSyncWithOneBatch();

    // Confirm, under the work-done mutex, that upgradeUUIDSchemaVersionNonReplicated was not
    // called.
    LockGuard lock(_storageInterfaceWorkDoneMutex);
    ASSERT_FALSE(_storageInterfaceWorkDone.schemaUpgraded);
}
3509
// Checks that the InitialSyncer reports the last operation's optime and hash after applying
// more than one batch, while cloning a single database that holds a single collection.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsLastAppliedOnReachingStopTimestampAfterApplyingMultipleBatches) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    // The third and final operation is a command so that it lands in a different batch from
    // the second operation, which makes the InitialSyncer apply multiple batches. The first
    // operation is the last entry fetched before data cloning and is not applied.
    auto lastOp = makeOplogEntry(3, OpTypeEnum::kCommand);

    const int baseRollbackId = 1;
    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(network);

        // Answer the request for the base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        network->runReadyNetworkOperations();

        // Answer the first last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Answer the feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // Rather than short-circuiting the DatabasesCloner with an empty database list, this
        // test simulates a sync source that has one database containing one collection.
        const NamespaceString nss("a.a");
        auto listDatabasesRequest =
            network->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        network->runReadyNetworkOperations();

        // Reply to the OplogFetcher's tailing oplog query with enough operations to reach the
        // end timestamp.
        auto oplogFindRequest = network->scheduleSuccessfulResponse(
            makeCursorResponse(1LL,
                               _options.localOplogNS,
                               {makeOplogEntryObj(1), makeOplogEntryObj(2), lastOp.toBSON()}));
        assertRemoteCommandNameEquals("find", oplogFindRequest);
        ASSERT_TRUE(oplogFindRequest.cmdObj.getBoolField("oplogReplay"));
        network->runReadyNetworkOperations();

        // Schedule the listCollections reply for database "a".
        auto listCollectionsRequest = network->scheduleSuccessfulResponse(
            makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())}));
        assertRemoteCommandNameEquals("listCollections", listCollectionsRequest);

        // The OplogFetcher's getMore request is black holed.
        auto getMoreIter = network->getNextReadyRequest();
        auto getMoreRequest = getMoreIter->getRequest();
        assertRemoteCommandNameEquals("getMore", getMoreRequest);
        network->blackHole(getMoreIter);

        // Schedule the count reply for collection "a".
        auto countRequest = network->scheduleSuccessfulResponse(BSON("n" << 1 << "ok" << 1));
        assertRemoteCommandNameEquals("count", countRequest);
        ASSERT_EQUALS(nss.coll(), countRequest.cmdObj.firstElement().String());
        ASSERT_EQUALS(nss.db(), countRequest.dbname);

        // Schedule the listIndexes reply for collection "a", describing only the _id index.
        const auto idIndexSpec =
            BSON("v" << OplogEntry::kOplogVersion << "key" << BSON("_id" << 1) << "name"
                     << "_id_"
                     << "ns"
                     << nss.ns());
        auto listIndexesRequest = network->scheduleSuccessfulResponse(
            makeCursorResponse(0LL, NamespaceString(nss.getCommandNS()), {idIndexSpec}));
        assertRemoteCommandNameEquals("listIndexes", listIndexesRequest);
        ASSERT_EQUALS(nss.coll(), listIndexesRequest.cmdObj.firstElement().String());
        ASSERT_EQUALS(nss.db(), listIndexesRequest.dbname);

        // Schedule the find reply for collection "a", carrying a single document.
        auto collectionFindRequest = network->scheduleSuccessfulResponse(
            makeCursorResponse(0LL, nss, {BSON("_id" << 1 << "a" << 1)}));
        assertRemoteCommandNameEquals("find", collectionFindRequest);
        ASSERT_EQUALS(nss.coll(), collectionFindRequest.cmdObj.firstElement().String());
        ASSERT_EQUALS(nss.db(), collectionFindRequest.dbname);

        // Answer the second last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()});

        // Answer the final rollback ID request.
        auto rbidRequest =
            network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        assertRemoteCommandNameEquals("replSetGetRBID", rbidRequest);
        network->runReadyNetworkOperations();

        // _multiApplierCallback() sets the completion status and then cancels the
        // _getNextApplierBatchCallback() task. One more pass over the ready network operations
        // delivers the cancellation status to _oplogFetcherCallback().
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied).opTime);
    ASSERT_EQUALS(lastOp.getHash(), unittest::assertGet(_lastApplied).value);
}
3612
// Checks that, when applying a batch bumps the missing-document fetch count, the InitialSyncer
// runs another last-oplog-entry fetcher to pick up a new stop timestamp, and that the progress
// document reports one fetched missing document.
TEST_F(
    InitialSyncerTest,
    InitialSyncerSchedulesLastOplogEntryFetcherToGetNewStopTimestampIfMissingDocumentsHaveBeenFetchedDuringMultiInitialSyncApply) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    // Replace DataReplicatorExternalState::_multiInitialSyncApply() with a version that also
    // fetches a missing document. That makes the InitialSyncer re-evaluate its end timestamp
    // for applying operations after every batch.
    getExternalState()->multiApplyFn = [](OperationContext*,
                                          const MultiApplier::Operations& ops,
                                          MultiApplier::ApplyOperationFn applyOperation) {
        // The overridden _multiInitialSyncApply() does not look at the 'OperationPtr*'.
        applyOperation(nullptr).transitional_ignore();
        return ops.back().getOpTime();
    };
    bool fetchCountIncremented = false;
    getExternalState()->multiInitialSyncApplyFn = [&fetchCountIncremented](
        MultiApplier::OperationPtrs*, const HostAndPort&, AtomicUInt32* fetchCount) {
        // Only the first invocation bumps the fetch count.
        if (fetchCountIncremented) {
            return Status::OK();
        }
        fetchCount->addAndFetch(1);
        fetchCountIncremented = true;
        return Status::OK();
    };

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    // Making the third and final operation a command guarantees two batches to apply.
    auto lastOp = makeOplogEntry(3, OpTypeEnum::kCommand);

    const int baseRollbackId = 1;
    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(network);

        // Answer the request for the base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        network->runReadyNetworkOperations();

        // Answer the first last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Answer the feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // Replying to listDatabases with no database names is the quickest way to get the
        // DatabasesCloner to complete successfully.
        auto listDatabasesRequest =
            network->scheduleSuccessfulResponse(makeListDatabasesResponse({}));
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        network->runReadyNetworkOperations();

        // Reply to the OplogFetcher's tailing oplog query with enough operations to reach the
        // end timestamp.
        auto findRequest = network->scheduleSuccessfulResponse(
            makeCursorResponse(1LL,
                               _options.localOplogNS,
                               {makeOplogEntryObj(1), makeOplogEntryObj(2), lastOp.toBSON()}));
        assertRemoteCommandNameEquals("find", findRequest);
        ASSERT_TRUE(findRequest.cmdObj.getBoolField("oplogReplay"));
        network->runReadyNetworkOperations();

        // Answer the second last-oplog-entry fetcher with the entry at timestamp 2. The
        // InitialSyncer will update this end timestamp once the first batch has been applied.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});

        // The OplogFetcher's getMore request is black holed.
        auto getMoreIter = network->getNextReadyRequest();
        auto getMoreRequest = getMoreIter->getRequest();
        assertRemoteCommandNameEquals("getMore", getMoreRequest);
        network->blackHole(getMoreIter);

        // Answer the third last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()});

        // Answer the final rollback ID request.
        auto rbidRequest =
            network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        assertRemoteCommandNameEquals("replSetGetRBID", rbidRequest);
        network->runReadyNetworkOperations();

        // _multiApplierCallback() sets the completion status and then cancels the
        // _getNextApplierBatchCallback() task. One more pass over the ready network operations
        // delivers the cancellation status to _oplogFetcherCallback().
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied).opTime);
    ASSERT_EQUALS(lastOp.getHash(), unittest::assertGet(_lastApplied).value);

    ASSERT_TRUE(fetchCountIncremented);

    // The progress document must show exactly one fetched missing document.
    auto progress = syncer.getInitialSyncProgress();
    log() << "Progress after failed initial sync attempt: " << progress;
    ASSERT_EQUALS(1, progress.getIntField("fetchedMissingDocs")) << progress;
}
3713
// Checks that the InitialSyncer finishes with InvalidSyncSource while the
// 'failInitialSyncWithBadHost' fail point is on.
TEST_F(InitialSyncerTest,
       InitialSyncerReturnsInvalidSyncSourceWhenFailInitialSyncWithBadHostFailpointIsEnabled) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    // With this fail point on, chooseSyncSourceCallback fails with an InvalidSyncSource error.
    // The fail point is switched off again when this test's scope exits.
    auto badHostFailPoint =
        getGlobalFailPointRegistry()->getFailPoint("failInitialSyncWithBadHost");
    badHostFailPoint->setMode(FailPoint::alwaysOn);
    ON_BLOCK_EXIT([badHostFailPoint]() { badHostFailPoint->setMode(FailPoint::off); });

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, _lastApplied);
}
3730
// Checks that out-of-order oplog entries in a getMore response make the InitialSyncer finish
// with OplogOutOfOrder.
TEST_F(InitialSyncerTest, OplogOutOfOrderOnOplogFetchFinish) {
    auto& syncer = getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
    ASSERT_OK(syncer.startup(opCtx.get(), maxAttempts));

    const int baseRollbackId = 1;
    auto network = getNet();
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(network);

        // Answer the request for the base rollback ID.
        network->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        network->runReadyNetworkOperations();

        // Answer the first last-oplog-entry fetcher.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Answer the feature compatibility version fetcher.
        processSuccessfulFCVFetcherResponse36();

        // The listDatabases request is black holed.
        auto listDatabasesIter = network->getNextReadyRequest();
        auto listDatabasesRequest = listDatabasesIter->getRequest();
        assertRemoteCommandNameEquals("listDatabases", listDatabasesRequest);
        network->blackHole(listDatabasesIter);

        // Reply to the OplogFetcher's tailing oplog query.
        auto findRequest = network->scheduleSuccessfulResponse(
            makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntryObj(1)}));
        assertRemoteCommandNameEquals("find", findRequest);
        ASSERT_TRUE(findRequest.cmdObj.getBoolField("oplogReplay"));
        network->runReadyNetworkOperations();

        // Make the OplogFetcher fail with OplogOutOfOrder: the most recently processed oplog
        // entry has timestamp 1, and the getMore reply carries entries with these timestamps:
        //     (last=1), 5, 4
        auto getMoreRequest = network->scheduleSuccessfulResponse(makeCursorResponse(
            1LL, _options.localOplogNS, {makeOplogEntryObj(5), makeOplogEntryObj(4)}, false));
        assertRemoteCommandNameEquals("getMore", getMoreRequest);
        network->runReadyNetworkOperations();

        // Run the ready operations once more to deliver the cancellation signal to the
        // DatabasesCloner.
        network->runReadyNetworkOperations();
    }

    syncer.join();
    ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, _lastApplied);
}
3782
/**
 * Drives one failed initial sync attempt (listDatabases returns an injected FailedToParse error)
 * followed by one successful attempt, and checks the document returned by
 * getInitialSyncProgress() at four points:
 *   1. after the opening responses of the first attempt,
 *   2. after the opening responses of the second attempt,
 *   3. after every response of the second attempt except the final rollback ID check,
 *   4. after initial sync has completed.
 */
TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 27017));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), 2U));

    auto net = getNet();
    int baseRollbackId = 1;

    // Play the opening responses (rollback ID, last oplog entry, FCV) to ensure initial syncer
    // has started the oplog fetcher.
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();
    }

    log() << "Done playing first failed response";

    auto progress = initialSyncer->getInitialSyncProgress();
    log() << "Progress after first failed response: " << progress;
    ASSERT_EQUALS(progress.nFields(), 8) << progress;
    ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 0) << progress;
    ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
    ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
    ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
    ASSERT_BSONOBJ_EQ(progress.getObjectField("initialSyncAttempts"), BSONObj());
    ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
    ASSERT_EQUALS(progress.getIntField("appliedOps"), 0) << progress;
    ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"), BSON("databasesCloned" << 0));

    // Play rest of the failed round of responses.
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        auto request = net->scheduleErrorResponse(
            Status(ErrorCodes::FailedToParse, "fail on clone -- listDBs injected failure"));
        assertRemoteCommandNameEquals("listDatabases", request);
        net->runReadyNetworkOperations();

        // Deliver cancellation to OplogFetcher
        net->runReadyNetworkOperations();
    }

    log() << "Done playing failed responses";

    // Play the opening responses (rollback ID, last oplog entry, FCV) of the successful round to
    // ensure that the initial syncer starts the oplog fetcher.
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Advance the mock clock past the retry wait so the second attempt gets scheduled.
        auto when = net->now() + _options.initialSyncRetryWait;
        ASSERT_EQUALS(when, net->runUntil(when));

        // Base rollback ID.
        auto request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        assertRemoteCommandNameEquals("replSetGetRBID", request);
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();
    }

    log() << "Done playing first successful response";

    progress = initialSyncer->getInitialSyncProgress();
    log() << "Progress after failure: " << progress;
    ASSERT_EQUALS(progress.nFields(), 8) << progress;
    ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
    ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
    ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
    ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
    ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
    ASSERT_EQUALS(progress.getIntField("appliedOps"), 0) << progress;
    ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"), BSON("databasesCloned" << 0));

    // The failed first attempt must now be recorded.
    BSONObj attempts = progress["initialSyncAttempts"].Obj();
    ASSERT_EQUALS(attempts.nFields(), 1) << attempts;
    BSONObj attempt0 = attempts["0"].Obj();
    ASSERT_EQUALS(attempt0.nFields(), 3) << attempt0;
    ASSERT_EQUALS(attempt0.getStringField("status"),
                  std::string("FailedToParse: error cloning databases :: caused by :: fail on "
                              "clone -- listDBs injected failure"))
        << attempt0;
    ASSERT_EQUALS(attempt0["durationMillis"].type(), NumberInt) << attempt0;
    ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017"))
        << attempt0;

    // Play all but last of the successful round of responses.
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // listDatabases
        NamespaceString nss("a.a");
        auto request =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
        assertRemoteCommandNameEquals("listDatabases", request);
        net->runReadyNetworkOperations();

        // Ignore oplog tailing query.
        request = net->scheduleSuccessfulResponse(makeCursorResponse(1LL,
                                                                     _options.localOplogNS,
                                                                     {makeOplogEntryObj(1),
                                                                      makeOplogEntryObj(2),
                                                                      makeOplogEntryObj(3),
                                                                      makeOplogEntryObj(4),
                                                                      makeOplogEntryObj(5),
                                                                      makeOplogEntryObj(6),
                                                                      makeOplogEntryObj(7)}));
        assertRemoteCommandNameEquals("find", request);
        ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
        net->runReadyNetworkOperations();

        // listCollections for "a"
        request = net->scheduleSuccessfulResponse(
            makeCursorResponse(0LL, nss, {BSON("name" << nss.coll() << "options" << BSONObj())}));
        assertRemoteCommandNameEquals("listCollections", request);

        // Black hole the next ready request, which is expected to be a getMore.
        auto noi = net->getNextReadyRequest();
        request = noi->getRequest();
        assertRemoteCommandNameEquals("getMore", request);
        net->blackHole(noi);

        // count:a
        request = net->scheduleSuccessfulResponse(BSON("n" << 5 << "ok" << 1));
        assertRemoteCommandNameEquals("count", request);
        ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
        ASSERT_EQUALS(nss.db(), request.dbname);

        // listIndexes:a
        request = net->scheduleSuccessfulResponse(makeCursorResponse(
            0LL,
            NamespaceString(nss.getCommandNS()),
            {BSON("v" << OplogEntry::kOplogVersion << "key" << BSON("_id" << 1) << "name"
                      << "_id_"
                      << "ns"
                      << nss.ns())}));
        assertRemoteCommandNameEquals("listIndexes", request);
        ASSERT_EQUALS(nss.coll(), request.cmdObj.firstElement().String());
        ASSERT_EQUALS(nss.db(), request.dbname);

        // find:a - 5 batches
        for (int i = 1; i <= 5; ++i) {
            request = net->scheduleSuccessfulResponse(
                makeCursorResponse(i < 5 ? 2LL : 0LL, nss, {BSON("_id" << i << "a" << i)}, i == 1));
            ASSERT_EQUALS(i == 1 ? "find" : "getMore",
                          request.cmdObj.firstElement().fieldNameStringData());
            net->runReadyNetworkOperations();
        }

        // Second last oplog entry fetcher.
        // Send oplog entry with timestamp 7; the progress document checked below reports this as
        // "initialSyncOplogEnd".
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(7)});
    }
    log() << "Done playing all but last successful response";

    progress = initialSyncer->getInitialSyncProgress();
    log() << "Progress after all but last successful response: " << progress;
    ASSERT_EQUALS(progress.nFields(), 9) << progress;
    ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
    ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
    ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
    ASSERT_EQUALS(progress["initialSyncOplogEnd"].timestamp(), Timestamp(7, 1)) << progress;
    ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
    ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
    // Expected applied ops to be a superset of this range: Timestamp(2,1) ... Timestamp(7,1).
    ASSERT_GREATER_THAN_OR_EQUALS(progress.getIntField("appliedOps"), 6) << progress;
    auto databasesProgress = progress.getObjectField("databases");
    ASSERT_EQUALS(1, databasesProgress.getIntField("databasesCloned")) << databasesProgress;
    auto dbProgress = databasesProgress.getObjectField("a");
    ASSERT_EQUALS(1, dbProgress.getIntField("collections")) << dbProgress;
    ASSERT_EQUALS(1, dbProgress.getIntField("clonedCollections")) << dbProgress;
    auto collectionProgress = dbProgress.getObjectField("a.a");
    ASSERT_EQUALS(
        5, collectionProgress.getIntField(CollectionCloner::Stats::kDocumentsToCopyFieldName))
        << collectionProgress;
    ASSERT_EQUALS(
        5, collectionProgress.getIntField(CollectionCloner::Stats::kDocumentsCopiedFieldName))
        << collectionProgress;
    ASSERT_EQUALS(1, collectionProgress.getIntField("indexes")) << collectionProgress;
    ASSERT_EQUALS(5, collectionProgress.getIntField("fetchedBatches")) << collectionProgress;

    // Still only the failed attempt is recorded while the second attempt is in flight.
    attempts = progress["initialSyncAttempts"].Obj();
    ASSERT_EQUALS(attempts.nFields(), 1) << progress;
    attempt0 = attempts["0"].Obj();
    ASSERT_EQUALS(attempt0.nFields(), 3) << attempt0;
    ASSERT_EQUALS(attempt0.getStringField("status"),
                  std::string("FailedToParse: error cloning databases :: caused by :: fail on "
                              "clone -- listDBs injected failure"))
        << attempt0;
    ASSERT_EQUALS(attempt0["durationMillis"].type(), NumberInt) << attempt0;
    ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017"))
        << attempt0;

    // Play last successful response.
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Last rollback ID.
        assertRemoteCommandNameEquals(
            "replSetGetRBID",
            net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)));
        net->runReadyNetworkOperations();

        // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after
        // setting the completion status.
        // We call runReadyNetworkOperations() again to deliver the cancellation status to
        // _oplogFetcherCallback().
        net->runReadyNetworkOperations();
    }

    log() << "waiting for initial sync to verify it completed OK";
    initialSyncer->join();
    ASSERT_EQUALS(makeOplogEntry(7).getOpTime(), unittest::assertGet(_lastApplied).opTime);

    progress = initialSyncer->getInitialSyncProgress();
    log() << "Progress at end: " << progress;
    ASSERT_EQUALS(progress.nFields(), 11) << progress;
    ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
    ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
    ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
    ASSERT_EQUALS(progress["initialSyncEnd"].type(), Date) << progress;
    ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
    ASSERT_EQUALS(progress["initialSyncOplogEnd"].timestamp(), Timestamp(7, 1)) << progress;
    ASSERT_EQUALS(progress["initialSyncElapsedMillis"].type(), NumberInt) << progress;
    ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
    // Expected applied ops to be a superset of this range: Timestamp(2,1) ... Timestamp(7,1).
    ASSERT_GREATER_THAN_OR_EQUALS(progress.getIntField("appliedOps"), 6) << progress;

    // Both the failed and the successful attempt are recorded at the end.
    attempts = progress["initialSyncAttempts"].Obj();
    ASSERT_EQUALS(attempts.nFields(), 2) << attempts;

    attempt0 = attempts["0"].Obj();
    ASSERT_EQUALS(attempt0.nFields(), 3) << attempt0;
    ASSERT_EQUALS(attempt0.getStringField("status"),
                  std::string("FailedToParse: error cloning databases :: caused by :: fail on "
                              "clone -- listDBs injected failure"))
        << attempt0;
    ASSERT_EQUALS(attempt0["durationMillis"].type(), NumberInt) << attempt0;
    ASSERT_EQUALS(attempt0.getStringField("syncSource"), std::string("localhost:27017"))
        << attempt0;

    BSONObj attempt1 = attempts["1"].Obj();
    ASSERT_EQUALS(attempt1.nFields(), 3) << attempt1;
    ASSERT_EQUALS(attempt1.getStringField("status"), std::string("OK")) << attempt1;
    ASSERT_EQUALS(attempt1["durationMillis"].type(), NumberInt) << attempt1;
    ASSERT_EQUALS(attempt1.getStringField("syncSource"), std::string("localhost:27017"))
        << attempt1;
}
4044
/**
 * Answers listCollections with 200000 collection entries and then checks that
 * getInitialSyncProgress() still returns a document, one that has "initialSyncStart" but no
 * "databases" field. Afterwards shuts the initial syncer down and waits for it to finish.
 */
TEST_F(InitialSyncerTest, GetInitialSyncProgressOmitsClonerStatsIfClonerStatsExceedBsonLimit) {
    auto initialSyncer = &getInitialSyncer();
    auto opCtx = makeOpCtx();

    _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 27017));
    ASSERT_OK(initialSyncer->startup(opCtx.get(), 2U));

    const std::size_t numCollections = 200000U;

    auto net = getNet();
    int baseRollbackId = 1;
    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);

        // Base rollback ID.
        net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
        net->runReadyNetworkOperations();

        // Last oplog entry.
        processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});

        // Feature Compatibility Version.
        processSuccessfulFCVFetcherResponse36();

        // listDatabases
        NamespaceString nss("a.a");
        auto request =
            net->scheduleSuccessfulResponse(makeListDatabasesResponse({nss.db().toString()}));
        assertRemoteCommandNameEquals("listDatabases", request);
        net->runReadyNetworkOperations();

        // Ignore oplog tailing query.
        request = net->scheduleSuccessfulResponse(
            makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntryObj(1)}));
        assertRemoteCommandNameEquals("find", request);
        ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
        net->runReadyNetworkOperations();

        // listCollections for "a": one entry per generated collection name "coll-<i>".
        std::vector<BSONObj> collectionInfos;
        for (std::size_t i = 0; i < numCollections; ++i) {
            const std::string collName = str::stream() << "coll-" << i;
            collectionInfos.push_back(BSON("name" << collName << "options" << BSONObj()));
        }
        request = net->scheduleSuccessfulResponse(
            makeCursorResponse(0LL, nss.getCommandNS(), collectionInfos));
        assertRemoteCommandNameEquals("listCollections", request);
        net->runReadyNetworkOperations();
    }

    // This returns a valid document because we omit the cloner stats when they do not fit in a
    // BSON document.
    auto progress = initialSyncer->getInitialSyncProgress();
    ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
    ASSERT_FALSE(progress.hasField("databases")) << progress;

    // Initial sync will attempt to log stats again at shutdown in a callback, where it should not
    // terminate because we now return a valid stats document.
    ASSERT_OK(initialSyncer->shutdown());

    // Deliver cancellation signal to callbacks.
    executor::NetworkInterfaceMock::InNetworkGuard(net)->runReadyNetworkOperations();

    initialSyncer->join();
}
4110
/**
 * Sets a generated UUID on the storage interface bookkeeping, runs
 * doSuccessfulInitialSyncWithOneBatch(), and checks that the schemaUpgraded flag was set.
 */
TEST_F(InitialSyncerTest, InitialSyncerUpdatesCollectionUUIDsIfgetCollectionUUIDReturnsUUID) {
    // Ensure getCollectionUUID returns a UUID. This should trigger a call to
    // upgradeUUIDSchemaVersionNonReplicated.
    {
        LockGuard lock(_storageInterfaceWorkDoneMutex);
        _storageInterfaceWorkDone.uuid = UUID::gen();
    }
    doSuccessfulInitialSyncWithOneBatch();

    // Ensure upgradeUUIDSchemaVersionNonReplicated was called.
    LockGuard lock(_storageInterfaceWorkDoneMutex);
    ASSERT_TRUE(_storageInterfaceWorkDone.schemaUpgraded);
}
4124
/**
 * Sets the getCollectionUUIDShouldFail flag, runs doInitialSyncWithOneBatch(), and checks that
 * _lastApplied holds ErrorCodes::NamespaceNotFound and that the schemaUpgraded flag stayed unset.
 */
TEST_F(InitialSyncerTest, InitialSyncerCapturesGetCollectionUUIDError) {
    // Ensure getCollectionUUID returns a bad status. This should be passed to the initial syncer.
    {
        LockGuard lock(_storageInterfaceWorkDoneMutex);
        _storageInterfaceWorkDone.getCollectionUUIDShouldFail = true;
    }
    doInitialSyncWithOneBatch();

    // Ensure the getCollectionUUID status was captured.
    ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, _lastApplied);

    // Ensure upgradeUUIDSchemaVersionNonReplicated was not called.
    LockGuard lock(_storageInterfaceWorkDoneMutex);
    ASSERT_FALSE(_storageInterfaceWorkDone.schemaUpgraded);
}
4140
/**
 * Sets a generated UUID together with the upgradeUUIDSchemaVersionNonReplicatedShouldFail flag,
 * runs doInitialSyncWithOneBatch(), and checks that _lastApplied holds
 * ErrorCodes::NamespaceNotFound.
 */
TEST_F(InitialSyncerTest, InitialSyncerCapturesUpgradeUUIDSchemaVersionError) {
    {
        // Both settings are guarded by the same mutex, so apply them under a single lock.
        LockGuard lock(_storageInterfaceWorkDoneMutex);

        // Ensure getCollectionUUID returns a UUID. This should trigger a call to
        // upgradeUUIDSchemaVersionNonReplicated.
        _storageInterfaceWorkDone.uuid = UUID::gen();

        // Ensure upgradeUUIDSchemaVersionNonReplicated returns a bad status. This should be
        // passed to the initial syncer.
        _storageInterfaceWorkDone.upgradeUUIDSchemaVersionNonReplicatedShouldFail = true;
    }
    doInitialSyncWithOneBatch();

    // Ensure the upgradeUUIDSchemaVersionNonReplicated status was captured.
    ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, _lastApplied);
}
4160
4161 } // namespace
4162