/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/db/s/chunk_splitter.h"

#include "mongo/client/dbclientcursor.h"
#include "mongo/client/query.h"
#include "mongo/db/client.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/split_chunk.h"
#include "mongo/db/s/split_vector.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/config_server_client.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"

namespace mongo {
namespace {

/**
 * Constructs the default options for the thread pool used to schedule splits.
 */
ThreadPool::Options makeDefaultThreadPoolOptions() {
    ThreadPool::Options options;
    options.poolName = "ChunkSplitter";
    options.minThreads = 0;
    options.maxThreads = 20;

    // Ensure all threads have a client
    options.onCreateThread = [](const std::string& threadName) {
        Client::initThread(threadName.c_str());
    };
    return options;
}

/**
 * Attempts to split the chunk described by min/maxKey at the split points provided.
 */
Status splitChunkAtMultiplePoints(OperationContext* opCtx,
                                  const ShardId& shardId,
                                  const NamespaceString& nss,
                                  const ShardKeyPattern& shardKeyPattern,
                                  const ChunkVersion& collectionVersion,
                                  const ChunkRange& chunkRange,
                                  const std::vector<BSONObj>& splitPoints) {
    invariant(!splitPoints.empty());

    const size_t kMaxSplitPoints = 8192;

    if (splitPoints.size() > kMaxSplitPoints) {
        return {ErrorCodes::BadValue,
                str::stream() << "Cannot split chunk in more than " << kMaxSplitPoints
                              << " parts at a time."};
    }

    const auto status = splitChunk(opCtx,
                                   nss,
                                   shardKeyPattern.toBSON(),
                                   chunkRange,
                                   splitPoints,
                                   shardId.toString(),
                                   collectionVersion.epoch());

    if (!status.isOK()) {
        return {status.getStatus().code(),
                str::stream() << "split failed due to " << status.getStatus().reason()};
    }

    return status.getStatus();
}

/**
 * Attempts to move the chunk specified by minKey away from its current shard.
 */
void moveChunk(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& minKey) {
    // We need to have the most up-to-date view of the chunk we are about to move.
    const auto routingInfo =
        uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));

    uassert(ErrorCodes::NamespaceNotSharded,
            "Could not move chunk. Collection is no longer sharded",
            routingInfo.cm());

    const auto suggestedChunk = routingInfo.cm()->findIntersectingChunkWithSimpleCollation(minKey);

    ChunkType chunkToMove;
    chunkToMove.setNS(nss.ns());
    chunkToMove.setShard(suggestedChunk->getShardId());
    chunkToMove.setMin(suggestedChunk->getMin());
    chunkToMove.setMax(suggestedChunk->getMax());
    chunkToMove.setVersion(suggestedChunk->getLastmod());

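    // Ask the config server to have the balancer move this chunk off its current shard.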
    uassertStatusOK(configsvr_client::rebalanceChunk(opCtx, chunkToMove));
}

/**
 * Returns the split point that will result in one of the chunks having exactly one document.
 * Returns an empty document if the split point cannot be determined.
 *
 * doSplitAtLower - determines which side of the split will have exactly one document. True means
 * that the split point chosen will be closer to the lower bound.
 *
 * NOTE: this assumes that the shard key is not "special" - that is, the shardKeyPattern is simply
 * an ordered list of ascending/descending field names. For example, {a : 1, b : -1} is not
 * special, but {a : "hashed"} is.
 */
BSONObj findExtremeKeyForShard(OperationContext* opCtx,
                               const NamespaceString& nss,
                               const ShardKeyPattern& shardKeyPattern,
                               bool doSplitAtLower) {
    Query q;

    if (doSplitAtLower) {
        q.sort(shardKeyPattern.toBSON());
    } else {
        // need to invert shard key pattern to sort backwards
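        // For example, a shard key pattern of {a : 1, b : -1} becomes the sort specification
        // {a : -1, b : 1}, so the query below returns the greatest shard key first.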
        BSONObjBuilder r;

        BSONObjIterator i(shardKeyPattern.toBSON());
        while (i.more()) {
            BSONElement e = i.next();
            uassert(40617, "can only handle numbers here - which i think is correct", e.isNumber());
            r.append(e.fieldName(), -1 * e.number());
        }

        q.sort(r.obj());
    }

    DBDirectClient client(opCtx);

    BSONObj end;

    if (doSplitAtLower) {
        // Splitting close to the lower bound means that the split point will be the
        // upper bound. Chunk range upper bounds are exclusive so skip a document to
        // make the lower half of the split end up with a single document.
        std::unique_ptr<DBClientCursor> cursor = client.query(nss.ns(),
                                                              q,
                                                              1, /* nToReturn */
                                                              1 /* nToSkip */);

        uassert(40618,
                str::stream() << "failed to initialize cursor during auto split due to "
                              << "connection problem with "
                              << client.getServerAddress(),
                cursor.get() != nullptr);

        if (cursor->more()) {
            end = cursor->next().getOwned();
        }
    } else {
        end = client.findOne(nss.ns(), q);
    }

    if (end.isEmpty()) {
        return BSONObj();
    }

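    // Reduce the document to just its shard key fields so it can be used directly as a split
    // point.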
    return shardKeyPattern.extractShardKeyFromDoc(end);
}

/**
 * Checks if autobalance is enabled on the current sharded collection.
 */
bool isAutoBalanceEnabled(OperationContext* opCtx,
                          const NamespaceString& nss,
                          BalancerConfiguration* balancerConfig) {
    if (!balancerConfig->shouldBalanceForAutoSplit())
        return false;

    auto collStatus = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, nss.ns());
    if (!collStatus.isOK()) {
        log() << "Auto-split for " << nss << " failed to load collection metadata"
              << causedBy(redact(collStatus.getStatus()));
        return false;
    }

    return collStatus.getValue().value.getAllowBalance();
}

}  // namespace

ChunkSplitter::ChunkSplitter() : _isPrimary(false), _threadPool(makeDefaultThreadPoolOptions()) {
    _threadPool.startup();
}

ChunkSplitter::~ChunkSplitter() {
    _threadPool.shutdown();
    _threadPool.join();
}

void ChunkSplitter::setReplicaSetMode(bool isPrimary) {
    stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
    _isPrimary = isPrimary;
}

void ChunkSplitter::initiateChunkSplitter() {
    stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
    if (_isPrimary) {
        return;
    }
    _isPrimary = true;

    // log() << "The ChunkSplitter has started and will accept autosplit tasks.";
    // TODO: Re-enable this log line when auto split is actively running on shards.
}

void ChunkSplitter::interruptChunkSplitter() {
    stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
    if (!_isPrimary) {
        return;
    }
    _isPrimary = false;

    // log() << "The ChunkSplitter has stopped and will no longer run new autosplit tasks. Any "
    //       << "autosplit tasks that have already started will be allowed to finish.";
    // TODO: Re-enable this log when auto split is actively running on shards.
}

void ChunkSplitter::trySplitting(const NamespaceString& nss,
                                 const BSONObj& min,
                                 const BSONObj& max,
                                 long dataWritten) {
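    // Autosplit tasks are only scheduled while this node is the primary; see setReplicaSetMode().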
    if (!_isPrimary) {
        return;
    }
    uassertStatusOK(_threadPool.schedule([ this, nss, min, max, dataWritten ]() noexcept {
        _runAutosplit(nss, min, max, dataWritten);
    }));
}

void ChunkSplitter::_runAutosplit(const NamespaceString& nss,
                                  const BSONObj& min,
                                  const BSONObj& max,
                                  long dataWritten) {
    if (!_isPrimary) {
        return;
    }

    try {
        const auto opCtx = cc().makeOperationContext();
        const auto routingInfo = uassertStatusOK(
            Grid::get(opCtx.get())->catalogCache()->getCollectionRoutingInfo(opCtx.get(), nss));

        uassert(ErrorCodes::NamespaceNotSharded,
                "Could not split chunk. Collection is no longer sharded",
                routingInfo.cm());

        const auto cm = routingInfo.cm();
        const auto chunk = cm->findIntersectingChunkWithSimpleCollation(min);

        // Stop if chunk's range differs from the range we were expecting to split.
        if ((0 != chunk->getMin().woCompare(min)) || (0 != chunk->getMax().woCompare(max)) ||
            (chunk->getShardId() != ShardingState::get(opCtx.get())->getShardName())) {
            LOG(1) << "Cannot auto-split chunk with range '"
                   << redact(ChunkRange(min, max).toString()) << "' for nss '" << nss
                   << "' on shard '" << ShardingState::get(opCtx.get())->getShardName()
                   << "' because since scheduling auto-split the chunk has been changed to '"
                   << redact(chunk->toString()) << "'";
            return;
        }

        const ChunkRange chunkRange(chunk->getMin(), chunk->getMax());

        const auto balancerConfig = Grid::get(opCtx.get())->getBalancerConfiguration();
        // Ensure we have the most up-to-date balancer configuration
        uassertStatusOK(balancerConfig->refreshAndCheck(opCtx.get()));

        if (!balancerConfig->getShouldAutoSplit()) {
            return;
        }

        const uint64_t maxChunkSizeBytes = balancerConfig->getMaxChunkSizeBytes();

        LOG(1) << "about to initiate autosplit: " << redact(chunk->toString())
               << " dataWritten since last check: " << dataWritten
               << " maxChunkSizeBytes: " << maxChunkSizeBytes;

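        // splitVector walks the shard key index to propose split points that keep each resulting
        // chunk under maxChunkSizeBytes; the boost::none arguments leave the other optional limits
        // (such as the maximum number of split points or objects) unset.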
        auto splitPoints = uassertStatusOK(splitVector(opCtx.get(),
                                                       nss,
                                                       cm->getShardKeyPattern().toBSON(),
                                                       chunk->getMin(),
                                                       chunk->getMax(),
                                                       false,
                                                       boost::none,
                                                       boost::none,
                                                       boost::none,
                                                       maxChunkSizeBytes));

        if (splitPoints.size() <= 1) {
            // No split points means there isn't enough data to split on; one split point means the
            // chunk is between half and a full chunk size, so there is no need to split yet.
            return;
        }

        // We assume that if the chunk being split is the first (or last) one on the collection,
        // this chunk is likely to see more insertions. Instead of splitting mid-chunk, we use the
        // very first (or last) key as a split point.
        //
        // This heuristic is skipped for "special" shard key patterns that are not likely to produce
        // monotonically increasing or decreasing values (e.g. hashed shard keys).

        // Keeps track of the minKey of the top chunk after the split so we can migrate the chunk.
        BSONObj topChunkMinKey;

        if (KeyPattern::isOrderedKeyPattern(cm->getShardKeyPattern().toBSON())) {
            if (0 ==
                cm->getShardKeyPattern().getKeyPattern().globalMin().woCompare(chunk->getMin())) {
                // MinKey is infinity (This is the first chunk on the collection)
                BSONObj key =
                    findExtremeKeyForShard(opCtx.get(), nss, cm->getShardKeyPattern(), true);
                if (!key.isEmpty()) {
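                    // Split at the second document's key so the resulting top chunk
                    // [globalMin, key) holds exactly one document and can be migrated away below.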
                    splitPoints.front() = key.getOwned();
                    topChunkMinKey = cm->getShardKeyPattern().getKeyPattern().globalMin();
                }
            } else if (0 ==
                       cm->getShardKeyPattern().getKeyPattern().globalMax().woCompare(
                           chunk->getMax())) {
                // MaxKey is infinity (This is the last chunk on the collection)
                BSONObj key =
                    findExtremeKeyForShard(opCtx.get(), nss, cm->getShardKeyPattern(), false);
                if (!key.isEmpty()) {
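                    // Split at the last document's key so the resulting top chunk
                    // [key, globalMax) holds exactly one document and can be migrated away below.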
                    splitPoints.back() = key.getOwned();
                    topChunkMinKey = key.getOwned();
                }
            }
        }

        uassertStatusOK(splitChunkAtMultiplePoints(opCtx.get(),
                                                   chunk->getShardId(),
                                                   nss,
                                                   cm->getShardKeyPattern(),
                                                   cm->getVersion(),
                                                   chunkRange,
                                                   splitPoints));

        const bool shouldBalance = isAutoBalanceEnabled(opCtx.get(), nss, balancerConfig);

        log() << "autosplitted " << nss << " chunk: " << redact(chunk->toString()) << " into "
              << (splitPoints.size() + 1) << " parts (maxChunkSizeBytes " << maxChunkSizeBytes
              << ")"
              << (topChunkMinKey.isEmpty() ? "" : " (top chunk migration suggested" +
                          (std::string)(shouldBalance ? ")" : ", but no migrations allowed)"));

        // Balance the resulting chunks if the autobalance option is enabled and if we split at the
        // first or last chunk on the collection as part of top chunk optimization.

        if (!shouldBalance || topChunkMinKey.isEmpty()) {
            return;
        }

        // Tries to move the top chunk out of the shard to prevent the hot spot from staying on a
        // single shard. This is based on the assumption that succeeding inserts will fall on the
        // top chunk.
        moveChunk(opCtx.get(), nss, topChunkMinKey);
    } catch (const DBException& ex) {
        log() << "Unable to auto-split chunk " << redact(ChunkRange(min, max).toString())
              << " in nss " << nss << causedBy(redact(ex.toStatus()));
    } catch (const std::exception& e) {
        log() << "caught exception while splitting chunk: " << redact(e.what());
    }
}

}  // namespace mongo