1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/db/s/balancer/balancer.h"
36
37 #include <algorithm>
38 #include <string>
39
40 #include "mongo/base/status_with.h"
41 #include "mongo/bson/bsonobjbuilder.h"
42 #include "mongo/client/read_preference.h"
43 #include "mongo/db/client.h"
44 #include "mongo/db/namespace_string.h"
45 #include "mongo/db/operation_context.h"
46 #include "mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h"
47 #include "mongo/db/s/balancer/cluster_statistics_impl.h"
48 #include "mongo/s/balancer_configuration.h"
49 #include "mongo/s/catalog/sharding_catalog_client.h"
50 #include "mongo/s/catalog/type_chunk.h"
51 #include "mongo/s/catalog_cache.h"
52 #include "mongo/s/client/shard_registry.h"
53 #include "mongo/s/cluster_identity_loader.h"
54 #include "mongo/s/grid.h"
55 #include "mongo/s/shard_util.h"
56 #include "mongo/stdx/memory.h"
57 #include "mongo/util/concurrency/idle_thread_block.h"
58 #include "mongo/util/exit.h"
59 #include "mongo/util/fail_point_service.h"
60 #include "mongo/util/log.h"
61 #include "mongo/util/timer.h"
62 #include "mongo/util/version.h"
63
64 namespace mongo {
65
66 using std::map;
67 using std::string;
68 using std::vector;
69
70 namespace {
71
// Failpoint allowing tests to shorten the sleep between balancer rounds (reads "intervalMs")
MONGO_FP_DECLARE(overrideBalanceRoundInterval);

// Sleep between rounds when the previous round moved nothing (cluster looked balanced)
const Seconds kBalanceRoundDefaultInterval(10);

// Sleep between balancer rounds in the case where the last round found some chunks which needed to
// be balanced. This value should be set sufficiently low so that imbalanced clusters will quickly
// reach balanced state, but setting it too low may cause CRUD operations to start failing due to
// not being able to establish a stable shard version.
const Seconds kShortBalanceRoundInterval(1);

// Per-ServiceContext singleton instance of the Balancer, installed by Balancer::create()
const auto getBalancer = ServiceContext::declareDecoration<std::unique_ptr<Balancer>>();
83
84 /**
85 * Utility class to generate timing and statistics for a single balancer round.
86 */
87 class BalanceRoundDetails {
88 public:
BalanceRoundDetails()89 BalanceRoundDetails() : _executionTimer() {}
90
setSucceeded(int candidateChunks,int chunksMoved)91 void setSucceeded(int candidateChunks, int chunksMoved) {
92 invariant(!_errMsg);
93 _candidateChunks = candidateChunks;
94 _chunksMoved = chunksMoved;
95 }
96
setFailed(const string & errMsg)97 void setFailed(const string& errMsg) {
98 _errMsg = errMsg;
99 }
100
toBSON() const101 BSONObj toBSON() const {
102 BSONObjBuilder builder;
103 builder.append("executionTimeMillis", _executionTimer.millis());
104 builder.append("errorOccured", _errMsg.is_initialized());
105
106 if (_errMsg) {
107 builder.append("errmsg", *_errMsg);
108 } else {
109 builder.append("candidateChunks", _candidateChunks);
110 builder.append("chunksMoved", _chunksMoved);
111 }
112
113 return builder.obj();
114 }
115
116 private:
117 const Timer _executionTimer;
118
119 // Set only on success
120 int _candidateChunks{0};
121 int _chunksMoved{0};
122
123 // Set only on failure
124 boost::optional<string> _errMsg;
125 };
126
127 /**
128 * Occasionally prints a log message with shard versions if the versions are not the same
129 * in the cluster.
130 */
warnOnMultiVersion(const vector<ClusterStatistics::ShardStatistics> & clusterStats)131 void warnOnMultiVersion(const vector<ClusterStatistics::ShardStatistics>& clusterStats) {
132 auto&& vii = VersionInfoInterface::instance();
133
134 bool isMultiVersion = false;
135 for (const auto& stat : clusterStats) {
136 if (!vii.isSameMajorVersion(stat.mongoVersion.c_str())) {
137 isMultiVersion = true;
138 break;
139 }
140 }
141
142 // If we're all the same version, don't message
143 if (!isMultiVersion)
144 return;
145
146 StringBuilder sb;
147 sb << "Multi version cluster detected. Local version: " << vii.version()
148 << ", shard versions: ";
149
150 for (const auto& stat : clusterStats) {
151 sb << stat.shardId << " is at " << stat.mongoVersion << "; ";
152 }
153
154 warning() << sb.str();
155 }
156
157 } // namespace
158
// Wires up the balancer's collaborators: a shared random source feeds both the cluster
// statistics and the chunk selection policy (which reads from the statistics), and the
// migration manager is bound to the owning service context.
Balancer::Balancer(ServiceContext* serviceContext)
    : _balancedLastTime(0),
      _random(std::random_device{}()),
      _clusterStats(stdx::make_unique<ClusterStatisticsImpl>(_random)),
      _chunkSelectionPolicy(
          stdx::make_unique<BalancerChunkSelectionPolicyImpl>(_clusterStats.get(), _random)),
      _migrationManager(serviceContext) {}
166
// Destruction is only legal once waitForBalancerToStop() has moved the state to kStopped.
Balancer::~Balancer() {
    // The balancer thread must have been stopped
    stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
    invariant(_state == kStopped);
}
172
create(ServiceContext * serviceContext)173 void Balancer::create(ServiceContext* serviceContext) {
174 invariant(!getBalancer(serviceContext));
175 getBalancer(serviceContext) = stdx::make_unique<Balancer>(serviceContext);
176 }
177
get(ServiceContext * serviceContext)178 Balancer* Balancer::get(ServiceContext* serviceContext) {
179 return getBalancer(serviceContext).get();
180 }
181
get(OperationContext * operationContext)182 Balancer* Balancer::get(OperationContext* operationContext) {
183 return get(operationContext->getServiceContext());
184 }
185
// Transitions the balancer from kStopped to kRunning and spawns the main balancer thread.
// Must only be called when the balancer is stopped (enforced by invariant).
void Balancer::initiateBalancer(OperationContext* opCtx) {
    stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
    invariant(_state == kStopped);
    _state = kRunning;

    // Begin migration recovery before the main thread starts issuing new migrations
    _migrationManager.startRecoveryAndAcquireDistLocks(opCtx);

    invariant(!_thread.joinable());
    invariant(!_threadOperationContext);
    _thread = stdx::thread([this] { _mainThread(); });
}
197
// Requests the balancer to stop: moves the state to kStopping, kills the balancer thread's
// operation context and kicks off asynchronous interruption of in-flight migrations. No-op if
// the balancer is not currently running.
void Balancer::interruptBalancer() {
    stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
    if (_state != kRunning)
        return;

    _state = kStopping;

    // Interrupt the balancer thread if it has been started. We are guaranteed that the operation
    // context of that thread is still alive, because we hold the balancer mutex.
    if (_threadOperationContext) {
        stdx::lock_guard<Client> scopedClientLock(*_threadOperationContext->getClient());
        _threadOperationContext->markKilled(ErrorCodes::InterruptedDueToReplStateChange);
    }

    // Schedule a separate thread to shutdown the migration manager in order to avoid deadlock with
    // replication step down
    invariant(!_migrationManagerInterruptThread.joinable());
    _migrationManagerInterruptThread =
        stdx::thread([this] { _migrationManager.interruptAndDisableMigrations(); });

    // Wake up the main thread in case it is sleeping between rounds
    _condVar.notify_all();
}
220
// Blocks until the main balancer thread (started by initiateBalancer) terminates, then
// completes the kStopping -> kStopped transition. Must follow a call to interruptBalancer().
void Balancer::waitForBalancerToStop() {
    {
        stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
        if (_state == kStopped)
            return;

        invariant(_state == kStopping);
        invariant(_thread.joinable());
    }

    // Join outside the mutex so the main thread can acquire it while winding down
    _thread.join();

    stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
    _state = kStopped;
    _thread = {};

    LOG(1) << "Balancer thread terminated";
}
239
joinCurrentRound(OperationContext * opCtx)240 void Balancer::joinCurrentRound(OperationContext* opCtx) {
241 stdx::unique_lock<stdx::mutex> scopedLock(_mutex);
242 const auto numRoundsAtStart = _numBalancerRounds;
243 opCtx->waitForConditionOrInterrupt(_condVar, scopedLock, [&] {
244 return !_inBalancerRound || _numBalancerRounds != numRoundsAtStart;
245 });
246 }
247
rebalanceSingleChunk(OperationContext * opCtx,const ChunkType & chunk)248 Status Balancer::rebalanceSingleChunk(OperationContext* opCtx, const ChunkType& chunk) {
249 auto migrateStatus = _chunkSelectionPolicy->selectSpecificChunkToMove(opCtx, chunk);
250 if (!migrateStatus.isOK()) {
251 return migrateStatus.getStatus();
252 }
253
254 auto migrateInfo = std::move(migrateStatus.getValue());
255 if (!migrateInfo) {
256 LOG(1) << "Unable to find more appropriate location for chunk " << redact(chunk.toString());
257 return Status::OK();
258 }
259
260 auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
261 Status refreshStatus = balancerConfig->refreshAndCheck(opCtx);
262 if (!refreshStatus.isOK()) {
263 return refreshStatus;
264 }
265
266 return _migrationManager.executeManualMigration(opCtx,
267 *migrateInfo,
268 balancerConfig->getMaxChunkSizeBytes(),
269 balancerConfig->getSecondaryThrottle(),
270 balancerConfig->waitForDelete());
271 }
272
moveSingleChunk(OperationContext * opCtx,const ChunkType & chunk,const ShardId & newShardId,uint64_t maxChunkSizeBytes,const MigrationSecondaryThrottleOptions & secondaryThrottle,bool waitForDelete)273 Status Balancer::moveSingleChunk(OperationContext* opCtx,
274 const ChunkType& chunk,
275 const ShardId& newShardId,
276 uint64_t maxChunkSizeBytes,
277 const MigrationSecondaryThrottleOptions& secondaryThrottle,
278 bool waitForDelete) {
279 auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(opCtx, chunk, newShardId);
280 if (!moveAllowedStatus.isOK()) {
281 return moveAllowedStatus;
282 }
283
284 return _migrationManager.executeManualMigration(
285 opCtx, MigrateInfo(newShardId, chunk), maxChunkSizeBytes, secondaryThrottle, waitForDelete);
286 }
287
report(OperationContext * opCtx,BSONObjBuilder * builder)288 void Balancer::report(OperationContext* opCtx, BSONObjBuilder* builder) {
289 auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
290 balancerConfig->refreshAndCheck(opCtx).transitional_ignore();
291
292 const auto mode = balancerConfig->getBalancerMode();
293
294 stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
295 builder->append("mode", BalancerSettingsType::kBalancerModes[mode]);
296 builder->append("inBalancerRound", _inBalancerRound);
297 builder->append("numBalancerRounds", _numBalancerRounds);
298 }
299
// Body of the dedicated balancer thread spawned by initiateBalancer(). Loads the balancer
// settings (with retry), finishes migration recovery and then runs balancing rounds until
// interruptBalancer() moves the state out of kRunning, at which point it drains migrations
// and clears the thread-local bookkeeping.
void Balancer::_mainThread() {
    Client::initThread("Balancer");
    auto opCtx = cc().makeOperationContext();
    auto shardingContext = Grid::get(opCtx.get());

    log() << "CSRS balancer is starting";

    {
        stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
        // Publish this thread's operation context so interruptBalancer() can kill it
        _threadOperationContext = opCtx.get();
    }

    const Seconds kInitBackoffInterval(10);

    auto balancerConfig = shardingContext->getBalancerConfiguration();
    // Retry loading the balancer settings until it succeeds or a stop is requested
    while (!_stopRequested()) {
        Status refreshStatus = balancerConfig->refreshAndCheck(opCtx.get());
        if (!refreshStatus.isOK()) {
            warning() << "Balancer settings could not be loaded and will be retried in "
                      << durationCount<Seconds>(kInitBackoffInterval) << " seconds"
                      << causedBy(refreshStatus);

            _sleepFor(opCtx.get(), kInitBackoffInterval);
            continue;
        }

        break;
    }

    log() << "CSRS balancer thread is recovering";

    _migrationManager.finishRecovery(opCtx.get(),
                                     balancerConfig->getMaxChunkSizeBytes(),
                                     balancerConfig->getSecondaryThrottle());

    log() << "CSRS balancer thread is recovered";

    // Main balancer loop
    while (!_stopRequested()) {
        BalanceRoundDetails roundDetails;

        _beginRound(opCtx.get());

        try {
            // Work from an up-to-date view of the cluster's shards for this round
            shardingContext->shardRegistry()->reload(opCtx.get());

            uassert(13258, "oids broken after resetting!", _checkOIDs(opCtx.get()));

            Status refreshStatus = balancerConfig->refreshAndCheck(opCtx.get());
            if (!refreshStatus.isOK()) {
                warning() << "Skipping balancing round" << causedBy(refreshStatus);
                _endRound(opCtx.get(), kBalanceRoundDefaultInterval);
                continue;
            }

            if (!balancerConfig->shouldBalance()) {
                LOG(1) << "Skipping balancing round because balancing is disabled";
                _endRound(opCtx.get(), kBalanceRoundDefaultInterval);
                continue;
            }

            {
                LOG(1) << "*** start balancing round. "
                       << "waitForDelete: " << balancerConfig->waitForDelete()
                       << ", secondaryThrottle: "
                       << balancerConfig->getSecondaryThrottle().toBSON();

                OCCASIONALLY warnOnMultiVersion(
                    uassertStatusOK(_clusterStats->getStats(opCtx.get())));

                // Enforce tag range boundaries (best effort) before selecting chunks to move
                Status status = _splitChunksIfNeeded(opCtx.get());
                if (!status.isOK()) {
                    warning() << "Failed to split chunks" << causedBy(status);
                } else {
                    LOG(1) << "Done enforcing tag range boundaries.";
                }

                const auto candidateChunks = uassertStatusOK(
                    _chunkSelectionPolicy->selectChunksToMove(opCtx.get(), _balancedLastTime));

                if (candidateChunks.empty()) {
                    LOG(1) << "no need to move any chunk";
                    _balancedLastTime = false;
                } else {
                    _balancedLastTime = _moveChunks(opCtx.get(), candidateChunks);

                    roundDetails.setSucceeded(static_cast<int>(candidateChunks.size()),
                                              _balancedLastTime);

                    shardingContext->catalogClient()
                        ->logAction(opCtx.get(), "balancer.round", "", roundDetails.toBSON())
                        .ignore();
                }

                LOG(1) << "*** End of balancing round";
            }


            // Sleep the short interval while the cluster is still converging; the failpoint
            // can override the interval for tests
            auto balancerInterval = [&]() -> Milliseconds {
                MONGO_FAIL_POINT_BLOCK(overrideBalanceRoundInterval, data) {
                    int interval = data.getData()["intervalMs"].numberInt();
                    log() << "overrideBalanceRoundInterval: using shorter balancing interval: "
                          << interval << "ms";

                    return Milliseconds(interval);
                }

                return _balancedLastTime ? kShortBalanceRoundInterval
                                         : kBalanceRoundDefaultInterval;
            }();

            _endRound(opCtx.get(), balancerInterval);
        } catch (const std::exception& e) {
            log() << "caught exception while doing balance: " << e.what();

            // Just to match the opening statement if in log level 1
            LOG(1) << "*** End of balancing round";

            // This round failed, tell the world!
            roundDetails.setFailed(e.what());

            shardingContext->catalogClient()
                ->logAction(opCtx.get(), "balancer.round", "", roundDetails.toBSON())
                .transitional_ignore();

            // Sleep a fair amount before retrying because of the error
            _endRound(opCtx.get(), kBalanceRoundDefaultInterval);
        }
    }

    {
        stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
        invariant(_state == kStopping);
        invariant(_migrationManagerInterruptThread.joinable());
    }

    // Wait for the interrupt thread spawned by interruptBalancer() and for any still-active
    // migrations before declaring the balancer stopped
    _migrationManagerInterruptThread.join();
    _migrationManager.drainActiveMigrations();

    {
        stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
        _migrationManagerInterruptThread = {};
        _threadOperationContext = nullptr;
    }

    log() << "CSRS balancer is now stopped";
}
447
_stopRequested()448 bool Balancer::_stopRequested() {
449 stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
450 return (_state != kRunning);
451 }
452
// Marks the start of a balancing round and wakes up any threads blocked in joinCurrentRound().
void Balancer::_beginRound(OperationContext* opCtx) {
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    _inBalancerRound = true;
    _condVar.notify_all();
}
458
// Marks the end of a balancing round (bumping the round counter and waking waiters), then
// sleeps for the requested interval before the next round may begin.
void Balancer::_endRound(OperationContext* opCtx, Milliseconds waitTimeout) {
    {
        stdx::lock_guard<stdx::mutex> lock(_mutex);
        _inBalancerRound = false;
        _numBalancerRounds++;
        _condVar.notify_all();
    }

    // The sleep between rounds counts as idle time for diagnostics purposes
    MONGO_IDLE_THREAD_BLOCK;
    _sleepFor(opCtx, waitTimeout);
}
470
_sleepFor(OperationContext * opCtx,Milliseconds waitTimeout)471 void Balancer::_sleepFor(OperationContext* opCtx, Milliseconds waitTimeout) {
472 stdx::unique_lock<stdx::mutex> lock(_mutex);
473 _condVar.wait_for(lock, waitTimeout.toSystemDuration(), [&] { return _state != kRunning; });
474 }
475
// Verifies that no two shards share the same OID "machine" identifier by querying each shard's
// "features" command. On a collision, asks both shards to reset their OID state ("oidReset")
// and returns false so the caller aborts the round; returns true when all reachable shards
// have distinct machine ids.
bool Balancer::_checkOIDs(OperationContext* opCtx) {
    auto shardingContext = Grid::get(opCtx);

    vector<ShardId> all;
    shardingContext->shardRegistry()->getAllShardIds(&all);

    // map of OID machine ID => shardId
    map<int, ShardId> oids;

    for (const ShardId& shardId : all) {
        // Bail out promptly if a stop was requested mid-scan
        if (_stopRequested()) {
            return false;
        }

        auto shardStatus = shardingContext->shardRegistry()->getShard(opCtx, shardId);
        if (!shardStatus.isOK()) {
            // Shard no longer present in the registry; skip it
            continue;
        }
        const auto s = shardStatus.getValue();

        auto result = uassertStatusOK(
            s->runCommandWithFixedRetryAttempts(opCtx,
                                                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                                                "admin",
                                                BSON("features" << 1),
                                                Shard::RetryPolicy::kIdempotent));
        uassertStatusOK(result.commandStatus);
        BSONObj f = std::move(result.response);

        if (f["oidMachine"].isNumber()) {
            int x = f["oidMachine"].numberInt();
            if (oids.count(x) == 0) {
                oids[x] = shardId;
            } else {
                // Two shards report the same machine id; reset the OID state on both
                log() << "error: 2 machines have " << x << " as oid machine piece: " << shardId
                      << " and " << oids[x];

                result = uassertStatusOK(s->runCommandWithFixedRetryAttempts(
                    opCtx,
                    ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                    "admin",
                    BSON("features" << 1 << "oidReset" << 1),
                    Shard::RetryPolicy::kIdempotent));
                uassertStatusOK(result.commandStatus);

                auto otherShardStatus = shardingContext->shardRegistry()->getShard(opCtx, oids[x]);
                if (otherShardStatus.isOK()) {
                    result = uassertStatusOK(
                        otherShardStatus.getValue()->runCommandWithFixedRetryAttempts(
                            opCtx,
                            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                            "admin",
                            BSON("features" << 1 << "oidReset" << 1),
                            Shard::RetryPolicy::kIdempotent));
                    uassertStatusOK(result.commandStatus);
                }

                return false;
            }
        } else {
            log() << "warning: oidMachine not set on: " << s->toString();
        }
    }

    return true;
}
542
_splitChunksIfNeeded(OperationContext * opCtx)543 Status Balancer::_splitChunksIfNeeded(OperationContext* opCtx) {
544 auto chunksToSplitStatus = _chunkSelectionPolicy->selectChunksToSplit(opCtx);
545 if (!chunksToSplitStatus.isOK()) {
546 return chunksToSplitStatus.getStatus();
547 }
548
549 for (const auto& splitInfo : chunksToSplitStatus.getValue()) {
550 auto routingInfoStatus =
551 Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(
552 opCtx, splitInfo.nss);
553 if (!routingInfoStatus.isOK()) {
554 return routingInfoStatus.getStatus();
555 }
556
557 auto cm = routingInfoStatus.getValue().cm();
558
559 auto splitStatus =
560 shardutil::splitChunkAtMultiplePoints(opCtx,
561 splitInfo.shardId,
562 splitInfo.nss,
563 cm->getShardKeyPattern(),
564 splitInfo.collectionVersion,
565 ChunkRange(splitInfo.minKey, splitInfo.maxKey),
566 splitInfo.splitKeys);
567 if (!splitStatus.isOK()) {
568 warning() << "Failed to split chunk " << redact(splitInfo.toString())
569 << causedBy(redact(splitStatus.getStatus()));
570 }
571 }
572
573 return Status::OK();
574 }
575
// Schedules the candidate migrations through the migration manager and reconciles the
// per-migration results. Returns the number of migrations which either succeeded or were
// converted into a split (ChunkTooBig); other failures are only logged.
int Balancer::_moveChunks(OperationContext* opCtx,
                          const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateChunks) {
    auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();

    // If the balancer was disabled since we started this round, don't start new chunk moves
    if (_stopRequested() || !balancerConfig->shouldBalance()) {
        LOG(1) << "Skipping balancing round because balancer was stopped";
        return 0;
    }

    auto migrationStatuses =
        _migrationManager.executeMigrationsForAutoBalance(opCtx,
                                                          candidateChunks,
                                                          balancerConfig->getMaxChunkSizeBytes(),
                                                          balancerConfig->getSecondaryThrottle(),
                                                          balancerConfig->waitForDelete());

    int numChunksProcessed = 0;

    for (const auto& migrationStatusEntry : migrationStatuses) {
        const Status& status = migrationStatusEntry.second;
        if (status.isOK()) {
            numChunksProcessed++;
            continue;
        }

        const MigrationIdentifier& migrationId = migrationStatusEntry.first;

        // Map the failed migration id back to the originating migration request
        const auto requestIt = std::find_if(candidateChunks.begin(),
                                            candidateChunks.end(),
                                            [&migrationId](const MigrateInfo& migrateInfo) {
                                                return migrateInfo.getName() == migrationId;
                                            });
        invariant(requestIt != candidateChunks.end());

        if (status == ErrorCodes::ChunkTooBig) {
            // The chunk was too large to move; split it (or mark it jumbo) instead
            numChunksProcessed++;

            log() << "Performing a split because migration " << redact(requestIt->toString())
                  << " failed for size reasons" << causedBy(redact(status));

            _splitOrMarkJumbo(opCtx, NamespaceString(requestIt->ns), requestIt->minKey);
            continue;
        }

        log() << "Balancer move " << redact(requestIt->toString()) << " failed"
              << causedBy(redact(status));
    }

    return numChunksProcessed;
}
627
// Attempts to split the chunk containing minKey in the given namespace. If no split points can
// be found or the split fails, marks the chunk as jumbo both in the local routing table and in
// the config.chunks document so the balancer stops retrying to move it.
void Balancer::_splitOrMarkJumbo(OperationContext* opCtx,
                                 const NamespaceString& nss,
                                 const BSONObj& minKey) {
    auto routingInfo = uassertStatusOK(
        Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss));
    const auto cm = routingInfo.cm().get();

    const auto chunk = cm->findIntersectingChunkWithSimpleCollation(minKey);

    try {
        const auto splitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints(
            opCtx,
            chunk->getShardId(),
            nss,
            cm->getShardKeyPattern(),
            ChunkRange(chunk->getMin(), chunk->getMax()),
            Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
            boost::none));

        uassert(ErrorCodes::CannotSplit, "No split points found", !splitPoints.empty());

        uassertStatusOK(
            shardutil::splitChunkAtMultiplePoints(opCtx,
                                                  chunk->getShardId(),
                                                  nss,
                                                  cm->getShardKeyPattern(),
                                                  cm->getVersion(),
                                                  ChunkRange(chunk->getMin(), chunk->getMax()),
                                                  splitPoints));
    } catch (const DBException&) {
        // The split could not be performed - mark the chunk jumbo instead
        log() << "Marking chunk " << redact(chunk->toString()) << " as jumbo.";

        chunk->markAsJumbo();

        const std::string chunkName = ChunkType::genID(nss.ns(), chunk->getMin());

        // Persist the jumbo flag in config.chunks; failure is logged but not fatal
        auto status = Grid::get(opCtx)->catalogClient()->updateConfigDocument(
            opCtx,
            ChunkType::ConfigNS,
            BSON(ChunkType::name(chunkName)),
            BSON("$set" << BSON(ChunkType::jumbo(true))),
            false,
            ShardingCatalogClient::kMajorityWriteConcern);
        if (!status.isOK()) {
            log() << "Couldn't set jumbo for chunk: " << redact(chunkName)
                  << causedBy(redact(status.getStatus()));
        }
    }
}
677
678 } // namespace mongo
679