/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/db/s/migration_source_manager.h"

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/catalog/catalog_raii.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/shard_metadata_util.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/sharding_state_recovery.h"
#include "mongo/db/s/sharding_statistics.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_shard_collection.h"
#include "mongo/s/catalog_cache_loader.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/commit_chunk_migration_request_type.h"
#include "mongo/s/set_shard_version_request.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/s/stale_exception.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/elapsed_tracker.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"

namespace mongo {

using namespace shardmetadatautil;

namespace {

// Wait at most this much time for the recipient to catch up sufficiently so that the critical
// section can be entered
const Hours kMaxWaitToEnterCriticalSectionTimeout(6);
const char kMigratedChunkVersionField[] = "migratedChunkVersion";
const char kControlChunkVersionField[] = "controlChunkVersion";
const char kWriteConcernField[] = "writeConcern";
const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
                                                WriteConcernOptions::SyncMode::UNSET,
                                                WriteConcernOptions::kWriteConcernTimeoutMigration);

/**
 * Best-effort attempt to ensure the recipient shard has refreshed its routing table to
 * 'newCollVersion'. Fires and forgets an asynchronous remote setShardVersion command.
 */
void refreshRecipientRoutingTable(OperationContext* opCtx,
                                  const NamespaceString& nss,
                                  ShardId toShard,
                                  const HostAndPort& toShardHost,
                                  const ChunkVersion& newCollVersion) {
    SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist(
        Grid::get(opCtx)->shardRegistry()->getConfigServerConnectionString(),
        toShard,
        ConnectionString(toShardHost),
        nss,
        newCollVersion,
        false);

    const executor::RemoteCommandRequest request(
        toShardHost,
        NamespaceString::kAdminDb.toString(),
        ssv.toBSON(),
        ReadPreferenceSetting{ReadPreference::PrimaryOnly}.toContainingBSON(),
        opCtx,
        executor::RemoteCommandRequest::kNoTimeout);

    executor::TaskExecutor* const executor =
        Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
    Status s =
        executor
            ->scheduleRemoteCommand(
                request, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {})
            .getStatus();
    std::move(s).ignore();
}

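/**
 * Returns Status::OK() if 'metadata' is present and belongs to the same collection incarnation
 * (same epoch) as when the migration started. Otherwise returns IncompatibleShardingMetadata,
 * meaning the collection was dropped or recreated in the meantime.
 */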
Status checkCollectionEpochMatches(const ScopedCollectionMetadata& metadata, OID expectedEpoch) {
    if (metadata && metadata->getCollVersion().epoch() == expectedEpoch)
        return Status::OK();

    return {ErrorCodes::IncompatibleShardingMetadata,
            str::stream() << "The collection was dropped or recreated since the migration began. "
                          << "Expected collection epoch: "
                          << expectedEpoch.toString()
                          << ", but found: "
                          << (metadata ? metadata->getCollVersion().epoch().toString()
                                       : "unsharded collection.")};
}

}  // namespace

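// Fail points for exercising specific stages of the migration: skipping the best-effort recipient
// refresh after commit, failing the _recvChunkCommit step, pausing before the critical section is
// released, and injecting a network error on the config server commit.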
MONGO_FP_DECLARE(doNotRefreshRecipientAfterCommit);
MONGO_FP_DECLARE(failMigrationCommit);
MONGO_FP_DECLARE(hangBeforeLeavingCriticalSection);
MONGO_FP_DECLARE(migrationCommitNetworkError);

MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
                                               MoveChunkRequest request,
                                               ConnectionString donorConnStr,
                                               HostAndPort recipientHost)
    : _args(std::move(request)),
      _donorConnStr(std::move(donorConnStr)),
      _recipientHost(std::move(recipientHost)),
      _stats(ShardingStatistics::get(opCtx)) {
    invariant(!opCtx->lockState()->isLocked());

    // Disallow moving a chunk to ourselves
    uassert(ErrorCodes::InvalidOptions,
            "Destination shard cannot be the same as source",
            _args.getFromShardId() != _args.getToShardId());

    log() << "Starting chunk migration " << redact(_args.toString())
          << " with expected collection version epoch " << _args.getVersionEpoch();

    // Force refresh of the metadata to ensure we have the latest
    {
        auto const shardingState = ShardingState::get(opCtx);

        ChunkVersion unusedShardVersion;
        Status refreshStatus =
            shardingState->refreshMetadataNow(opCtx, getNss(), &unusedShardVersion);
        uassert(refreshStatus.code(),
                str::stream() << "cannot start migrate of chunk " << _args.toString() << " due to "
                              << refreshStatus.reason(),
                refreshStatus.isOK());
    }

    // Snapshot the committed metadata from the time the migration starts
    const auto collectionMetadataAndUUID = [&] {
        AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
        uassert(ErrorCodes::InvalidOptions,
                "cannot move chunks for a collection that doesn't exist",
                autoColl.getCollection());

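        // Record the collection's UUID (if one has been assigned) so that the change streams
        // no-op oplog entry written by _notifyChangeStreamsOnRecipientFirstChunk can reference it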
        boost::optional<UUID> collectionUUID;
        if (autoColl.getCollection()->uuid()) {
            collectionUUID = autoColl.getCollection()->uuid().value();
        }

        auto metadata = CollectionShardingState::get(opCtx, getNss())->getMetadata();
        uassert(ErrorCodes::IncompatibleShardingMetadata,
                str::stream() << "cannot move chunks for an unsharded collection",
                metadata);

        return std::make_tuple(std::move(metadata), std::move(collectionUUID));
    }();

    const auto& collectionMetadata = std::get<0>(collectionMetadataAndUUID);

    const auto collectionVersion = collectionMetadata->getCollVersion();
    const auto shardVersion = collectionMetadata->getShardVersion();

    // If the shard major version is zero, this means we do not have any chunks locally to migrate
    uassert(ErrorCodes::IncompatibleShardingMetadata,
            str::stream() << "cannot move chunk " << _args.toString()
                          << " because the shard doesn't contain any chunks",
            shardVersion.majorVersion() > 0);

    uassert(ErrorCodes::StaleEpoch,
            str::stream() << "cannot move chunk " << _args.toString()
                          << " because collection may have been dropped. "
                          << "current epoch: "
                          << collectionVersion.epoch()
                          << ", cmd epoch: "
                          << _args.getVersionEpoch(),
            _args.getVersionEpoch() == collectionVersion.epoch());

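    // Ensure the range requested for migration corresponds to a chunk this shard actually owns;
    // otherwise checkChunkIsValid fails and the migration cannot proceed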
    ChunkType chunkToMove;
    chunkToMove.setMin(_args.getMinKey());
    chunkToMove.setMax(_args.getMaxKey());

    Status chunkValidateStatus = collectionMetadata->checkChunkIsValid(chunkToMove);
    uassert(chunkValidateStatus.code(),
            str::stream() << "Unable to move chunk with arguments '" << redact(_args.toString())
                          << "' due to error "
                          << redact(chunkValidateStatus.reason()),
            chunkValidateStatus.isOK());

    _collectionEpoch = collectionVersion.epoch();
    _collectionUuid = std::get<1>(collectionMetadataAndUUID);
}

MigrationSourceManager::~MigrationSourceManager() {
    invariant(!_cloneDriver);
    _stats.totalDonorMoveChunkTimeMillis.addAndFetch(_entireOpTimer.millis());
}

NamespaceString MigrationSourceManager::getNss() const {
    return _args.getNss();
}

Status MigrationSourceManager::startClone(OperationContext* opCtx) {
    invariant(!opCtx->lockState()->isLocked());
    invariant(_state == kCreated);
    auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); });
    _stats.countDonorMoveChunkStarted.addAndFetch(1);

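    // Record the start of the migration in the config server's changelog (best effort; failures
    // to log are ignored)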
    Grid::get(opCtx)
        ->catalogClient()
        ->logChange(opCtx,
                    "moveChunk.start",
                    getNss().ns(),
                    BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
                               << _args.getFromShardId()
                               << "to"
                               << _args.getToShardId()),
                    ShardingCatalogClient::kMajorityWriteConcern)
        .ignore();

    _cloneAndCommitTimer.reset();

    {
        // Register for notifications from the replication subsystem
        AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
        auto css = CollectionShardingState::get(opCtx, getNss().ns());

        const auto metadata = css->getMetadata();
        Status status = checkCollectionEpochMatches(metadata, _collectionEpoch);
        if (!status.isOK())
            return status;

        // Having the metadata manager registered on the collection sharding state is what
        // indicates that a chunk on that collection is being migrated. With an active migration,
        // write operations require the cloner to be present in order to track changes to the
        // chunk, which need to be transmitted to the recipient.
        _cloneDriver = stdx::make_unique<MigrationChunkClonerSourceLegacy>(
            _args, metadata->getKeyPattern(), _donorConnStr, _recipientHost);

        css->setMigrationSourceManager(opCtx, this);
    }

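    // Ask the cloner to start the clone; this is done after releasing the collection lock
    // acquired above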
    Status startCloneStatus = _cloneDriver->startClone(opCtx);
    if (!startCloneStatus.isOK()) {
        return startCloneStatus;
    }

    _state = kCloning;
    scopedGuard.Dismiss();
    return Status::OK();
}

Status MigrationSourceManager::awaitToCatchUp(OperationContext* opCtx) {
    invariant(!opCtx->lockState()->isLocked());
    invariant(_state == kCloning);
    auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); });
    _stats.totalDonorChunkCloneTimeMillis.addAndFetch(_cloneAndCommitTimer.millis());
    _cloneAndCommitTimer.reset();

    // Block until the cloner deems it appropriate to enter the critical section.
    Status catchUpStatus = _cloneDriver->awaitUntilCriticalSectionIsAppropriate(
        opCtx, kMaxWaitToEnterCriticalSectionTimeout);
    if (!catchUpStatus.isOK()) {
        return catchUpStatus;
    }

    _state = kCloneCaughtUp;
    scopedGuard.Dismiss();
    return Status::OK();
}

Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) {
    invariant(!opCtx->lockState()->isLocked());
    invariant(_state == kCloneCaughtUp);
    auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); });
    _stats.totalDonorChunkCloneTimeMillis.addAndFetch(_cloneAndCommitTimer.millis());
    _cloneAndCommitTimer.reset();

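    // Check that the collection has not been dropped or recreated since the migration began and,
    // if this is the first chunk the recipient will own for this collection, write the change
    // streams notification before the critical section is entered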
    {
        const auto metadata = [&] {
            AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS);
            return CollectionShardingState::get(opCtx, _args.getNss())->getMetadata();
        }();

        Status status = checkCollectionEpochMatches(metadata, _collectionEpoch);
        if (!status.isOK())
            return status;

        _notifyChangeStreamsOnRecipientFirstChunk(opCtx, metadata);
    }

    // Mark the shard as running a critical operation, which requires recovery on crash.
    //
    // NOTE: The 'migrateChunkToNewShard' oplog message written by the above call to
    // '_notifyChangeStreamsOnRecipientFirstChunk' relies on this majority write to ensure that
    // its own local write becomes majority committed.
    Status status = ShardingStateRecovery::startMetadataOp(opCtx);
    if (!status.isOK()) {
        return status;
    }

    {
        // The critical section must be entered with collection X lock in order to ensure there are
        // no writes which could have entered and passed the version check just before we entered
        // the critical section, but managed to complete after we left it.
        AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);

        // IMPORTANT: After this line, the critical section is in place and needs to be signaled
        _critSecSignal = std::make_shared<Notification<void>>();
    }

    _state = kCriticalSection;

    // Persist a signal to secondaries that we've entered the critical section. This will cause
    // secondaries to refresh their routing table when next accessed, which will block behind the
    // critical section. This ensures causal consistency by preventing a stale mongos with a cluster
    // time inclusive of the migration config commit update from accessing secondary data.
    // Note: this write must occur after the critSec flag is set, to ensure the secondary refresh
    // will stall behind the flag.
    Status signalStatus =
        updateShardCollectionsEntry(opCtx,
                                    BSON(ShardCollectionType::ns() << getNss().ns()),
                                    BSONObj(),
                                    BSON(ShardCollectionType::enterCriticalSectionCounter() << 1),
                                    false /*upsert*/);
    if (!signalStatus.isOK()) {
        return {
            ErrorCodes::OperationFailed,
            str::stream() << "Failed to persist critical section signal for secondaries due to: "
                          << signalStatus.toString()};
    }

    log() << "Migration successfully entered critical section";

    scopedGuard.Dismiss();
    return Status::OK();
}

Status MigrationSourceManager::commitChunkOnRecipient(OperationContext* opCtx) {
    invariant(!opCtx->lockState()->isLocked());
    invariant(_state == kCriticalSection);
    auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); });

    // Tell the recipient shard to fetch the latest changes.
    auto commitCloneStatus = _cloneDriver->commitClone(opCtx);

    if (MONGO_FAIL_POINT(failMigrationCommit) && commitCloneStatus.isOK()) {
        commitCloneStatus = {ErrorCodes::InternalError,
                             "Failing _recvChunkCommit due to failpoint."};
    }

    if (!commitCloneStatus.isOK()) {
        return commitCloneStatus.getStatus().withContext("commit clone failed");
    }

    _recipientCloneCounts = commitCloneStatus.getValue()["counts"].Obj().getOwned();

    _state = kCloneCompleted;
    scopedGuard.Dismiss();
    return Status::OK();
}

Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opCtx) {
    invariant(!opCtx->lockState()->isLocked());
    invariant(_state == kCloneCompleted);
    auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); });

    // If we have chunks left on the FROM shard, bump the version of one of them as well. This will
    // change the local collection major version, which indicates to other processes that the chunk
    // metadata has changed and they should refresh.
    BSONObjBuilder builder;

    {
        const auto metadata = [&] {
            AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS);
            return CollectionShardingState::get(opCtx, _args.getNss())->getMetadata();
        }();

        Status status = checkCollectionEpochMatches(metadata, _collectionEpoch);
        if (!status.isOK())
            return status;

        boost::optional<ChunkType> controlChunkType = boost::none;
        if (metadata->getNumChunks() > 1) {
            ChunkType differentChunk;
            invariant(metadata->getDifferentChunk(_args.getMinKey(), &differentChunk));
            invariant(differentChunk.getMin().woCompare(_args.getMinKey()) != 0);
            controlChunkType = std::move(differentChunk);
        } else {
            log() << "Moving last chunk for the collection out";
        }

        ChunkType migratedChunkType;
        migratedChunkType.setMin(_args.getMinKey());
        migratedChunkType.setMax(_args.getMaxKey());

        CommitChunkMigrationRequest::appendAsCommand(&builder,
                                                     getNss(),
                                                     _args.getFromShardId(),
                                                     _args.getToShardId(),
                                                     migratedChunkType,
                                                     controlChunkType,
                                                     metadata->getCollVersion());

        builder.append(kWriteConcernField, kMajorityWriteConcern.toBSON());
    }

    // Read operations must begin to wait on the critical section just before we send the commit
    // operation to the config server
    {
        AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
        _readsShouldWaitOnCritSec = true;
    }

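    // Measures the time from dispatching the commit to the config server until the critical
    // section commit time statistic is recorded further below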
    Timer t;

    auto commitChunkMigrationResponse =
        Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
            opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            "admin",
            builder.obj(),
            Shard::RetryPolicy::kIdempotent);

    if (MONGO_FAIL_POINT(migrationCommitNetworkError)) {
        commitChunkMigrationResponse = Status(
            ErrorCodes::InternalError, "Failpoint 'migrationCommitNetworkError' generated error");
    }

    Status migrationCommitStatus = commitChunkMigrationResponse.getStatus();
    if (migrationCommitStatus.isOK()) {
        migrationCommitStatus = commitChunkMigrationResponse.getValue().commandStatus;
        if (migrationCommitStatus.isOK()) {
            migrationCommitStatus = commitChunkMigrationResponse.getValue().writeConcernStatus;
        }
    }

    if (!migrationCommitStatus.isOK()) {
        // Need to get the latest optime in case the refresh request goes to a secondary --
        // otherwise the read won't wait for the write that _configsvrCommitChunkMigration may have
        // done
        log() << "Error occurred while committing the migration. Performing a majority write "
                 "against the config server to obtain its latest optime"
              << causedBy(redact(migrationCommitStatus));

        Status status = Grid::get(opCtx)->catalogClient()->logChange(
            opCtx,
            "moveChunk.validating",
            getNss().ns(),
            BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
                       << _args.getFromShardId()
                       << "to"
                       << _args.getToShardId()),
            ShardingCatalogClient::kMajorityWriteConcern);

        if ((ErrorCodes::isInterruption(status.code()) ||
             ErrorCodes::isShutdownError(status.code()) ||
             status == ErrorCodes::CallbackCanceled) &&
            globalInShutdownDeprecated()) {
            // Since the server is already doing a clean shutdown, this call will just join the
            // previous shutdown call
            shutdown(waitForShutdown());
        }

        // If we failed to get the latest config optime because we stepped down as primary, then it
        // is safe to fail without crashing because the new primary will fetch the latest optime
        // when it recovers the sharding state recovery document, as long as we also clear the
        // metadata for this collection, forcing subsequent callers to do a full refresh. Check if
        // this node can accept writes for this collection as a proxy for it being primary.
        if (!status.isOK()) {
            AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
            if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, getNss())) {
                CollectionShardingState::get(opCtx, getNss())->refreshMetadata(opCtx, nullptr);
                uassertStatusOK(status.withContext(
                    str::stream() << "Unable to verify migration commit for chunk: "
                                  << redact(_args.toString())
                                  << " because the node's replication role changed. Metadata "
                                     "was cleared for: "
                                  << getNss().ns()
                                  << ", so it will get a full refresh when accessed again."));
            }
        }

        fassertStatusOK(
            40137,
            {status.code(),
             str::stream() << "Failed to commit migration for chunk " << _args.toString()
                           << " due to "
                           << redact(migrationCommitStatus)
                           << ". Updating the optime with a write before refreshing the "
                           << "metadata also failed with "
                           << redact(status)});
    }

    // Because the CatalogCache's WithRefresh methods (on which forceShardFilteringMetadataRefresh
    // depends) are not causally consistent, we need to perform up to two refresh rounds if refresh
    // returns that the shard still owns the chunk
    ChunkVersion collectionVersionAfterRefresh;

    for (int retriesLeft = 1;; --retriesLeft) {
        ChunkVersion unusedShardVersion;
        Status refreshStatus =
            ShardingState::get(opCtx)->refreshMetadataNow(opCtx, getNss(), &unusedShardVersion);

        // If the refresh fails, there is no way to confirm whether the migration commit actually
        // went through or not. Because of that, the collection's metadata is reset to UNSHARDED so
        // that subsequent versioned requests will get StaleShardVersion and will retry the refresh.
        if (!refreshStatus.isOK()) {
            AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);

            CollectionShardingState::get(opCtx, getNss())->refreshMetadata(opCtx, nullptr);

            log() << "Failed to refresh metadata after a "
                  << (migrationCommitStatus.isOK() ? "successful commit" : "failed commit attempt")
                  << ". Metadata was cleared so it will get a full refresh when accessed again."
                  << causedBy(redact(refreshStatus));

            return {
                migrationCommitStatus.code(),
                str::stream() << "Orphaned range not cleaned up. Failed to refresh metadata after"
                                 " migration commit due to '"
                              << refreshStatus.toString()
                              << "', and commit failed due to '"
                              << migrationCommitStatus.toString()
                              << "'"};
        }

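        // Re-read the metadata installed by the refresh in order to determine whether this shard
        // still owns the chunk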
        auto refreshedMetadata = [&] {
            AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
            return CollectionShardingState::get(opCtx, getNss())->getMetadata();
        }();

        if (!refreshedMetadata) {
            return {ErrorCodes::NamespaceNotSharded,
                    str::stream() << "Chunk move failed because collection '" << getNss().ns()
                                  << "' is no longer sharded. The migration commit error was: "
                                  << migrationCommitStatus.toString()};
        }

        // If after a successful refresh the metadata indicates that the node still owns the chunk,
        // we must do one more refresh in order to ensure that the previous refresh round didn't
        // join an already active catalog cache refresh and missed its own commit
        if (!refreshedMetadata->keyBelongsToMe(_args.getMinKey())) {
            collectionVersionAfterRefresh = refreshedMetadata->getCollVersion();
            break;
        }

        if (retriesLeft)
            continue;

        // This condition may only happen if the migration commit has failed for any reason
        if (migrationCommitStatus.isOK()) {
            severe() << "The migration commit succeeded, but the new chunk placement was not "
                        "reflected after metadata refresh, which is an indication of an "
                        "afterOpTime bug.";
            severe() << "The current config server opTime is " << Grid::get(opCtx)->configOpTime();
            severe() << "The commit response came from "
                     << redact(commitChunkMigrationResponse.getValue().hostAndPort->toString())
                     << " and contained:";
            severe() << "  metadata: "
                     << redact(commitChunkMigrationResponse.getValue().metadata.toString());
            severe() << "  response: "
                     << redact(commitChunkMigrationResponse.getValue().response.toString());

            fassertFailed(50878);
        }

        // The chunk modification was not applied, so report the original error
        return {migrationCommitStatus.code(),
                str::stream() << "Chunk move was not successful due to "
                              << migrationCommitStatus.reason()};
    }

    invariant(collectionVersionAfterRefresh.isSet());

    // Migration succeeded
    log() << "Migration succeeded and updated collection version to "
          << collectionVersionAfterRefresh;

    MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeLeavingCriticalSection);

    scopedGuard.Dismiss();

    _stats.totalCriticalSectionCommitTimeMillis.addAndFetch(t.millis());

    // Exit the critical section and ensure that all the necessary state is fully persisted before
    // scheduling orphan cleanup.
    _cleanup(opCtx);

    Grid::get(opCtx)
        ->catalogClient()
        ->logChange(opCtx,
                    "moveChunk.commit",
                    getNss().ns(),
                    BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
                               << _args.getFromShardId()
                               << "to"
                               << _args.getToShardId()
                               << "counts"
                               << _recipientCloneCounts),
                    ShardingCatalogClient::kMajorityWriteConcern)
        .ignore();

    const ChunkRange range(_args.getMinKey(), _args.getMaxKey());

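    // Schedule deletion of the documents in the migrated range from this shard; depending on the
    // request's waitForDelete setting, the cleanup runs either immediately or at a later time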
    auto notification = [&] {
        auto const whenToClean = _args.getWaitForDelete() ? CollectionShardingState::kNow
                                                          : CollectionShardingState::kDelayed;
        AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
        return CollectionShardingState::get(opCtx, getNss())
            ->cleanUpRange(opCtx, autoColl.getCollection(), range, whenToClean);
    }();

    if (!MONGO_FAIL_POINT(doNotRefreshRecipientAfterCommit)) {
        // Best-effort make the recipient refresh its routing table to the new collection version.
        refreshRecipientRoutingTable(
            opCtx, getNss(), _args.getToShardId(), _recipientHost, collectionVersionAfterRefresh);
    }

    if (_args.getWaitForDelete()) {
        log() << "Waiting for cleanup of " << getNss().ns() << " range "
              << redact(range.toString());
        return notification.waitStatus(opCtx);
    }

    if (notification.ready() && !notification.waitStatus(opCtx).isOK()) {
        warning() << "Failed to initiate cleanup of " << getNss().ns() << " range "
                  << redact(range.toString())
                  << " due to: " << redact(notification.waitStatus(opCtx));
    } else {
        log() << "Leaving cleanup of " << getNss().ns() << " range " << redact(range.toString())
              << " to complete in background";
        notification.abandon();
    }

    return Status::OK();
}

void MigrationSourceManager::cleanupOnError(OperationContext* opCtx) {
    if (_state == kDone) {
        return;
    }

    Grid::get(opCtx)
        ->catalogClient()
        ->logChange(opCtx,
                    "moveChunk.error",
                    getNss().ns(),
                    BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
                               << _args.getFromShardId()
                               << "to"
                               << _args.getToShardId()),
                    ShardingCatalogClient::kMajorityWriteConcern)
        .ignore();

    try {
        _cleanup(opCtx);
    } catch (const ExceptionForCat<ErrorCategory::NotMasterError>& ex) {
        warning() << "Failed to clean up migration: " << redact(_args.toString())
                  << " due to: " << redact(ex);
    }
}

void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk(
    OperationContext* opCtx, const ScopedCollectionMetadata& metadata) {
    // Change streams are only supported in 3.6 and above
    if (serverGlobalParams.featureCompatibility.getVersion() !=
        ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo36)
        return;

    // If this is not the first donation, there is nothing to be done
    if (metadata->getChunkManager()->getVersion(_args.getToShardId()).isSet())
        return;

    const std::string dbgMessage = str::stream()
        << "Migrating chunk from shard " << _args.getFromShardId() << " to shard "
        << _args.getToShardId() << " with no chunks for this collection";

    // The message expected by change streams
    const auto o2Message = BSON("type"
                                << "migrateChunkToNewShard"
                                << "from"
                                << _args.getFromShardId()
                                << "to"
                                << _args.getToShardId());

    auto const serviceContext = opCtx->getClient()->getServiceContext();

    AutoGetCollection autoColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_IX);
    writeConflictRetry(
        opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] {
            WriteUnitOfWork uow(opCtx);
            serviceContext->getOpObserver()->onInternalOpMessage(
                opCtx, getNss(), _collectionUuid, BSON("msg" << dbgMessage), o2Message);
            uow.commit();
        });
}

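/**
 * Unregisters the migration from the collection's sharding state, lifts the critical section if
 * it was entered, cancels any in-progress clone and, if the migration had reached the critical
 * section, waits for the persisted routing table cache to be flushed before clearing the
 * 'minOpTime recovery' document.
 */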
void MigrationSourceManager::_cleanup(OperationContext* opCtx) {
    invariant(_state != kDone);

    auto cloneDriver = [&]() {
        // Unregister from the collection's sharding state
        AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);

        auto css = CollectionShardingState::get(opCtx, getNss().ns());

        // The migration source manager is not visible anymore after it is unregistered from the
        // collection
        css->clearMigrationSourceManager(opCtx);

        // Leave the critical section.
        if (_critSecSignal) {
            _critSecSignal->set();
        }

        return std::move(_cloneDriver);
    }();

    // The cleanup operations below are potentially blocking or acquire other locks, so perform them
    // outside of the collection X lock

    if (cloneDriver) {
        cloneDriver->cancelClone(opCtx);
    }

    if (_state == kCriticalSection || _state == kCloneCompleted) {
        _stats.totalCriticalSectionTimeMillis.addAndFetch(_cloneAndCommitTimer.millis());

        // NOTE: The order of the operations below is important and the comments explain the
        // reasoning behind it

        // Wait for the updates to the cache of the routing table to be fully written to disk
        // before clearing the 'minOpTime recovery' document. This way, we ensure that all nodes
        // from a shard which donated a chunk will always be at the shard version of the last
        // migration it performed.
        //
        // If the metadata is not persisted before clearing the 'inMigration' flag below, it is
        // possible that the persisted metadata is rolled back after stepdown while the write which
        // cleared the 'inMigration' flag is not, in which case a secondary node would report
        // itself at an older shard version.
        CatalogCacheLoader::get(opCtx).waitForCollectionFlush(opCtx, getNss());

        // Clear the 'minOpTime recovery' document so that the next time a node from this shard
        // becomes a primary, it won't have to recover the config server optime.
        ShardingStateRecovery::endMetadataOp(opCtx);
    }

    _state = kDone;
}

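// Writers always block on the critical section signal as soon as the critical section has been
// entered. Readers only start blocking once _readsShouldWaitOnCritSec is set, which happens just
// before the commit is dispatched to the config server.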
std::shared_ptr<Notification<void>> MigrationSourceManager::getMigrationCriticalSectionSignal(
    bool isForReadOnlyOperation) const {
    if (!isForReadOnlyOperation) {
        return _critSecSignal;
    }

    if (_readsShouldWaitOnCritSec) {
        return _critSecSignal;
    }

    return nullptr;
}

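// Summarizes this migration (namespace, donor and recipient shards, chunk bounds) as a BSON
// document for status reporting.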
BSONObj MigrationSourceManager::getMigrationStatusReport() const {
    return migrationutil::makeMigrationStatusDocument(getNss(),
                                                      _args.getFromShardId(),
                                                      _args.getToShardId(),
                                                      true,
                                                      _args.getMinKey(),
                                                      _args.getMaxKey());
}

}  // namespace mongo