/**
 * Copyright (C) 2018-present MongoDB, Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the Server Side Public License, version 1,
 * as published by MongoDB, Inc.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * Server Side Public License for more details.
 *
 * You should have received a copy of the Server Side Public License
 * along with this program. If not, see
 * <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the Server Side Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/db/s/migration_source_manager.h"

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/catalog/catalog_raii.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/shard_metadata_util.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/sharding_state_recovery.h"
#include "mongo/db/s/sharding_statistics.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_shard_collection.h"
#include "mongo/s/catalog_cache_loader.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/commit_chunk_migration_request_type.h"
#include "mongo/s/set_shard_version_request.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/s/stale_exception.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/elapsed_tracker.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"

namespace mongo {

using namespace shardmetadatautil;

namespace {

// Wait at most this much time for the recipient to catch up sufficiently so that the critical
// section can be entered
const Hours kMaxWaitToEnterCriticalSectionTimeout(6);
const char kMigratedChunkVersionField[] = "migratedChunkVersion";
const char kControlChunkVersionField[] = "controlChunkVersion";
const char kWriteConcernField[] = "writeConcern";
const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
                                                WriteConcernOptions::SyncMode::UNSET,
                                                WriteConcernOptions::kWriteConcernTimeoutMigration);

/**
 * Best-effort attempt to ensure the recipient shard has refreshed its routing table to
 * 'newCollVersion'. Fires and forgets an asynchronous remote setShardVersion command.
 */
void refreshRecipientRoutingTable(OperationContext* opCtx,
                                  const NamespaceString& nss,
                                  ShardId toShard,
                                  const HostAndPort& toShardHost,
                                  const ChunkVersion& newCollVersion) {
    SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist(
        Grid::get(opCtx)->shardRegistry()->getConfigServerConnectionString(),
        toShard,
        ConnectionString(toShardHost),
        nss,
        newCollVersion,
        false);

    const executor::RemoteCommandRequest request(
        toShardHost,
        NamespaceString::kAdminDb.toString(),
        ssv.toBSON(),
        ReadPreferenceSetting{ReadPreference::PrimaryOnly}.toContainingBSON(),
        opCtx,
        executor::RemoteCommandRequest::kNoTimeout);

    executor::TaskExecutor* const executor =
        Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
    executor
        ->scheduleRemoteCommand(
            request, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {})
        .getStatus()
        .ignore();
}

Status checkCollectionEpochMatches(const ScopedCollectionMetadata& metadata, OID expectedEpoch) {
    if (metadata && metadata->getCollVersion().epoch() == expectedEpoch)
        return Status::OK();

    return {ErrorCodes::IncompatibleShardingMetadata,
            str::stream() << "The collection was dropped or recreated since the migration began. "
                          << "Expected collection epoch: "
                          << expectedEpoch.toString()
                          << ", but found: "
                          << (metadata ? metadata->getCollVersion().epoch().toString()
                                       : "unsharded collection.")};
}

}  // namespace

MONGO_FP_DECLARE(doNotRefreshRecipientAfterCommit);
MONGO_FP_DECLARE(failMigrationCommit);
MONGO_FP_DECLARE(hangBeforeLeavingCriticalSection);
MONGO_FP_DECLARE(migrationCommitNetworkError);
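
// Sketch of the expected happy-path call sequence, as enforced by the _state invariants in the
// methods below (the actual orchestration is done by the donor's moveChunk command, not by this
// class):
//
//   MigrationSourceManager msm(opCtx, request, donorConnStr, recipientHost);
//   uassertStatusOK(msm.startClone(opCtx));                   // kCreated -> kCloning
//   uassertStatusOK(msm.awaitToCatchUp(opCtx));               // kCloning -> kCloneCaughtUp
//   uassertStatusOK(msm.enterCriticalSection(opCtx));         // kCloneCaughtUp -> kCriticalSection
//   uassertStatusOK(msm.commitChunkOnRecipient(opCtx));       // kCriticalSection -> kCloneCompleted
//   uassertStatusOK(msm.commitChunkMetadataOnConfig(opCtx));  // kCloneCompleted -> kDone
//
// On any failure the caller must invoke cleanupOnError(); the MakeGuard calls inside each step do
// this automatically for errors returned from within that step.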

MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
                                               MoveChunkRequest request,
                                               ConnectionString donorConnStr,
                                               HostAndPort recipientHost)
    : _args(std::move(request)),
      _donorConnStr(std::move(donorConnStr)),
      _recipientHost(std::move(recipientHost)),
      _stats(ShardingStatistics::get(opCtx)) {
    invariant(!opCtx->lockState()->isLocked());

    // Disallow moving a chunk to ourselves
    uassert(ErrorCodes::InvalidOptions,
            "Destination shard cannot be the same as source",
            _args.getFromShardId() != _args.getToShardId());

    log() << "Starting chunk migration " << redact(_args.toString())
          << " with expected collection version epoch " << _args.getVersionEpoch();

    // Force refresh of the metadata to ensure we have the latest
    {
        auto const shardingState = ShardingState::get(opCtx);

        ChunkVersion unusedShardVersion;
        Status refreshStatus =
            shardingState->refreshMetadataNow(opCtx, getNss(), &unusedShardVersion);
        uassert(refreshStatus.code(),
                str::stream() << "cannot start migrate of chunk " << _args.toString() << " due to "
                              << refreshStatus.reason(),
                refreshStatus.isOK());
    }

    // Snapshot the committed metadata from the time the migration starts
    const auto collectionMetadataAndUUID = [&] {
        AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
        uassert(ErrorCodes::InvalidOptions,
                "cannot move chunks for a collection that doesn't exist",
                autoColl.getCollection());

        boost::optional<UUID> collectionUUID;
        if (autoColl.getCollection()->uuid()) {
            collectionUUID = autoColl.getCollection()->uuid().value();
        }

        auto metadata = CollectionShardingState::get(opCtx, getNss())->getMetadata();
        uassert(ErrorCodes::IncompatibleShardingMetadata,
                str::stream() << "cannot move chunks for an unsharded collection",
                metadata);

        return std::make_tuple(std::move(metadata), std::move(collectionUUID));
    }();

    const auto& collectionMetadata = std::get<0>(collectionMetadataAndUUID);

    const auto collectionVersion = collectionMetadata->getCollVersion();
    const auto shardVersion = collectionMetadata->getShardVersion();

    // If the shard major version is zero, this means we do not have any chunks locally to migrate
    uassert(ErrorCodes::IncompatibleShardingMetadata,
            str::stream() << "cannot move chunk " << _args.toString()
                          << " because the shard doesn't contain any chunks",
            shardVersion.majorVersion() > 0);

    uassert(ErrorCodes::StaleEpoch,
            str::stream() << "cannot move chunk " << _args.toString()
                          << " because the collection may have been dropped. "
                          << "current epoch: "
                          << collectionVersion.epoch()
                          << ", cmd epoch: "
                          << _args.getVersionEpoch(),
            _args.getVersionEpoch() == collectionVersion.epoch());

    ChunkType chunkToMove;
    chunkToMove.setMin(_args.getMinKey());
    chunkToMove.setMax(_args.getMaxKey());

    Status chunkValidateStatus = collectionMetadata->checkChunkIsValid(chunkToMove);
    uassert(chunkValidateStatus.code(),
            str::stream() << "Unable to move chunk with arguments '" << redact(_args.toString())
                          << "' due to error "
                          << redact(chunkValidateStatus.reason()),
            chunkValidateStatus.isOK());

    _collectionEpoch = collectionVersion.epoch();
    _collectionUuid = std::get<1>(collectionMetadataAndUUID);
}

MigrationSourceManager::~MigrationSourceManager() {
    invariant(!_cloneDriver);
    _stats.totalDonorMoveChunkTimeMillis.addAndFetch(_entireOpTimer.millis());
}

NamespaceString MigrationSourceManager::getNss() const {
    return _args.getNss();
}

Status MigrationSourceManager::startClone(OperationContext* opCtx) {
    invariant(!opCtx->lockState()->isLocked());
    invariant(_state == kCreated);
    auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); });
    _stats.countDonorMoveChunkStarted.addAndFetch(1);

    Grid::get(opCtx)
        ->catalogClient()
        ->logChange(opCtx,
                    "moveChunk.start",
                    getNss().ns(),
                    BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
                               << _args.getFromShardId()
                               << "to"
                               << _args.getToShardId()),
                    ShardingCatalogClient::kMajorityWriteConcern)
        .ignore();

    _cloneAndCommitTimer.reset();

    {
        // Register for notifications from the replication subsystem
        AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
        auto css = CollectionShardingState::get(opCtx, getNss().ns());

        const auto metadata = css->getMetadata();
        Status status = checkCollectionEpochMatches(metadata, _collectionEpoch);
        if (!status.isOK())
            return status;

        // Having the migration source manager registered on the collection sharding state is what
        // indicates that a chunk on that collection is being migrated. With an active migration,
        // write operations require the cloner to be present in order to track changes to the
        // chunk, which need to be transmitted to the recipient.
        _cloneDriver = stdx::make_unique<MigrationChunkClonerSourceLegacy>(
            _args, metadata->getKeyPattern(), _donorConnStr, _recipientHost);

        css->setMigrationSourceManager(opCtx, this);
    }

    Status startCloneStatus = _cloneDriver->startClone(opCtx);
    if (!startCloneStatus.isOK()) {
        return startCloneStatus;
    }

    _state = kCloning;
    scopedGuard.Dismiss();
    return Status::OK();
}

Status MigrationSourceManager::awaitToCatchUp(OperationContext* opCtx) {
    invariant(!opCtx->lockState()->isLocked());
    invariant(_state == kCloning);
    auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); });
    _stats.totalDonorChunkCloneTimeMillis.addAndFetch(_cloneAndCommitTimer.millis());
    _cloneAndCommitTimer.reset();

    // Block until the cloner deems it appropriate to enter the critical section.
    Status catchUpStatus = _cloneDriver->awaitUntilCriticalSectionIsAppropriate(
        opCtx, kMaxWaitToEnterCriticalSectionTimeout);
    if (!catchUpStatus.isOK()) {
        return catchUpStatus;
    }

    _state = kCloneCaughtUp;
    scopedGuard.Dismiss();
    return Status::OK();
}

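/**
 * Entering the critical section, as a summary of the steps below: re-check that the collection
 * epoch is unchanged, write the change streams notification if this will be the recipient's first
 * chunk, persist the minOpTime recovery document, install the critical section signal under the
 * collection X lock, and finally bump 'enterCriticalSectionCounter' in the shard's collections
 * entry so that secondaries also refresh and block behind the critical section.
 */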
Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) {
    invariant(!opCtx->lockState()->isLocked());
    invariant(_state == kCloneCaughtUp);
    auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); });
    _stats.totalDonorChunkCloneTimeMillis.addAndFetch(_cloneAndCommitTimer.millis());
    _cloneAndCommitTimer.reset();

    {
        const auto metadata = [&] {
            AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS);
            return CollectionShardingState::get(opCtx, _args.getNss())->getMetadata();
        }();

        Status status = checkCollectionEpochMatches(metadata, _collectionEpoch);
        if (!status.isOK())
            return status;

        _notifyChangeStreamsOnRecipientFirstChunk(opCtx, metadata);
    }

    // Mark the shard as running a critical operation, which requires recovery on crash.
    //
    // NOTE: The 'migrateChunkToNewShard' oplog message written by the above call to
    // '_notifyChangeStreamsOnRecipientFirstChunk' depends on this majority write to carry its
    // local write to majority committed.
    Status status = ShardingStateRecovery::startMetadataOp(opCtx);
    if (!status.isOK()) {
        return status;
    }

    {
        // The critical section must be entered with the collection X lock in order to ensure that
        // there are no writes which could have entered and passed the version check just before we
        // entered the critical section, but managed to complete after we left it.
        AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);

        // IMPORTANT: After this line, the critical section is in place and needs to be signaled
        _critSecSignal = std::make_shared<Notification<void>>();
    }

    _state = kCriticalSection;

    // Persist a signal to secondaries that we've entered the critical section. This will cause
    // secondaries to refresh their routing table when next accessed, which will block behind the
    // critical section. This ensures causal consistency by preventing a stale mongos with a
    // cluster time inclusive of the migration config commit update from reading stale data off a
    // secondary.
    // Note: this write must occur after the critSec flag is set, to ensure the secondary refresh
    // will stall behind the flag.
    Status signalStatus =
        updateShardCollectionsEntry(opCtx,
                                    BSON(ShardCollectionType::ns() << getNss().ns()),
                                    BSONObj(),
                                    BSON(ShardCollectionType::enterCriticalSectionCounter() << 1),
                                    false /*upsert*/);
    if (!signalStatus.isOK()) {
        return {
            ErrorCodes::OperationFailed,
            str::stream() << "Failed to persist critical section signal for secondaries due to: "
                          << signalStatus.toString()};
    }

    log() << "Migration successfully entered critical section";

    scopedGuard.Dismiss();
    return Status::OK();
}

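/**
 * Tells the recipient to fetch the final set of changes and commit them on its side (the
 * _recvChunkCommit round trip, driven through the clone driver). The recipient's reported clone
 * counts are stashed for the 'moveChunk.commit' changelog entry.
 */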
Status MigrationSourceManager::commitChunkOnRecipient(OperationContext* opCtx) {
    invariant(!opCtx->lockState()->isLocked());
    invariant(_state == kCriticalSection);
    auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); });

    // Tell the recipient shard to fetch the latest changes.
    auto commitCloneStatus = _cloneDriver->commitClone(opCtx);

    if (MONGO_FAIL_POINT(failMigrationCommit) && commitCloneStatus.isOK()) {
        commitCloneStatus = {ErrorCodes::InternalError,
                             "Failing _recvChunkCommit due to failpoint."};
    }

    if (!commitCloneStatus.isOK()) {
        return commitCloneStatus.getStatus().withContext("commit clone failed");
    }

    _recipientCloneCounts = commitCloneStatus.getValue()["counts"].Obj().getOwned();

    _state = kCloneCompleted;
    scopedGuard.Dismiss();
    return Status::OK();
}

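/**
 * Commits the chunk move on the config server and interprets the outcome. The request built below
 * via CommitChunkMigrationRequest::appendAsCommand is, as a rough sketch only (the exact field
 * names are defined by that request type and may differ between versions), of the form:
 *
 *   {
 *       _configsvrCommitChunkMigration: <nss>,
 *       fromShard: <donor shard id>,
 *       toShard: <recipient shard id>,
 *       migratedChunk: { min: ..., max: ... },
 *       controlChunk: { min: ..., max: ... },   // omitted when the last chunk is being moved
 *       fromShardCollectionVersion: <collection version>,
 *       writeConcern: { w: "majority", wtimeout: ... }
 *   }
 *
 * After the commit attempt, the shard force-refreshes its routing metadata (up to two rounds) to
 * find out whether the commit actually took effect.
 */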
Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opCtx) {
    invariant(!opCtx->lockState()->isLocked());
    invariant(_state == kCloneCompleted);
    auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); });

    // If we have chunks left on the FROM shard, bump the version of one of them as well. This will
    // change the local collection major version, which indicates to other processes that the chunk
    // metadata has changed and they should refresh.
    BSONObjBuilder builder;

    {
        const auto metadata = [&] {
            AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS);
            return CollectionShardingState::get(opCtx, _args.getNss())->getMetadata();
        }();

        Status status = checkCollectionEpochMatches(metadata, _collectionEpoch);
        if (!status.isOK())
            return status;

        boost::optional<ChunkType> controlChunkType = boost::none;
        if (metadata->getNumChunks() > 1) {
            ChunkType differentChunk;
            invariant(metadata->getDifferentChunk(_args.getMinKey(), &differentChunk));
            invariant(differentChunk.getMin().woCompare(_args.getMinKey()) != 0);
            controlChunkType = std::move(differentChunk);
        } else {
            log() << "Moving last chunk for the collection out";
        }

        ChunkType migratedChunkType;
        migratedChunkType.setMin(_args.getMinKey());
        migratedChunkType.setMax(_args.getMaxKey());

        CommitChunkMigrationRequest::appendAsCommand(&builder,
                                                     getNss(),
                                                     _args.getFromShardId(),
                                                     _args.getToShardId(),
                                                     migratedChunkType,
                                                     controlChunkType,
                                                     metadata->getCollVersion());

        builder.append(kWriteConcernField, kMajorityWriteConcern.toBSON());
    }

    // Read operations must begin to wait on the critical section just before we send the commit
    // operation to the config server
    {
        AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
        _readsShouldWaitOnCritSec = true;
    }

    Timer t;

    auto commitChunkMigrationResponse =
        Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
            opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            "admin",
            builder.obj(),
            Shard::RetryPolicy::kIdempotent);

    if (MONGO_FAIL_POINT(migrationCommitNetworkError)) {
        commitChunkMigrationResponse = Status(
            ErrorCodes::InternalError, "Failpoint 'migrationCommitNetworkError' generated error");
    }

    Status migrationCommitStatus = commitChunkMigrationResponse.getStatus();
    if (migrationCommitStatus.isOK()) {
        migrationCommitStatus = commitChunkMigrationResponse.getValue().commandStatus;
        if (migrationCommitStatus.isOK()) {
            migrationCommitStatus = commitChunkMigrationResponse.getValue().writeConcernStatus;
        }
    }

    if (!migrationCommitStatus.isOK()) {
        // Need to get the latest optime in case the refresh request goes to a secondary --
        // otherwise the read won't wait for the write that _configsvrCommitChunkMigration may have
        // done
        log() << "Error occurred while committing the migration. Performing a majority write "
                 "against the config server to obtain its latest optime"
              << causedBy(redact(migrationCommitStatus));

        Status status = Grid::get(opCtx)->catalogClient()->logChange(
            opCtx,
            "moveChunk.validating",
            getNss().ns(),
            BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
                       << _args.getFromShardId()
                       << "to"
                       << _args.getToShardId()),
            ShardingCatalogClient::kMajorityWriteConcern);

        if ((ErrorCodes::isInterruption(status.code()) ||
             ErrorCodes::isShutdownError(status.code()) ||
             status == ErrorCodes::CallbackCanceled) &&
            globalInShutdownDeprecated()) {
            // Since the server is already doing a clean shutdown, this call will just join the
            // previous shutdown call
            shutdown(waitForShutdown());
        }

        // If we failed to get the latest config optime because we stepped down as primary, then it
        // is safe to fail without crashing because the new primary will fetch the latest optime
        // when it recovers the sharding state recovery document, as long as we also clear the
        // metadata for this collection, forcing subsequent callers to do a full refresh. Check if
        // this node can accept writes for this collection as a proxy for it being primary.
        if (!status.isOK()) {
            AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
            if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, getNss())) {
                CollectionShardingState::get(opCtx, getNss())->refreshMetadata(opCtx, nullptr);
                uassertStatusOK(status.withContext(
                    str::stream() << "Unable to verify migration commit for chunk: "
                                  << redact(_args.toString())
                                  << " because the node's replication role changed. Metadata "
                                     "was cleared for: "
                                  << getNss().ns()
                                  << ", so it will get a full refresh when accessed again."));
            }
        }

        fassertStatusOK(
            40137,
            {status.code(),
             str::stream() << "Failed to commit migration for chunk " << _args.toString()
                           << " due to "
                           << redact(migrationCommitStatus)
                           << ". Updating the optime with a write before refreshing the "
                           << "metadata also failed with "
                           << redact(status)});
    }

    // Because the CatalogCache's refresh methods (on which the refreshMetadataNow call below
    // depends) are not causally consistent, we need to perform up to two refresh rounds if the
    // refresh reports that the shard still owns the chunk
    ChunkVersion collectionVersionAfterRefresh;

    for (int retriesLeft = 1;; --retriesLeft) {
        ChunkVersion unusedShardVersion;
        Status refreshStatus =
            ShardingState::get(opCtx)->refreshMetadataNow(opCtx, getNss(), &unusedShardVersion);

        // If the refresh fails, there is no way to confirm whether the migration commit actually
        // went through or not. Because of that, the collection's metadata is reset to UNSHARDED so
        // that subsequent versioned requests will get StaleShardVersion and will retry the
        // refresh.
        if (!refreshStatus.isOK()) {
            AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);

            CollectionShardingState::get(opCtx, getNss())->refreshMetadata(opCtx, nullptr);

            log() << "Failed to refresh metadata after a "
                  << (migrationCommitStatus.isOK() ? "successful commit" : "failed commit attempt")
                  << ". Metadata was cleared so it will get a full refresh when accessed again."
                  << causedBy(redact(refreshStatus));

            return {
                migrationCommitStatus.code(),
                str::stream() << "Orphaned range not cleaned up. Failed to refresh metadata after"
                                 " migration commit due to '"
                              << refreshStatus.toString()
                              << "', and commit failed due to '"
                              << migrationCommitStatus.toString()
                              << "'"};
        }

        auto refreshedMetadata = [&] {
            AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
            return CollectionShardingState::get(opCtx, getNss())->getMetadata();
        }();

        if (!refreshedMetadata) {
            return {ErrorCodes::NamespaceNotSharded,
                    str::stream() << "Chunk move failed because collection '" << getNss().ns()
                                  << "' is no longer sharded. The migration commit error was: "
                                  << migrationCommitStatus.toString()};
        }

        // If after a successful refresh the metadata indicates that the node still owns the chunk,
        // we must do one more refresh in order to ensure that the previous refresh round didn't
        // join an already active catalog cache refresh and miss its own commit
        if (!refreshedMetadata->keyBelongsToMe(_args.getMinKey())) {
            collectionVersionAfterRefresh = refreshedMetadata->getCollVersion();
            break;
        }

        if (retriesLeft)
            continue;

        // This condition can only happen if the migration commit failed for some reason
        if (migrationCommitStatus.isOK()) {
            severe() << "The migration commit succeeded, but the new chunk placement was not "
                        "reflected after metadata refresh, which is an indication of an "
                        "afterOpTime bug.";
            severe() << "The current config server opTime is " << Grid::get(opCtx)->configOpTime();
            severe() << "The commit response came from "
                     << redact(commitChunkMigrationResponse.getValue().hostAndPort->toString())
                     << " and contained:";
            severe() << "  metadata: "
                     << redact(commitChunkMigrationResponse.getValue().metadata.toString());
            severe() << "  response: "
                     << redact(commitChunkMigrationResponse.getValue().response.toString());

            fassertFailed(50878);
        }

        // The chunk modification was not applied, so report the original error
        return {migrationCommitStatus.code(),
                str::stream() << "Chunk move was not successful due to "
                              << migrationCommitStatus.reason()};
    }

    invariant(collectionVersionAfterRefresh.isSet());

    // Migration succeeded
    log() << "Migration succeeded and updated collection version to "
          << collectionVersionAfterRefresh;

    MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeLeavingCriticalSection);

    scopedGuard.Dismiss();

    _stats.totalCriticalSectionCommitTimeMillis.addAndFetch(t.millis());

    // Exit the critical section and ensure that all the necessary state is fully persisted before
    // scheduling orphan cleanup.
    _cleanup(opCtx);

    Grid::get(opCtx)
        ->catalogClient()
        ->logChange(opCtx,
                    "moveChunk.commit",
                    getNss().ns(),
                    BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
                               << _args.getFromShardId()
                               << "to"
                               << _args.getToShardId()
                               << "counts"
                               << _recipientCloneCounts),
                    ShardingCatalogClient::kMajorityWriteConcern)
        .ignore();

    const ChunkRange range(_args.getMinKey(), _args.getMaxKey());

    auto notification = [&] {
        auto const whenToClean = _args.getWaitForDelete() ? CollectionShardingState::kNow
                                                          : CollectionShardingState::kDelayed;
        AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
        return CollectionShardingState::get(opCtx, getNss())
            ->cleanUpRange(opCtx, autoColl.getCollection(), range, whenToClean);
    }();

    if (!MONGO_FAIL_POINT(doNotRefreshRecipientAfterCommit)) {
        // Best-effort make the recipient refresh its routing table to the new collection version.
        refreshRecipientRoutingTable(
            opCtx, getNss(), _args.getToShardId(), _recipientHost, collectionVersionAfterRefresh);
    }

    if (_args.getWaitForDelete()) {
        log() << "Waiting for cleanup of " << getNss().ns() << " range "
              << redact(range.toString());
        return notification.waitStatus(opCtx);
    }

    if (notification.ready() && !notification.waitStatus(opCtx).isOK()) {
        warning() << "Failed to initiate cleanup of " << getNss().ns() << " range "
                  << redact(range.toString())
                  << " due to: " << redact(notification.waitStatus(opCtx));
    } else {
        log() << "Leaving cleanup of " << getNss().ns() << " range " << redact(range.toString())
              << " to complete in background";
        notification.abandon();
    }

    return Status::OK();
}

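/**
 * Logs a 'moveChunk.error' changelog entry and performs the same cleanup as the success path.
 * NotMaster errors thrown during cleanup are downgraded to a warning, since a stepped-down node
 * cannot do anything further here.
 */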
void MigrationSourceManager::cleanupOnError(OperationContext* opCtx) {
    if (_state == kDone) {
        return;
    }

    Grid::get(opCtx)
        ->catalogClient()
        ->logChange(opCtx,
                    "moveChunk.error",
                    getNss().ns(),
                    BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
                               << _args.getFromShardId()
                               << "to"
                               << _args.getToShardId()),
                    ShardingCatalogClient::kMajorityWriteConcern)
        .ignore();

    try {
        _cleanup(opCtx);
    } catch (const ExceptionForCat<ErrorCategory::NotMasterError>& ex) {
        warning() << "Failed to clean up migration: " << redact(_args.toString())
                  << " due to: " << redact(ex);
    }
}

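/**
 * Writes a no-op oplog entry carrying the 'migrateChunkToNewShard' o2 message. Change stream
 * readers use this entry to discover that a shard which previously owned no chunks for this
 * collection is about to receive its first one.
 */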
void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk(
    OperationContext* opCtx, const ScopedCollectionMetadata& metadata) {
    // Change streams are only supported in 3.6 and above
    if (serverGlobalParams.featureCompatibility.getVersion() !=
        ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo36)
        return;

    // If this is not the first donation, there is nothing to be done
    if (metadata->getChunkManager()->getVersion(_args.getToShardId()).isSet())
        return;

    const std::string dbgMessage = str::stream()
        << "Migrating chunk from shard " << _args.getFromShardId() << " to shard "
        << _args.getToShardId() << " with no chunks for this collection";

    // The message expected by change streams
    const auto o2Message = BSON("type"
                                << "migrateChunkToNewShard"
                                << "from"
                                << _args.getFromShardId()
                                << "to"
                                << _args.getToShardId());

    auto const serviceContext = opCtx->getClient()->getServiceContext();

    AutoGetCollection autoColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_IX);
    writeConflictRetry(
        opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] {
            WriteUnitOfWork uow(opCtx);
            serviceContext->getOpObserver()->onInternalOpMessage(
                opCtx, getNss(), _collectionUuid, BSON("msg" << dbgMessage), o2Message);
            uow.commit();
        });
}

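/**
 * Unregisters this migration source manager from the collection, lifts the critical section if it
 * was entered and, outside of the collection X lock, cancels any in-progress cloning. If the
 * migration reached the critical section, additionally waits for the persisted routing table
 * cache to flush and then clears the minOpTime recovery document.
 */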
void MigrationSourceManager::_cleanup(OperationContext* opCtx) {
    invariant(_state != kDone);

    auto cloneDriver = [&]() {
        // Unregister from the collection's sharding state
        AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);

        auto css = CollectionShardingState::get(opCtx, getNss().ns());

        // The migration source manager is not visible anymore after it is unregistered from the
        // collection
        css->clearMigrationSourceManager(opCtx);

        // Leave the critical section.
        if (_critSecSignal) {
            _critSecSignal->set();
        }

        return std::move(_cloneDriver);
    }();

    // The cleanup operations below are potentially blocking or acquire other locks, so perform
    // them outside of the collection X lock

    if (cloneDriver) {
        cloneDriver->cancelClone(opCtx);
    }

    if (_state == kCriticalSection || _state == kCloneCompleted) {
        _stats.totalCriticalSectionTimeMillis.addAndFetch(_cloneAndCommitTimer.millis());

        // NOTE: The order of the operations below is important and the comments explain the
        // reasoning behind it

        // Wait for the updates to the cache of the routing table to be fully written to disk
        // before clearing the 'minOpTime recovery' document. This way, we ensure that all nodes
        // from a shard which donated a chunk will always be at the shard version of the last
        // migration it performed.
        //
        // If the metadata is not persisted before clearing the 'inMigration' flag below, it is
        // possible that the persisted metadata is rolled back after step down while the write
        // which cleared the 'inMigration' flag is not, in which case a secondary node would report
        // itself at an older shard version.
        CatalogCacheLoader::get(opCtx).waitForCollectionFlush(opCtx, getNss());

        // Clear the 'minOpTime recovery' document so that the next time a node from this shard
        // becomes a primary, it won't have to recover the config server optime.
        ShardingStateRecovery::endMetadataOp(opCtx);
    }

    _state = kDone;
}

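/**
 * Returns the signal the calling operation must wait on in order to respect the critical section,
 * or nullptr if it does not need to wait. Write operations block for the entire critical section,
 * whereas reads only start blocking once _readsShouldWaitOnCritSec is set, which happens just
 * before the commit is sent to the config server.
 */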
std::shared_ptr<Notification<void>> MigrationSourceManager::getMigrationCriticalSectionSignal(
    bool isForReadOnlyOperation) const {
    if (!isForReadOnlyOperation) {
        return _critSecSignal;
    }

    if (_readsShouldWaitOnCritSec) {
        return _critSecSignal;
    }

    return nullptr;
}

BSONObj MigrationSourceManager::getMigrationStatusReport() const {
    return migrationutil::makeMigrationStatusDocument(getNss(),
                                                      _args.getFromShardId(),
                                                      _args.getToShardId(),
                                                      true,
                                                      _args.getMinKey(),
                                                      _args.getMaxKey());
}

}  // namespace mongo