1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/db/s/sharding_state.h"
36 
37 #include "mongo/base/init.h"
38 #include "mongo/bson/util/bson_extract.h"
39 #include "mongo/client/connection_string.h"
40 #include "mongo/client/replica_set_monitor.h"
41 #include "mongo/db/auth/authorization_session.h"
42 #include "mongo/db/catalog/catalog_raii.h"
43 #include "mongo/db/client.h"
44 #include "mongo/db/dbhelpers.h"
45 #include "mongo/db/operation_context.h"
46 #include "mongo/db/ops/update.h"
47 #include "mongo/db/ops/update_lifecycle_impl.h"
48 #include "mongo/db/repl/optime.h"
49 #include "mongo/db/repl/replication_coordinator.h"
50 #include "mongo/db/s/operation_sharding_state.h"
51 #include "mongo/db/s/sharded_connection_info.h"
52 #include "mongo/db/s/sharding_initialization_mongod.h"
53 #include "mongo/db/s/sharding_statistics.h"
54 #include "mongo/db/s/type_shard_identity.h"
55 #include "mongo/executor/network_interface_factory.h"
56 #include "mongo/executor/network_interface_thread_pool.h"
57 #include "mongo/executor/task_executor_pool.h"
58 #include "mongo/rpc/metadata/config_server_metadata.h"
59 #include "mongo/rpc/metadata/metadata_hook.h"
60 #include "mongo/s/catalog/sharding_catalog_client.h"
61 #include "mongo/s/catalog/type_chunk.h"
62 #include "mongo/s/catalog_cache.h"
63 #include "mongo/s/chunk_version.h"
64 #include "mongo/s/client/shard_registry.h"
65 #include "mongo/s/client/sharding_network_connection_hook.h"
66 #include "mongo/s/grid.h"
67 #include "mongo/s/sharding_initialization.h"
68 #include "mongo/util/log.h"
69 #include "mongo/util/mongoutils/str.h"
70 
71 namespace mongo {
72 
73 using std::shared_ptr;
74 using std::string;
75 using std::vector;
76 
77 using CallbackArgs = executor::TaskExecutor::CallbackArgs;
78 
79 namespace {
80 
81 const auto getShardingState = ServiceContext::declareDecoration<ShardingState>();
82 
83 /**
84  * Updates the config server field of the shardIdentity document with the given connection string
85  * if setName is equal to the config server replica set name.
86  *
87  * Note: This is intended to be used on a new thread that hasn't called Client::initThread.
88  * One example use case is for the ReplicaSetMonitor asynchronous callback when it detects changes
89  * to replica set membership.
90  */
updateShardIdentityConfigStringCB(const string & setName,const string & newConnectionString)91 void updateShardIdentityConfigStringCB(const string& setName, const string& newConnectionString) {
92     auto configsvrConnStr = grid.shardRegistry()->getConfigServerConnectionString();
93     if (configsvrConnStr.getSetName() != setName) {
94         // Ignore all change notification for other sets that are not the config server.
95         return;
96     }
97 
98     Client::initThread("updateShardIdentityConfigConnString");
99     auto uniqOpCtx = Client::getCurrent()->makeOperationContext();
100 
101     auto status = ShardingState::get(uniqOpCtx.get())
102                       ->updateShardIdentityConfigString(uniqOpCtx.get(), newConnectionString);
103     if (!status.isOK() && !ErrorCodes::isNotMasterError(status.code())) {
104         warning() << "error encountered while trying to update config connection string to "
105                   << newConnectionString << causedBy(redact(status));
106     }
107 }
108 
109 }  // namespace
110 
ShardingState()111 ShardingState::ShardingState()
112     : _chunkSplitter(stdx::make_unique<ChunkSplitter>()),
113       _initializationState(static_cast<uint32_t>(InitializationState::kNew)),
114       _initializationStatus(Status(ErrorCodes::InternalError, "Uninitialized value")),
115       _globalInit(&initializeGlobalShardingStateForMongod) {}
116 
117 ShardingState::~ShardingState() = default;
118 
get(ServiceContext * serviceContext)119 ShardingState* ShardingState::get(ServiceContext* serviceContext) {
120     return &getShardingState(serviceContext);
121 }
122 
get(OperationContext * operationContext)123 ShardingState* ShardingState::get(OperationContext* operationContext) {
124     return ShardingState::get(operationContext->getServiceContext());
125 }
126 
enabled() const127 bool ShardingState::enabled() const {
128     return _getInitializationState() == InitializationState::kInitialized;
129 }
130 
setEnabledForTest(const std::string & shardName)131 void ShardingState::setEnabledForTest(const std::string& shardName) {
132     _setInitializationState(InitializationState::kInitialized);
133     _shardName = shardName;
134 }
135 
canAcceptShardedCommands() const136 Status ShardingState::canAcceptShardedCommands() const {
137     if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) {
138         return {ErrorCodes::NoShardingEnabled,
139                 "Cannot accept sharding commands if not started with --shardsvr"};
140     } else if (!enabled()) {
141         return {ErrorCodes::ShardingStateNotInitialized,
142                 "Cannot accept sharding commands if sharding state has not "
143                 "been initialized with a shardIdentity document"};
144     } else {
145         return Status::OK();
146     }
147 }
148 
getConfigServer(OperationContext * opCtx)149 ConnectionString ShardingState::getConfigServer(OperationContext* opCtx) {
150     invariant(enabled());
151     stdx::lock_guard<stdx::mutex> lk(_mutex);
152     return Grid::get(opCtx)->shardRegistry()->getConfigServerConnectionString();
153 }
154 
getShardName()155 string ShardingState::getShardName() {
156     invariant(enabled());
157     stdx::lock_guard<stdx::mutex> lk(_mutex);
158     return _shardName;
159 }
160 
shutDown(OperationContext * opCtx)161 void ShardingState::shutDown(OperationContext* opCtx) {
162     stdx::unique_lock<stdx::mutex> lk(_mutex);
163     if (enabled()) {
164         Grid::get(opCtx)->getExecutorPool()->shutdownAndJoin();
165         Grid::get(opCtx)->catalogClient()->shutDown(opCtx);
166     }
167 }
168 
updateConfigServerOpTimeFromMetadata(OperationContext * opCtx)169 Status ShardingState::updateConfigServerOpTimeFromMetadata(OperationContext* opCtx) {
170     if (!enabled()) {
171         // Nothing to do if sharding state has not been initialized.
172         return Status::OK();
173     }
174 
175     boost::optional<repl::OpTime> opTime = rpc::ConfigServerMetadata::get(opCtx).getOpTime();
176     if (opTime) {
177         if (!AuthorizationSession::get(opCtx->getClient())
178                  ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
179                                                     ActionType::internal)) {
180             return Status(ErrorCodes::Unauthorized, "Unauthorized to update config opTime");
181         }
182 
183         Grid::get(opCtx)->advanceConfigOpTime(opCtx, *opTime, "request from");
184     }
185 
186     return Status::OK();
187 }
188 
getChunkSplitter()189 ChunkSplitter* ShardingState::getChunkSplitter() {
190     return _chunkSplitter.get();
191 }
192 
initiateChunkSplitter()193 void ShardingState::initiateChunkSplitter() {
194     _chunkSplitter->initiateChunkSplitter();
195 }
196 
interruptChunkSplitter()197 void ShardingState::interruptChunkSplitter() {
198     _chunkSplitter->interruptChunkSplitter();
199 }
200 
setGlobalInitMethodForTest(GlobalInitFunc func)201 void ShardingState::setGlobalInitMethodForTest(GlobalInitFunc func) {
202     _globalInit = func;
203 }
204 
onStaleShardVersion(OperationContext * opCtx,const NamespaceString & nss,const ChunkVersion & expectedVersion)205 Status ShardingState::onStaleShardVersion(OperationContext* opCtx,
206                                           const NamespaceString& nss,
207                                           const ChunkVersion& expectedVersion) {
208     invariant(!opCtx->getClient()->isInDirectClient());
209     invariant(!opCtx->lockState()->isLocked());
210     invariant(enabled());
211 
212     LOG(2) << "metadata refresh requested for " << nss.ns() << " at shard version "
213            << expectedVersion;
214 
215     ShardingStatistics::get(opCtx).countStaleConfigErrors.addAndFetch(1);
216 
217     // Ensure any ongoing migrations have completed
218     auto& oss = OperationShardingState::get(opCtx);
219     oss.waitForMigrationCriticalSectionSignal(opCtx);
220 
221     const auto collectionShardVersion = [&] {
222         // Fast path - check if the requested version is at a higher version than the current
223         // metadata version or a different epoch before verifying against config server
224         AutoGetCollection autoColl(opCtx, nss, MODE_IS);
225         const auto currentMetadata = CollectionShardingState::get(opCtx, nss)->getMetadata();
226         if (currentMetadata) {
227             return currentMetadata->getShardVersion();
228         }
229 
230         return ChunkVersion::UNSHARDED();
231     }();
232 
233     if (collectionShardVersion.epoch() == expectedVersion.epoch() &&
234         collectionShardVersion >= expectedVersion) {
235         // Don't need to remotely reload if we're in the same epoch and the requested version is
236         // smaller than the one we know about. This means that the remote side is behind.
237         return Status::OK();
238     }
239 
240     try {
241         _refreshMetadata(opCtx, nss);
242         return Status::OK();
243     } catch (const DBException& ex) {
244         log() << "Failed to refresh metadata for collection" << nss << causedBy(redact(ex));
245         return ex.toStatus();
246     }
247 }
248 
refreshMetadataNow(OperationContext * opCtx,const NamespaceString & nss,ChunkVersion * latestShardVersion)249 Status ShardingState::refreshMetadataNow(OperationContext* opCtx,
250                                          const NamespaceString& nss,
251                                          ChunkVersion* latestShardVersion) {
252     try {
253         *latestShardVersion = _refreshMetadata(opCtx, nss);
254         return Status::OK();
255     } catch (const DBException& ex) {
256         return ex.toStatus();
257     }
258 }
259 
260 // NOTE: This method will be called inside a database lock so it should never take any database
261 // locks, perform I/O, or any long running operations.
initializeFromShardIdentity(OperationContext * opCtx,const ShardIdentityType & shardIdentity)262 Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx,
263                                                   const ShardIdentityType& shardIdentity) {
264     invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
265     invariant(opCtx->lockState()->isLocked());
266 
267     Status validationStatus = shardIdentity.validate();
268     if (!validationStatus.isOK()) {
269         return Status(
270             validationStatus.code(),
271             str::stream()
272                 << "Invalid shard identity document found when initializing sharding state: "
273                 << validationStatus.reason());
274     }
275 
276     log() << "initializing sharding state with: " << shardIdentity;
277 
278     stdx::unique_lock<stdx::mutex> lk(_mutex);
279 
280     auto configSvrConnStr = shardIdentity.getConfigsvrConnString();
281 
282     if (enabled()) {
283         invariant(!_shardName.empty());
284         fassert(40372, _shardName == shardIdentity.getShardName());
285 
286         auto prevConfigsvrConnStr =
287             Grid::get(opCtx)->shardRegistry()->getConfigServerConnectionString();
288         invariant(prevConfigsvrConnStr.type() == ConnectionString::SET);
289         fassert(40373, prevConfigsvrConnStr.getSetName() == configSvrConnStr.getSetName());
290 
291         invariant(_clusterId.isSet());
292         fassert(40374, _clusterId == shardIdentity.getClusterId());
293 
294         return Status::OK();
295     }
296 
297     if (_getInitializationState() == InitializationState::kError) {
298         return {ErrorCodes::ManualInterventionRequired,
299                 str::stream() << "Server's sharding metadata manager failed to initialize and will "
300                                  "remain in this state until the instance is manually reset"
301                               << causedBy(_initializationStatus)};
302     }
303 
304     ShardedConnectionInfo::addHook(opCtx->getServiceContext());
305 
306     try {
307         Status status = _globalInit(opCtx, configSvrConnStr, generateDistLockProcessId(opCtx));
308         if (status.isOK()) {
309             ReplicaSetMonitor::setSynchronousConfigChangeHook(
310                 &ShardRegistry::replicaSetChangeShardRegistryUpdateHook);
311             ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB);
312 
313             // Determine primary/secondary/standalone state in order to properly initialize sharding
314             // components.
315             auto replCoord = repl::ReplicationCoordinator::get(opCtx);
316             bool isReplSet =
317                 replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
318             bool isStandaloneOrPrimary =
319                 !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() ==
320                                repl::MemberState::RS_PRIMARY);
321 
322             CatalogCacheLoader::get(opCtx).initializeReplicaSetRole(isStandaloneOrPrimary);
323 
324             _chunkSplitter->setReplicaSetMode(isStandaloneOrPrimary);
325 
326             Grid::get(opCtx)->setShardingInitialized();
327 
328             log() << "initialized sharding components for "
329                   << (isStandaloneOrPrimary ? "primary" : "secondary") << " node.";
330             _setInitializationState(InitializationState::kInitialized);
331         } else {
332             log() << "failed to initialize sharding components" << causedBy(status);
333             _initializationStatus = status;
334             _setInitializationState(InitializationState::kError);
335         }
336         _shardName = shardIdentity.getShardName();
337         _clusterId = shardIdentity.getClusterId();
338 
339         return status;
340     } catch (const DBException& ex) {
341         auto errorStatus = ex.toStatus();
342         _initializationStatus = errorStatus;
343         _setInitializationState(InitializationState::kError);
344         return errorStatus;
345     }
346 
347     MONGO_UNREACHABLE;
348 }
349 
_getInitializationState() const350 ShardingState::InitializationState ShardingState::_getInitializationState() const {
351     return static_cast<InitializationState>(_initializationState.load());
352 }
353 
_setInitializationState(InitializationState newState)354 void ShardingState::_setInitializationState(InitializationState newState) {
355     _initializationState.store(static_cast<uint32_t>(newState));
356 }
357 
initializeShardingAwarenessIfNeeded(OperationContext * opCtx)358 StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationContext* opCtx) {
359     invariant(!opCtx->lockState()->isLocked());
360 
361     // In sharded readOnly mode, we ignore the shardIdentity document on disk and instead *require*
362     // a shardIdentity document to be passed through --overrideShardIdentity.
363     if (storageGlobalParams.readOnly) {
364         if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
365             if (serverGlobalParams.overrideShardIdentity.isEmpty()) {
366                 return {ErrorCodes::InvalidOptions,
367                         "If started with --shardsvr in queryableBackupMode, a shardIdentity "
368                         "document must be provided through --overrideShardIdentity"};
369             }
370             auto swOverrideShardIdentity =
371                 ShardIdentityType::fromBSON(serverGlobalParams.overrideShardIdentity);
372             if (!swOverrideShardIdentity.isOK()) {
373                 return swOverrideShardIdentity.getStatus();
374             }
375             {
376                 // Global lock is required to call initializeFromShardIdenetity().
377                 Lock::GlobalWrite lk(opCtx);
378                 auto status =
379                     initializeFromShardIdentity(opCtx, swOverrideShardIdentity.getValue());
380                 if (!status.isOK()) {
381                     return status;
382                 }
383             }
384             return true;
385         } else {
386             // Error if --overrideShardIdentity is used but *not* started with --shardsvr.
387             if (!serverGlobalParams.overrideShardIdentity.isEmpty()) {
388                 return {
389                     ErrorCodes::InvalidOptions,
390                     str::stream()
391                         << "Not started with --shardsvr, but a shardIdentity document was provided "
392                            "through --overrideShardIdentity: "
393                         << serverGlobalParams.overrideShardIdentity};
394             }
395             return false;
396         }
397     }
398     // In sharded *non*-readOnly mode, error if --overrideShardIdentity is provided. Use the
399     // shardIdentity document on disk if one exists, but it is okay if no shardIdentity document is
400     // provided at all (sharding awareness will be initialized when a shardIdentity document is
401     // inserted).
402     else {
403         if (!serverGlobalParams.overrideShardIdentity.isEmpty()) {
404             return {
405                 ErrorCodes::InvalidOptions,
406                 str::stream() << "--overrideShardIdentity is only allowed in sharded "
407                                  "queryableBackupMode. If not in queryableBackupMode, you can edit "
408                                  "the shardIdentity document by starting the server *without* "
409                                  "--shardsvr, manually updating the shardIdentity document in the "
410                               << NamespaceString::kServerConfigurationNamespace.toString()
411                               << " collection, and restarting the server with --shardsvr."};
412         }
413 
414         // Load the shardIdentity document from disk.
415         BSONObj shardIdentityBSON;
416         bool foundShardIdentity = false;
417         try {
418             AutoGetCollection autoColl(
419                 opCtx, NamespaceString::kServerConfigurationNamespace, MODE_IS);
420             foundShardIdentity = Helpers::findOne(opCtx,
421                                                   autoColl.getCollection(),
422                                                   BSON("_id" << ShardIdentityType::IdName),
423                                                   shardIdentityBSON);
424         } catch (const DBException& ex) {
425             return ex.toStatus();
426         }
427 
428         if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
429             if (!foundShardIdentity) {
430                 warning() << "Started with --shardsvr, but no shardIdentity document was found on "
431                              "disk in "
432                           << NamespaceString::kServerConfigurationNamespace
433                           << ". This most likely means this server has not yet been added to a "
434                              "sharded cluster.";
435                 return false;
436             }
437 
438             invariant(!shardIdentityBSON.isEmpty());
439 
440             auto swShardIdentity = ShardIdentityType::fromBSON(shardIdentityBSON);
441             if (!swShardIdentity.isOK()) {
442                 return swShardIdentity.getStatus();
443             }
444             {
445                 // Global lock is required to call initializeFromShardIdenetity().
446                 Lock::GlobalWrite lk(opCtx);
447                 auto status = initializeFromShardIdentity(opCtx, swShardIdentity.getValue());
448                 if (!status.isOK()) {
449                     return status;
450                 }
451             }
452             return true;
453         } else {
454             // Warn if a shardIdentity document is found on disk but *not* started with --shardsvr.
455             if (!shardIdentityBSON.isEmpty()) {
456                 warning() << "Not started with --shardsvr, but a shardIdentity document was found "
457                              "on disk in "
458                           << NamespaceString::kServerConfigurationNamespace << ": "
459                           << shardIdentityBSON;
460             }
461             return false;
462         }
463     }
464 }
465 
_refreshMetadata(OperationContext * opCtx,const NamespaceString & nss)466 ChunkVersion ShardingState::_refreshMetadata(OperationContext* opCtx, const NamespaceString& nss) {
467     invariant(!opCtx->lockState()->isLocked());
468     invariant(enabled());
469 
470     const ShardId shardId = getShardName();
471 
472     uassert(ErrorCodes::NotYetInitialized,
473             str::stream() << "Cannot refresh metadata for " << nss.ns()
474                           << " before shard name has been set",
475             shardId.isValid());
476 
477     const auto routingInfo = uassertStatusOK(
478         Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, nss));
479     const auto cm = routingInfo.cm();
480 
481     if (!cm) {
482         // No chunk manager, so unsharded.
483 
484         // Exclusive collection lock needed since we're now changing the metadata
485         AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
486 
487         auto css = CollectionShardingState::get(opCtx, nss);
488         css->refreshMetadata(opCtx, nullptr);
489 
490         return ChunkVersion::UNSHARDED();
491     }
492 
493     {
494         AutoGetCollection autoColl(opCtx, nss, MODE_IS);
495         auto css = CollectionShardingState::get(opCtx, nss);
496 
497         // We already have newer version
498         if (css->getMetadata() &&
499             css->getMetadata()->getCollVersion().epoch() == cm->getVersion().epoch() &&
500             css->getMetadata()->getCollVersion() >= cm->getVersion()) {
501             LOG(1) << "Skipping refresh of metadata for " << nss << " "
502                    << css->getMetadata()->getCollVersion() << " with an older " << cm->getVersion();
503             return css->getMetadata()->getShardVersion();
504         }
505     }
506 
507     // Exclusive collection lock needed since we're now changing the metadata
508     AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
509 
510     auto css = CollectionShardingState::get(opCtx, nss);
511 
512     // We already have newer version
513     if (css->getMetadata() &&
514         css->getMetadata()->getCollVersion().epoch() == cm->getVersion().epoch() &&
515         css->getMetadata()->getCollVersion() >= cm->getVersion()) {
516         LOG(1) << "Skipping refresh of metadata for " << nss << " "
517                << css->getMetadata()->getCollVersion() << " with an older " << cm->getVersion();
518         return css->getMetadata()->getShardVersion();
519     }
520 
521     std::unique_ptr<CollectionMetadata> newCollectionMetadata =
522         stdx::make_unique<CollectionMetadata>(cm, shardId);
523 
524     css->refreshMetadata(opCtx, std::move(newCollectionMetadata));
525 
526     return css->getMetadata()->getShardVersion();
527 }
528 
registerDonateChunk(const MoveChunkRequest & args)529 StatusWith<ScopedRegisterDonateChunk> ShardingState::registerDonateChunk(
530     const MoveChunkRequest& args) {
531     return _activeMigrationsRegistry.registerDonateChunk(args);
532 }
533 
registerReceiveChunk(const NamespaceString & nss,const ChunkRange & chunkRange,const ShardId & fromShardId)534 StatusWith<ScopedRegisterReceiveChunk> ShardingState::registerReceiveChunk(
535     const NamespaceString& nss, const ChunkRange& chunkRange, const ShardId& fromShardId) {
536     return _activeMigrationsRegistry.registerReceiveChunk(nss, chunkRange, fromShardId);
537 }
538 
getActiveDonateChunkNss()539 boost::optional<NamespaceString> ShardingState::getActiveDonateChunkNss() {
540     return _activeMigrationsRegistry.getActiveDonateChunkNss();
541 }
542 
getActiveMigrationStatusReport(OperationContext * opCtx)543 BSONObj ShardingState::getActiveMigrationStatusReport(OperationContext* opCtx) {
544     return _activeMigrationsRegistry.getActiveMigrationStatusReport(opCtx);
545 }
546 
appendInfo(OperationContext * opCtx,BSONObjBuilder & builder)547 void ShardingState::appendInfo(OperationContext* opCtx, BSONObjBuilder& builder) {
548     const bool isEnabled = enabled();
549     builder.appendBool("enabled", isEnabled);
550     if (!isEnabled)
551         return;
552 
553     stdx::lock_guard<stdx::mutex> lk(_mutex);
554 
555     builder.append("configServer",
556                    Grid::get(opCtx)->shardRegistry()->getConfigServerConnectionString().toString());
557     builder.append("shardName", _shardName);
558     builder.append("clusterId", _clusterId);
559 }
560 
needCollectionMetadata(OperationContext * opCtx,const string & ns)561 bool ShardingState::needCollectionMetadata(OperationContext* opCtx, const string& ns) {
562     if (!enabled())
563         return false;
564 
565     Client* client = opCtx->getClient();
566 
567     // Shard version information received from mongos may either by attached to the Client or
568     // directly to the OperationContext.
569     return ShardedConnectionInfo::get(client, false) ||
570         OperationShardingState::get(opCtx).hasShardVersion();
571 }
572 
updateShardIdentityConfigString(OperationContext * opCtx,const std::string & newConnectionString)573 Status ShardingState::updateShardIdentityConfigString(OperationContext* opCtx,
574                                                       const std::string& newConnectionString) {
575     BSONObj updateObj(ShardIdentityType::createConfigServerUpdateObject(newConnectionString));
576 
577     UpdateRequest updateReq(NamespaceString::kServerConfigurationNamespace);
578     updateReq.setQuery(BSON("_id" << ShardIdentityType::IdName));
579     updateReq.setUpdates(updateObj);
580     UpdateLifecycleImpl updateLifecycle(NamespaceString::kServerConfigurationNamespace);
581     updateReq.setLifecycle(&updateLifecycle);
582 
583     try {
584         AutoGetOrCreateDb autoDb(
585             opCtx, NamespaceString::kServerConfigurationNamespace.db(), MODE_X);
586 
587         auto result = update(opCtx, autoDb.getDb(), updateReq);
588         if (result.numMatched == 0) {
589             warning() << "failed to update config string of shard identity document because "
590                       << "it does not exist. This shard could have been removed from the cluster";
591         } else {
592             LOG(2) << "Updated config server connection string in shardIdentity document to"
593                    << newConnectionString;
594         }
595     } catch (const DBException& exception) {
596         return exception.toStatus();
597     }
598 
599     return Status::OK();
600 }
601 
getRangeDeleterTaskExecutor()602 executor::TaskExecutor* ShardingState::getRangeDeleterTaskExecutor() {
603     stdx::lock_guard<stdx::mutex> lk(_rangeDeleterExecutor.lock);
604     if (_rangeDeleterExecutor.taskExecutor.get() == nullptr) {
605         static const char kExecName[] = "NetworkInterfaceCollectionRangeDeleter-TaskExecutor";
606         auto net = executor::makeNetworkInterface(kExecName);
607         auto pool = stdx::make_unique<executor::NetworkInterfaceThreadPool>(net.get());
608         _rangeDeleterExecutor.taskExecutor =
609             stdx::make_unique<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
610         _rangeDeleterExecutor.taskExecutor->startup();
611     }
612     return _rangeDeleterExecutor.taskExecutor.get();
613 }
614 
~RangeDeleterExecutor()615 ShardingState::RangeDeleterExecutor::~RangeDeleterExecutor() {
616     if (taskExecutor) {
617         taskExecutor->shutdown();
618         taskExecutor->join();
619     }
620 }
621 
622 }  // namespace mongo
623