/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/db/s/balancer/balancer.h"

#include <algorithm>
#include <boost/optional.hpp>
#include <map>
#include <string>
#include <vector>

#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/client/read_preference.h"
#include "mongo/db/client.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h"
#include "mongo/db/s/balancer/cluster_statistics_impl.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_identity_loader.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_util.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/concurrency/idle_thread_block.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/timer.h"
#include "mongo/util/version.h"

namespace mongo {

using std::map;
using std::string;
using std::vector;

namespace {

// Failpoint that overrides the balancer round interval for testing. Enabled for example via:
//   db.adminCommand({configureFailPoint: 'overrideBalanceRoundInterval',
//                    mode: 'alwaysOn',
//                    data: {intervalMs: 100}})
MONGO_FP_DECLARE(overrideBalanceRoundInterval);

// Interval between balancer rounds when the previous round did not move any chunks
const Seconds kBalanceRoundDefaultInterval(10);

// Sleep between balancer rounds when the previous round found chunks which needed to be moved.
// This value should be set low enough that imbalanced clusters quickly reach a balanced state,
// but setting it too low may cause CRUD operations to start failing because a stable shard
// version cannot be established.
const Seconds kShortBalanceRoundInterval(1);

const auto getBalancer = ServiceContext::declareDecoration<std::unique_ptr<Balancer>>();

/**
 * Utility class to generate timing and statistics for a single balancer round.
 */
class BalanceRoundDetails {
public:
    BalanceRoundDetails() : _executionTimer() {}

    void setSucceeded(int candidateChunks, int chunksMoved) {
        invariant(!_errMsg);
        _candidateChunks = candidateChunks;
        _chunksMoved = chunksMoved;
    }

    void setFailed(const string& errMsg) {
        _errMsg = errMsg;
    }

    BSONObj toBSON() const {
        BSONObjBuilder builder;
        builder.append("executionTimeMillis", _executionTimer.millis());
        // The field name "errorOccured" (sic) is kept as-is; renaming it would change the
        // format of the logged document.
        builder.append("errorOccured", _errMsg.is_initialized());

        if (_errMsg) {
            builder.append("errmsg", *_errMsg);
        } else {
            builder.append("candidateChunks", _candidateChunks);
            builder.append("chunksMoved", _chunksMoved);
        }

        return builder.obj();
    }

private:
    const Timer _executionTimer;

    // Set only on success
    int _candidateChunks{0};
    int _chunksMoved{0};

    // Set only on failure
    boost::optional<string> _errMsg;
};

/**
 * Prints a log message with the shard versions if the shards are not all running the same
 * major version as the local binary. Intended to be invoked only occasionally.
 */
void warnOnMultiVersion(const vector<ClusterStatistics::ShardStatistics>& clusterStats) {
    auto&& vii = VersionInfoInterface::instance();

    bool isMultiVersion = false;
    for (const auto& stat : clusterStats) {
        if (!vii.isSameMajorVersion(stat.mongoVersion.c_str())) {
            isMultiVersion = true;
            break;
        }
    }

    // If all shards are on the same major version as the local binary, there is nothing to warn
    // about
    if (!isMultiVersion)
        return;

    StringBuilder sb;
    sb << "Multi version cluster detected. Local version: " << vii.version()
       << ", shard versions: ";

    for (const auto& stat : clusterStats) {
        sb << stat.shardId << " is at " << stat.mongoVersion << "; ";
    }

    warning() << sb.str();
}

}  // namespace

Balancer::Balancer(ServiceContext* serviceContext)
    : _balancedLastTime(0),
      _random(std::random_device{}()),
      _clusterStats(stdx::make_unique<ClusterStatisticsImpl>(_random)),
      _chunkSelectionPolicy(
          stdx::make_unique<BalancerChunkSelectionPolicyImpl>(_clusterStats.get(), _random)),
      _migrationManager(serviceContext) {}

Balancer::~Balancer() {
    // The balancer thread must have been stopped
    stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
    invariant(_state == kStopped);
}

void Balancer::create(ServiceContext* serviceContext) {
    invariant(!getBalancer(serviceContext));
    getBalancer(serviceContext) = stdx::make_unique<Balancer>(serviceContext);
}

Balancer* Balancer::get(ServiceContext* serviceContext) {
    return getBalancer(serviceContext).get();
}

Balancer* Balancer::get(OperationContext* operationContext) {
    return get(operationContext->getServiceContext());
}

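/**
 * Starts the main balancer thread. May only be called while the balancer is in the kStopped
 * state. Kicks off migration recovery and distributed lock reacquisition before spawning the
 * thread.
 */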
void Balancer::initiateBalancer(OperationContext* opCtx) {
    stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
    invariant(_state == kStopped);
    _state = kRunning;

    _migrationManager.startRecoveryAndAcquireDistLocks(opCtx);

    invariant(!_thread.joinable());
    invariant(!_threadOperationContext);
    _thread = stdx::thread([this] { _mainThread(); });
}

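/**
 * Requests the balancer to stop: moves the state to kStopping, marks the balancer thread's
 * operation context as killed and schedules a separate thread to shut down the migration
 * manager. Does not wait for the balancer thread to exit; use waitForBalancerToStop() for that.
 */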
void Balancer::interruptBalancer() {
    stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
    if (_state != kRunning)
        return;

    _state = kStopping;

    // Interrupt the balancer thread if it has been started. We are guaranteed that the operation
    // context of that thread is still alive, because we hold the balancer mutex.
    if (_threadOperationContext) {
        stdx::lock_guard<Client> scopedClientLock(*_threadOperationContext->getClient());
        _threadOperationContext->markKilled(ErrorCodes::InterruptedDueToReplStateChange);
    }

    // Schedule a separate thread to shut down the migration manager in order to avoid
    // deadlocking with replication step down
    invariant(!_migrationManagerInterruptThread.joinable());
    _migrationManagerInterruptThread =
        stdx::thread([this] { _migrationManager.interruptAndDisableMigrations(); });

    _condVar.notify_all();
}

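/**
 * Blocks until the balancer thread has terminated, then moves the state back to kStopped. Must
 * be preceded by a call to interruptBalancer().
 */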
void Balancer::waitForBalancerToStop() {
    {
        stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
        if (_state == kStopped)
            return;

        invariant(_state == kStopping);
        invariant(_thread.joinable());
    }

    _thread.join();

    stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
    _state = kStopped;
    _thread = {};

    LOG(1) << "Balancer thread terminated";
}

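/**
 * Blocks until the ongoing balancer round (if any) completes, throwing if the operation context
 * is interrupted while waiting.
 */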
void Balancer::joinCurrentRound(OperationContext* opCtx) {
    stdx::unique_lock<stdx::mutex> scopedLock(_mutex);
    const auto numRoundsAtStart = _numBalancerRounds;
    opCtx->waitForConditionOrInterrupt(_condVar, scopedLock, [&] {
        return !_inBalancerRound || _numBalancerRounds != numRoundsAtStart;
    });
}

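/**
 * Asks the chunk selection policy whether there is a more appropriate location for the given
 * chunk and, if so, executes a manual migration to it. Returns OK without moving anything if
 * the chunk is already well placed.
 */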
Status Balancer::rebalanceSingleChunk(OperationContext* opCtx, const ChunkType& chunk) {
    auto migrateStatus = _chunkSelectionPolicy->selectSpecificChunkToMove(opCtx, chunk);
    if (!migrateStatus.isOK()) {
        return migrateStatus.getStatus();
    }

    auto migrateInfo = std::move(migrateStatus.getValue());
    if (!migrateInfo) {
        LOG(1) << "Unable to find a more appropriate location for chunk "
               << redact(chunk.toString());
        return Status::OK();
    }

    auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
    Status refreshStatus = balancerConfig->refreshAndCheck(opCtx);
    if (!refreshStatus.isOK()) {
        return refreshStatus;
    }

    return _migrationManager.executeManualMigration(opCtx,
                                                    *migrateInfo,
                                                    balancerConfig->getMaxChunkSizeBytes(),
                                                    balancerConfig->getSecondaryThrottle(),
                                                    balancerConfig->waitForDelete());
}

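/**
 * Moves the given chunk to the specified shard, provided the chunk selection policy allows the
 * move, using the caller-supplied migration settings.
 */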
Status Balancer::moveSingleChunk(OperationContext* opCtx,
                                 const ChunkType& chunk,
                                 const ShardId& newShardId,
                                 uint64_t maxChunkSizeBytes,
                                 const MigrationSecondaryThrottleOptions& secondaryThrottle,
                                 bool waitForDelete) {
    auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(opCtx, chunk, newShardId);
    if (!moveAllowedStatus.isOK()) {
        return moveAllowedStatus;
    }

    return _migrationManager.executeManualMigration(
        opCtx, MigrateInfo(newShardId, chunk), maxChunkSizeBytes, secondaryThrottle, waitForDelete);
}

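/**
 * Reports the balancer's mode and round statistics into the given builder, refreshing the
 * balancer configuration first on a best-effort basis.
 */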
void Balancer::report(OperationContext* opCtx, BSONObjBuilder* builder) {
    auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
    balancerConfig->refreshAndCheck(opCtx).transitional_ignore();

    const auto mode = balancerConfig->getBalancerMode();

    stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
    builder->append("mode", BalancerSettingsType::kBalancerModes[mode]);
    builder->append("inBalancerRound", _inBalancerRound);
    builder->append("numBalancerRounds", _numBalancerRounds);
}

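/**
 * Body of the balancer thread: loads the balancer settings (retrying with a backoff), finishes
 * migration recovery, and then runs balancing rounds until a stop is requested.
 */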
void Balancer::_mainThread() {
    Client::initThread("Balancer");
    auto opCtx = cc().makeOperationContext();
    auto shardingContext = Grid::get(opCtx.get());

    log() << "CSRS balancer is starting";

    {
        stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
        _threadOperationContext = opCtx.get();
    }

    const Seconds kInitBackoffInterval(10);

    auto balancerConfig = shardingContext->getBalancerConfiguration();
    while (!_stopRequested()) {
        Status refreshStatus = balancerConfig->refreshAndCheck(opCtx.get());
        if (!refreshStatus.isOK()) {
            warning() << "Balancer settings could not be loaded and will be retried in "
                      << durationCount<Seconds>(kInitBackoffInterval) << " seconds"
                      << causedBy(refreshStatus);

            _sleepFor(opCtx.get(), kInitBackoffInterval);
            continue;
        }

        break;
    }

    log() << "CSRS balancer thread is recovering";

    _migrationManager.finishRecovery(opCtx.get(),
                                     balancerConfig->getMaxChunkSizeBytes(),
                                     balancerConfig->getSecondaryThrottle());

    log() << "CSRS balancer thread is recovered";

    // Main balancer loop
    while (!_stopRequested()) {
        BalanceRoundDetails roundDetails;

        _beginRound(opCtx.get());

        try {
            shardingContext->shardRegistry()->reload(opCtx.get());

            uassert(13258, "oids broken after resetting!", _checkOIDs(opCtx.get()));

            Status refreshStatus = balancerConfig->refreshAndCheck(opCtx.get());
            if (!refreshStatus.isOK()) {
                warning() << "Skipping balancing round" << causedBy(refreshStatus);
                _endRound(opCtx.get(), kBalanceRoundDefaultInterval);
                continue;
            }

            if (!balancerConfig->shouldBalance()) {
                LOG(1) << "Skipping balancing round because balancing is disabled";
                _endRound(opCtx.get(), kBalanceRoundDefaultInterval);
                continue;
            }

            {
                LOG(1) << "*** start balancing round. "
                       << "waitForDelete: " << balancerConfig->waitForDelete()
                       << ", secondaryThrottle: "
                       << balancerConfig->getSecondaryThrottle().toBSON();

                OCCASIONALLY warnOnMultiVersion(
                    uassertStatusOK(_clusterStats->getStats(opCtx.get())));

                Status status = _splitChunksIfNeeded(opCtx.get());
                if (!status.isOK()) {
                    warning() << "Failed to split chunks" << causedBy(status);
                } else {
                    LOG(1) << "Done enforcing tag range boundaries.";
                }

                const auto candidateChunks = uassertStatusOK(
                    _chunkSelectionPolicy->selectChunksToMove(opCtx.get(), _balancedLastTime));

                if (candidateChunks.empty()) {
                    LOG(1) << "no need to move any chunk";
                    _balancedLastTime = false;
                } else {
                    _balancedLastTime = _moveChunks(opCtx.get(), candidateChunks);

                    roundDetails.setSucceeded(static_cast<int>(candidateChunks.size()),
                                              _balancedLastTime);

                    shardingContext->catalogClient()
                        ->logAction(opCtx.get(), "balancer.round", "", roundDetails.toBSON())
                        .ignore();
                }

                LOG(1) << "*** End of balancing round";
            }

            auto balancerInterval = [&]() -> Milliseconds {
                MONGO_FAIL_POINT_BLOCK(overrideBalanceRoundInterval, data) {
                    int interval = data.getData()["intervalMs"].numberInt();
                    log() << "overrideBalanceRoundInterval: using shorter balancing interval: "
                          << interval << "ms";

                    return Milliseconds(interval);
                }

                return _balancedLastTime ? kShortBalanceRoundInterval
                                         : kBalanceRoundDefaultInterval;
            }();

            _endRound(opCtx.get(), balancerInterval);
        } catch (const std::exception& e) {
            log() << "caught exception while doing balance: " << e.what();

            // Just to match the opening statement if in log level 1
            LOG(1) << "*** End of balancing round";

            // This round failed, tell the world!
            roundDetails.setFailed(e.what());

            shardingContext->catalogClient()
                ->logAction(opCtx.get(), "balancer.round", "", roundDetails.toBSON())
                .transitional_ignore();

            // Sleep a fair amount before retrying because of the error
            _endRound(opCtx.get(), kBalanceRoundDefaultInterval);
        }
    }

    {
        stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
        invariant(_state == kStopping);
        invariant(_migrationManagerInterruptThread.joinable());
    }

    _migrationManagerInterruptThread.join();
    _migrationManager.drainActiveMigrations();

    {
        stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
        _migrationManagerInterruptThread = {};
        _threadOperationContext = nullptr;
    }

    log() << "CSRS balancer is now stopped";
}

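// Returns true if the balancer has left the kRunning state, i.e. a stop has been requested.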
bool Balancer::_stopRequested() {
    stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
    return (_state != kRunning);
}

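// Marks the start of a balancing round and wakes up any threads waiting in joinCurrentRound().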
void Balancer::_beginRound(OperationContext* opCtx) {
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    _inBalancerRound = true;
    _condVar.notify_all();
}

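// Marks the end of a balancing round, wakes up any waiters and then sleeps for 'waitTimeout'
// (or until the balancer is interrupted).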
void Balancer::_endRound(OperationContext* opCtx, Milliseconds waitTimeout) {
    {
        stdx::lock_guard<stdx::mutex> lock(_mutex);
        _inBalancerRound = false;
        _numBalancerRounds++;
        _condVar.notify_all();
    }

    MONGO_IDLE_THREAD_BLOCK;
    _sleepFor(opCtx, waitTimeout);
}

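// Sleeps for up to 'waitTimeout', returning early if the balancer leaves the kRunning state.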
void Balancer::_sleepFor(OperationContext* opCtx, Milliseconds waitTimeout) {
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    _condVar.wait_for(lock, waitTimeout.toSystemDuration(), [&] { return _state != kRunning; });
}

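/**
 * Checks that no two shards are using the same OID "machine id" component, which could produce
 * colliding ObjectIds. When a conflict is found, instructs both shards to reset their machine
 * ids and returns false; also returns false if a stop was requested while scanning. Returns
 * true if all shards check out.
 */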
bool Balancer::_checkOIDs(OperationContext* opCtx) {
    auto shardingContext = Grid::get(opCtx);

    vector<ShardId> all;
    shardingContext->shardRegistry()->getAllShardIds(&all);

    // Map of OID machine ID => shardId
    map<int, ShardId> oids;

    for (const ShardId& shardId : all) {
        if (_stopRequested()) {
            return false;
        }

        auto shardStatus = shardingContext->shardRegistry()->getShard(opCtx, shardId);
        if (!shardStatus.isOK()) {
            continue;
        }
        const auto s = shardStatus.getValue();

        auto result = uassertStatusOK(
            s->runCommandWithFixedRetryAttempts(opCtx,
                                                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                                                "admin",
                                                BSON("features" << 1),
                                                Shard::RetryPolicy::kIdempotent));
        uassertStatusOK(result.commandStatus);
        BSONObj f = std::move(result.response);

        if (f["oidMachine"].isNumber()) {
            int x = f["oidMachine"].numberInt();
            if (oids.count(x) == 0) {
                oids[x] = shardId;
            } else {
                log() << "error: 2 machines have " << x << " as oid machine piece: " << shardId
                      << " and " << oids[x];

                // Ask both conflicting shards to reset their machine ids
                result = uassertStatusOK(s->runCommandWithFixedRetryAttempts(
                    opCtx,
                    ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                    "admin",
                    BSON("features" << 1 << "oidReset" << 1),
                    Shard::RetryPolicy::kIdempotent));
                uassertStatusOK(result.commandStatus);

                auto otherShardStatus = shardingContext->shardRegistry()->getShard(opCtx, oids[x]);
                if (otherShardStatus.isOK()) {
                    result = uassertStatusOK(
                        otherShardStatus.getValue()->runCommandWithFixedRetryAttempts(
                            opCtx,
                            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                            "admin",
                            BSON("features" << 1 << "oidReset" << 1),
                            Shard::RetryPolicy::kIdempotent));
                    uassertStatusOK(result.commandStatus);
                }

                return false;
            }
        } else {
            log() << "warning: oidMachine not set on: " << s->toString();
        }
    }

    return true;
}

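/**
 * Asks the chunk selection policy for chunks that have to be split (e.g. to enforce tag range
 * boundaries) and issues the corresponding split commands. Individual split failures are logged
 * but do not abort the round.
 */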
Status Balancer::_splitChunksIfNeeded(OperationContext* opCtx) {
    auto chunksToSplitStatus = _chunkSelectionPolicy->selectChunksToSplit(opCtx);
    if (!chunksToSplitStatus.isOK()) {
        return chunksToSplitStatus.getStatus();
    }

    for (const auto& splitInfo : chunksToSplitStatus.getValue()) {
        auto routingInfoStatus =
            Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(
                opCtx, splitInfo.nss);
        if (!routingInfoStatus.isOK()) {
            return routingInfoStatus.getStatus();
        }

        auto cm = routingInfoStatus.getValue().cm();

        auto splitStatus =
            shardutil::splitChunkAtMultiplePoints(opCtx,
                                                  splitInfo.shardId,
                                                  splitInfo.nss,
                                                  cm->getShardKeyPattern(),
                                                  splitInfo.collectionVersion,
                                                  ChunkRange(splitInfo.minKey, splitInfo.maxKey),
                                                  splitInfo.splitKeys);
        if (!splitStatus.isOK()) {
            warning() << "Failed to split chunk " << redact(splitInfo.toString())
                      << causedBy(redact(splitStatus.getStatus()));
        }
    }

    return Status::OK();
}

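/**
 * Schedules migrations for the candidate chunks and returns how many were processed
 * successfully. A migration that fails with ChunkTooBig is handed to _splitOrMarkJumbo() and
 * still counts as processed.
 */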
int Balancer::_moveChunks(OperationContext* opCtx,
                          const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateChunks) {
    auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();

    // If the balancer was disabled since we started this round, don't start new chunk moves
    if (_stopRequested() || !balancerConfig->shouldBalance()) {
        LOG(1) << "Skipping balancing round because balancer was stopped";
        return 0;
    }

    auto migrationStatuses =
        _migrationManager.executeMigrationsForAutoBalance(opCtx,
                                                          candidateChunks,
                                                          balancerConfig->getMaxChunkSizeBytes(),
                                                          balancerConfig->getSecondaryThrottle(),
                                                          balancerConfig->waitForDelete());

    int numChunksProcessed = 0;

    for (const auto& migrationStatusEntry : migrationStatuses) {
        const Status& status = migrationStatusEntry.second;
        if (status.isOK()) {
            numChunksProcessed++;
            continue;
        }

        const MigrationIdentifier& migrationId = migrationStatusEntry.first;

        const auto requestIt = std::find_if(candidateChunks.begin(),
                                            candidateChunks.end(),
                                            [&migrationId](const MigrateInfo& migrateInfo) {
                                                return migrateInfo.getName() == migrationId;
                                            });
        invariant(requestIt != candidateChunks.end());

        if (status == ErrorCodes::ChunkTooBig) {
            numChunksProcessed++;

            log() << "Performing a split because migration " << redact(requestIt->toString())
                  << " failed for size reasons" << causedBy(redact(status));

            _splitOrMarkJumbo(opCtx, NamespaceString(requestIt->ns), requestIt->minKey);
            continue;
        }

        log() << "Balancer move " << redact(requestIt->toString()) << " failed"
              << causedBy(redact(status));
    }

    return numChunksProcessed;
}

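/**
 * Attempts to split the chunk containing 'minKey'. If no split points can be found, marks the
 * chunk as jumbo, both on the in-memory routing table entry and in its config.chunks document,
 * so that the balancer stops trying to move it.
 */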
void Balancer::_splitOrMarkJumbo(OperationContext* opCtx,
                                 const NamespaceString& nss,
                                 const BSONObj& minKey) {
    auto routingInfo = uassertStatusOK(
        Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss));
    const auto cm = routingInfo.cm().get();

    const auto chunk = cm->findIntersectingChunkWithSimpleCollation(minKey);

    try {
        const auto splitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints(
            opCtx,
            chunk->getShardId(),
            nss,
            cm->getShardKeyPattern(),
            ChunkRange(chunk->getMin(), chunk->getMax()),
            Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
            boost::none));

        uassert(ErrorCodes::CannotSplit, "No split points found", !splitPoints.empty());

        uassertStatusOK(
            shardutil::splitChunkAtMultiplePoints(opCtx,
                                                  chunk->getShardId(),
                                                  nss,
                                                  cm->getShardKeyPattern(),
                                                  cm->getVersion(),
                                                  ChunkRange(chunk->getMin(), chunk->getMax()),
                                                  splitPoints));
    } catch (const DBException&) {
        log() << "Marking chunk " << redact(chunk->toString()) << " as jumbo.";

        chunk->markAsJumbo();

        const std::string chunkName = ChunkType::genID(nss.ns(), chunk->getMin());

        auto status = Grid::get(opCtx)->catalogClient()->updateConfigDocument(
            opCtx,
            ChunkType::ConfigNS,
            BSON(ChunkType::name(chunkName)),
            BSON("$set" << BSON(ChunkType::jumbo(true))),
            false,
            ShardingCatalogClient::kMajorityWriteConcern);
        if (!status.isOK()) {
            log() << "Couldn't set jumbo for chunk: " << redact(chunkName)
                  << causedBy(redact(status.getStatus()));
        }
    }
}

}  // namespace mongo