1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/db/s/chunk_splitter.h"
36
37 #include "mongo/client/dbclientcursor.h"
38 #include "mongo/client/query.h"
39 #include "mongo/db/client.h"
40 #include "mongo/db/dbdirectclient.h"
41 #include "mongo/db/namespace_string.h"
42 #include "mongo/db/operation_context.h"
43 #include "mongo/db/s/sharding_state.h"
44 #include "mongo/db/s/split_chunk.h"
45 #include "mongo/db/s/split_vector.h"
46 #include "mongo/s/balancer_configuration.h"
47 #include "mongo/s/catalog/sharding_catalog_client.h"
48 #include "mongo/s/catalog/type_chunk.h"
49 #include "mongo/s/catalog_cache.h"
50 #include "mongo/s/chunk_manager.h"
51 #include "mongo/s/config_server_client.h"
52 #include "mongo/s/grid.h"
53 #include "mongo/s/shard_key_pattern.h"
54 #include "mongo/util/assert_util.h"
55 #include "mongo/util/log.h"
56
57 namespace mongo {
58 namespace {
59
60 /**
61 * Constructs the default options for the thread pool used to schedule splits.
62 */
makeDefaultThreadPoolOptions()63 ThreadPool::Options makeDefaultThreadPoolOptions() {
64 ThreadPool::Options options;
65 options.poolName = "ChunkSplitter";
66 options.minThreads = 0;
67 options.maxThreads = 20;
68
69 // Ensure all threads have a client
70 options.onCreateThread = [](const std::string& threadName) {
71 Client::initThread(threadName.c_str());
72 };
73 return options;
74 }
75
76 /**
77 * Attempts to split the chunk described by min/maxKey at the split points provided.
78 */
splitChunkAtMultiplePoints(OperationContext * opCtx,const ShardId & shardId,const NamespaceString & nss,const ShardKeyPattern & shardKeyPattern,const ChunkVersion & collectionVersion,const ChunkRange & chunkRange,const std::vector<BSONObj> & splitPoints)79 Status splitChunkAtMultiplePoints(OperationContext* opCtx,
80 const ShardId& shardId,
81 const NamespaceString& nss,
82 const ShardKeyPattern& shardKeyPattern,
83 const ChunkVersion& collectionVersion,
84 const ChunkRange& chunkRange,
85 const std::vector<BSONObj>& splitPoints) {
86 invariant(!splitPoints.empty());
87
88 const size_t kMaxSplitPoints = 8192;
89
90 if (splitPoints.size() > kMaxSplitPoints) {
91 return {ErrorCodes::BadValue,
92 str::stream() << "Cannot split chunk in more than " << kMaxSplitPoints
93 << " parts at a time."};
94 }
95
96 const auto status = splitChunk(opCtx,
97 nss,
98 shardKeyPattern.toBSON(),
99 chunkRange,
100 splitPoints,
101 shardId.toString(),
102 collectionVersion.epoch());
103
104 if (!status.isOK()) {
105 return {status.getStatus().code(),
106 str::stream() << "split failed due to " << status.getStatus().reason()};
107 }
108
109 return status.getStatus();
110 }
111
112 /**
113 * Attempts to move the chunk specified by minKey away from its current shard.
114 */
moveChunk(OperationContext * opCtx,const NamespaceString & nss,const BSONObj & minKey)115 void moveChunk(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& minKey) {
116 // We need to have the most up-to-date view of the chunk we are about to move.
117 const auto routingInfo =
118 uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
119
120 uassert(ErrorCodes::NamespaceNotSharded,
121 "Could not move chunk. Collection is no longer sharded",
122 routingInfo.cm());
123
124 const auto suggestedChunk = routingInfo.cm()->findIntersectingChunkWithSimpleCollation(minKey);
125
126 ChunkType chunkToMove;
127 chunkToMove.setNS(nss.ns());
128 chunkToMove.setShard(suggestedChunk->getShardId());
129 chunkToMove.setMin(suggestedChunk->getMin());
130 chunkToMove.setMax(suggestedChunk->getMax());
131 chunkToMove.setVersion(suggestedChunk->getLastmod());
132
133 uassertStatusOK(configsvr_client::rebalanceChunk(opCtx, chunkToMove));
134 }
135
136 /**
137 * Returns the split point that will result in one of the chunks having exactly one document. Also
138 * returns an empty document if the split point cannot be determined.
139 *
140 * doSplitAtLower - determines which side of the split will have exactly one document. True means
141 * that the split point chosen will be closer to the lower bound.
142 *
143 * NOTE: this assumes that the shard key is not "special"- that is, the shardKeyPattern is simply an
144 * ordered list of ascending/descending field names. For example {a : 1, b : -1} is not special, but
145 * {a : "hashed"} is.
146 */
findExtremeKeyForShard(OperationContext * opCtx,const NamespaceString & nss,const ShardKeyPattern & shardKeyPattern,bool doSplitAtLower)147 BSONObj findExtremeKeyForShard(OperationContext* opCtx,
148 const NamespaceString& nss,
149 const ShardKeyPattern& shardKeyPattern,
150 bool doSplitAtLower) {
151 Query q;
152
153 if (doSplitAtLower) {
154 q.sort(shardKeyPattern.toBSON());
155 } else {
156 // need to invert shard key pattern to sort backwards
157 BSONObjBuilder r;
158
159 BSONObjIterator i(shardKeyPattern.toBSON());
160 while (i.more()) {
161 BSONElement e = i.next();
162 uassert(40617, "can only handle numbers here - which i think is correct", e.isNumber());
163 r.append(e.fieldName(), -1 * e.number());
164 }
165
166 q.sort(r.obj());
167 }
168
169 DBDirectClient client(opCtx);
170
171 BSONObj end;
172
173 if (doSplitAtLower) {
174 // Splitting close to the lower bound means that the split point will be the
175 // upper bound. Chunk range upper bounds are exclusive so skip a document to
176 // make the lower half of the split end up with a single document.
177 std::unique_ptr<DBClientCursor> cursor = client.query(nss.ns(),
178 q,
179 1, /* nToReturn */
180 1 /* nToSkip */);
181
182 uassert(40618,
183 str::stream() << "failed to initialize cursor during auto split due to "
184 << "connection problem with "
185 << client.getServerAddress(),
186 cursor.get() != nullptr);
187
188 if (cursor->more()) {
189 end = cursor->next().getOwned();
190 }
191 } else {
192 end = client.findOne(nss.ns(), q);
193 }
194
195 if (end.isEmpty()) {
196 return BSONObj();
197 }
198
199 return shardKeyPattern.extractShardKeyFromDoc(end);
200 }
201
202 /**
203 * Checks if autobalance is enabled on the current sharded collection.
204 */
isAutoBalanceEnabled(OperationContext * opCtx,const NamespaceString & nss,BalancerConfiguration * balancerConfig)205 bool isAutoBalanceEnabled(OperationContext* opCtx,
206 const NamespaceString& nss,
207 BalancerConfiguration* balancerConfig) {
208 if (!balancerConfig->shouldBalanceForAutoSplit())
209 return false;
210
211 auto collStatus = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, nss.ns());
212 if (!collStatus.isOK()) {
213 log() << "Auto-split for " << nss << " failed to load collection metadata"
214 << causedBy(redact(collStatus.getStatus()));
215 return false;
216 }
217
218 return collStatus.getValue().value.getAllowBalance();
219 }
220
221 } // namespace
222
// Starts the task thread pool immediately; split tasks are only scheduled once the
// node reports primary (trySplitting checks _isPrimary before scheduling).
ChunkSplitter::ChunkSplitter() : _isPrimary(false), _threadPool(makeDefaultThreadPoolOptions()) {
    _threadPool.startup();
}
226
ChunkSplitter::~ChunkSplitter() {
    // Stop accepting new tasks and wait for any in-flight split tasks to finish
    // before the pool (and this object) is destroyed.
    _threadPool.shutdown();
    _threadPool.join();
}
231
// Records whether this node is currently the replica set primary; only primaries
// schedule autosplit tasks. The flag is written under _mutex.
void ChunkSplitter::setReplicaSetMode(bool isPrimary) {
    stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
    _isPrimary = isPrimary;
}
236
// Marks this node as primary so autosplit tasks may be accepted. Idempotent:
// returns early if already primary.
void ChunkSplitter::initiateChunkSplitter() {
    stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
    if (_isPrimary) {
        return;
    }
    _isPrimary = true;

    // log() << "The ChunkSplitter has started and will accept autosplit tasks.";
    // TODO: Re-enable this log line when auto split is actively running on shards.
}
247
// Marks this node as no longer primary so no new autosplit tasks are scheduled.
// Tasks already submitted to the thread pool are not interrupted. Idempotent.
void ChunkSplitter::interruptChunkSplitter() {
    stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
    if (!_isPrimary) {
        return;
    }
    _isPrimary = false;

    // log() << "The ChunkSplitter has stopped and will no longer run new autosplit tasks. Any "
    //       << "autosplit tasks that have already started will be allowed to finish.";
    // TODO: Re-enable this log when auto split is actively running on shards.
}
259
trySplitting(const NamespaceString & nss,const BSONObj & min,const BSONObj & max,long dataWritten)260 void ChunkSplitter::trySplitting(const NamespaceString& nss,
261 const BSONObj& min,
262 const BSONObj& max,
263 long dataWritten) {
264 if (!_isPrimary) {
265 return;
266 }
267 uassertStatusOK(_threadPool.schedule([ this, nss, min, max, dataWritten ]() noexcept {
268 _runAutosplit(nss, min, max, dataWritten);
269 }));
270 }
271
// Performs one autosplit attempt for the chunk [min, max) of nss. Runs on a thread-pool
// worker. Revalidates the chunk against fresh routing info, computes split points via
// splitVector, applies the top-chunk optimization for monotonic shard keys, performs the
// split, and optionally suggests migrating the resulting top chunk. All failures are
// logged and swallowed — a failed autosplit must not take down the worker thread.
void ChunkSplitter::_runAutosplit(const NamespaceString& nss,
                                  const BSONObj& min,
                                  const BSONObj& max,
                                  long dataWritten) {
    // NOTE(review): _isPrimary is read here without holding _mutex while the setters
    // write it under the lock — confirm whether this best-effort unsynchronized read
    // is intended or should be locked/atomic.
    if (!_isPrimary) {
        return;
    }

    try {
        const auto opCtx = cc().makeOperationContext();
        const auto routingInfo = uassertStatusOK(
            Grid::get(opCtx.get())->catalogCache()->getCollectionRoutingInfo(opCtx.get(), nss));

        uassert(ErrorCodes::NamespaceNotSharded,
                "Could not split chunk. Collection is no longer sharded",
                routingInfo.cm());

        const auto cm = routingInfo.cm();
        const auto chunk = cm->findIntersectingChunkWithSimpleCollation(min);

        // Stop if chunk's range differs from the range we were expecting to split.
        if ((0 != chunk->getMin().woCompare(min)) || (0 != chunk->getMax().woCompare(max)) ||
            (chunk->getShardId() != ShardingState::get(opCtx.get())->getShardName())) {
            LOG(1) << "Cannot auto-split chunk with range '"
                   << redact(ChunkRange(min, max).toString()) << "' for nss '" << nss
                   << "' on shard '" << ShardingState::get(opCtx.get())->getShardName()
                   << "' because since scheduling auto-split the chunk has been changed to '"
                   << redact(chunk->toString()) << "'";
            return;
        }

        const ChunkRange chunkRange(chunk->getMin(), chunk->getMax());

        const auto balancerConfig = Grid::get(opCtx.get())->getBalancerConfiguration();
        // Ensure we have the most up-to-date balancer configuration
        uassertStatusOK(balancerConfig->refreshAndCheck(opCtx.get()));

        // Autosplit may be disabled cluster-wide.
        if (!balancerConfig->getShouldAutoSplit()) {
            return;
        }

        const uint64_t maxChunkSizeBytes = balancerConfig->getMaxChunkSizeBytes();

        LOG(1) << "about to initiate autosplit: " << redact(chunk->toString())
               << " dataWritten since last check: " << dataWritten
               << " maxChunkSizeBytes: " << maxChunkSizeBytes;

        // Ask splitVector for candidate split points within the chunk's bounds.
        auto splitPoints = uassertStatusOK(splitVector(opCtx.get(),
                                                       nss,
                                                       cm->getShardKeyPattern().toBSON(),
                                                       chunk->getMin(),
                                                       chunk->getMax(),
                                                       false,
                                                       boost::none,
                                                       boost::none,
                                                       boost::none,
                                                       maxChunkSizeBytes));

        if (splitPoints.size() <= 1) {
            // No split points means there isn't enough data to split on; 1 split point means we
            // have between half the chunk size to full chunk size so there is no need to split yet
            return;
        }

        // We assume that if the chunk being split is the first (or last) one on the collection,
        // this chunk is likely to see more insertions. Instead of splitting mid-chunk, we use the
        // very first (or last) key as a split point.
        //
        // This heuristic is skipped for "special" shard key patterns that are not likely to produce
        // monotonically increasing or decreasing values (e.g. hashed shard keys).

        // Keeps track of the minKey of the top chunk after the split so we can migrate the chunk.
        BSONObj topChunkMinKey;

        if (KeyPattern::isOrderedKeyPattern(cm->getShardKeyPattern().toBSON())) {
            if (0 ==
                cm->getShardKeyPattern().getKeyPattern().globalMin().woCompare(chunk->getMin())) {
                // MinKey is infinity (This is the first chunk on the collection)
                BSONObj key =
                    findExtremeKeyForShard(opCtx.get(), nss, cm->getShardKeyPattern(), true);
                if (!key.isEmpty()) {
                    splitPoints.front() = key.getOwned();
                    topChunkMinKey = cm->getShardKeyPattern().getKeyPattern().globalMin();
                }
            } else if (0 ==
                       cm->getShardKeyPattern().getKeyPattern().globalMax().woCompare(
                           chunk->getMax())) {
                // MaxKey is infinity (This is the last chunk on the collection)
                BSONObj key =
                    findExtremeKeyForShard(opCtx.get(), nss, cm->getShardKeyPattern(), false);
                if (!key.isEmpty()) {
                    splitPoints.back() = key.getOwned();
                    topChunkMinKey = key.getOwned();
                }
            }
        }

        uassertStatusOK(splitChunkAtMultiplePoints(opCtx.get(),
                                                   chunk->getShardId(),
                                                   nss,
                                                   cm->getShardKeyPattern(),
                                                   cm->getVersion(),
                                                   chunkRange,
                                                   splitPoints));

        const bool shouldBalance = isAutoBalanceEnabled(opCtx.get(), nss, balancerConfig);

        log() << "autosplitted " << nss << " chunk: " << redact(chunk->toString()) << " into "
              << (splitPoints.size() + 1) << " parts (maxChunkSizeBytes " << maxChunkSizeBytes
              << ")"
              << (topChunkMinKey.isEmpty() ? "" : " (top chunk migration suggested" +
                      (std::string)(shouldBalance ? ")" : ", but no migrations allowed)"));

        // Balance the resulting chunks if the autobalance option is enabled and if we split at the
        // first or last chunk on the collection as part of top chunk optimization.

        if (!shouldBalance || topChunkMinKey.isEmpty()) {
            return;
        }

        // Tries to move the top chunk out of the shard to prevent the hot spot from staying on a
        // single shard. This is based on the assumption that succeeding inserts will fall on the
        // top chunk.
        moveChunk(opCtx.get(), nss, topChunkMinKey);
    } catch (const DBException& ex) {
        log() << "Unable to auto-split chunk " << redact(ChunkRange(min, max).toString())
              << " in nss " << nss << causedBy(redact(ex.toStatus()));
    } catch (const std::exception& e) {
        log() << "caught exception while splitting chunk: " << redact(e.what());
    }
}
403
404 } // namespace mongo
405