1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/db/s/sharding_state.h"
36
37 #include "mongo/base/init.h"
38 #include "mongo/bson/util/bson_extract.h"
39 #include "mongo/client/connection_string.h"
40 #include "mongo/client/replica_set_monitor.h"
41 #include "mongo/db/auth/authorization_session.h"
42 #include "mongo/db/catalog/catalog_raii.h"
43 #include "mongo/db/client.h"
44 #include "mongo/db/dbhelpers.h"
45 #include "mongo/db/operation_context.h"
46 #include "mongo/db/ops/update.h"
47 #include "mongo/db/ops/update_lifecycle_impl.h"
48 #include "mongo/db/repl/optime.h"
49 #include "mongo/db/repl/replication_coordinator.h"
50 #include "mongo/db/s/operation_sharding_state.h"
51 #include "mongo/db/s/sharded_connection_info.h"
52 #include "mongo/db/s/sharding_initialization_mongod.h"
53 #include "mongo/db/s/sharding_statistics.h"
54 #include "mongo/db/s/type_shard_identity.h"
55 #include "mongo/executor/network_interface_factory.h"
56 #include "mongo/executor/network_interface_thread_pool.h"
57 #include "mongo/executor/task_executor_pool.h"
58 #include "mongo/rpc/metadata/config_server_metadata.h"
59 #include "mongo/rpc/metadata/metadata_hook.h"
60 #include "mongo/s/catalog/sharding_catalog_client.h"
61 #include "mongo/s/catalog/type_chunk.h"
62 #include "mongo/s/catalog_cache.h"
63 #include "mongo/s/chunk_version.h"
64 #include "mongo/s/client/shard_registry.h"
65 #include "mongo/s/client/sharding_network_connection_hook.h"
66 #include "mongo/s/grid.h"
67 #include "mongo/s/sharding_initialization.h"
68 #include "mongo/util/log.h"
69 #include "mongo/util/mongoutils/str.h"
70
71 namespace mongo {
72
73 using std::shared_ptr;
74 using std::string;
75 using std::vector;
76
77 using CallbackArgs = executor::TaskExecutor::CallbackArgs;
78
79 namespace {
80
81 const auto getShardingState = ServiceContext::declareDecoration<ShardingState>();
82
83 /**
84 * Updates the config server field of the shardIdentity document with the given connection string
85 * if setName is equal to the config server replica set name.
86 *
87 * Note: This is intended to be used on a new thread that hasn't called Client::initThread.
88 * One example use case is for the ReplicaSetMonitor asynchronous callback when it detects changes
89 * to replica set membership.
90 */
updateShardIdentityConfigStringCB(const string & setName,const string & newConnectionString)91 void updateShardIdentityConfigStringCB(const string& setName, const string& newConnectionString) {
92 auto configsvrConnStr = grid.shardRegistry()->getConfigServerConnectionString();
93 if (configsvrConnStr.getSetName() != setName) {
94 // Ignore all change notification for other sets that are not the config server.
95 return;
96 }
97
98 Client::initThread("updateShardIdentityConfigConnString");
99 auto uniqOpCtx = Client::getCurrent()->makeOperationContext();
100
101 auto status = ShardingState::get(uniqOpCtx.get())
102 ->updateShardIdentityConfigString(uniqOpCtx.get(), newConnectionString);
103 if (!status.isOK() && !ErrorCodes::isNotMasterError(status.code())) {
104 warning() << "error encountered while trying to update config connection string to "
105 << newConnectionString << causedBy(redact(status));
106 }
107 }
108
109 } // namespace
110
ShardingState()111 ShardingState::ShardingState()
112 : _chunkSplitter(stdx::make_unique<ChunkSplitter>()),
113 _initializationState(static_cast<uint32_t>(InitializationState::kNew)),
114 _initializationStatus(Status(ErrorCodes::InternalError, "Uninitialized value")),
115 _globalInit(&initializeGlobalShardingStateForMongod) {}
116
117 ShardingState::~ShardingState() = default;
118
get(ServiceContext * serviceContext)119 ShardingState* ShardingState::get(ServiceContext* serviceContext) {
120 return &getShardingState(serviceContext);
121 }
122
get(OperationContext * operationContext)123 ShardingState* ShardingState::get(OperationContext* operationContext) {
124 return ShardingState::get(operationContext->getServiceContext());
125 }
126
enabled() const127 bool ShardingState::enabled() const {
128 return _getInitializationState() == InitializationState::kInitialized;
129 }
130
setEnabledForTest(const std::string & shardName)131 void ShardingState::setEnabledForTest(const std::string& shardName) {
132 _setInitializationState(InitializationState::kInitialized);
133 _shardName = shardName;
134 }
135
canAcceptShardedCommands() const136 Status ShardingState::canAcceptShardedCommands() const {
137 if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) {
138 return {ErrorCodes::NoShardingEnabled,
139 "Cannot accept sharding commands if not started with --shardsvr"};
140 } else if (!enabled()) {
141 return {ErrorCodes::ShardingStateNotInitialized,
142 "Cannot accept sharding commands if sharding state has not "
143 "been initialized with a shardIdentity document"};
144 } else {
145 return Status::OK();
146 }
147 }
148
getConfigServer(OperationContext * opCtx)149 ConnectionString ShardingState::getConfigServer(OperationContext* opCtx) {
150 invariant(enabled());
151 stdx::lock_guard<stdx::mutex> lk(_mutex);
152 return Grid::get(opCtx)->shardRegistry()->getConfigServerConnectionString();
153 }
154
getShardName()155 string ShardingState::getShardName() {
156 invariant(enabled());
157 stdx::lock_guard<stdx::mutex> lk(_mutex);
158 return _shardName;
159 }
160
shutDown(OperationContext * opCtx)161 void ShardingState::shutDown(OperationContext* opCtx) {
162 stdx::unique_lock<stdx::mutex> lk(_mutex);
163 if (enabled()) {
164 Grid::get(opCtx)->getExecutorPool()->shutdownAndJoin();
165 Grid::get(opCtx)->catalogClient()->shutDown(opCtx);
166 }
167 }
168
updateConfigServerOpTimeFromMetadata(OperationContext * opCtx)169 Status ShardingState::updateConfigServerOpTimeFromMetadata(OperationContext* opCtx) {
170 if (!enabled()) {
171 // Nothing to do if sharding state has not been initialized.
172 return Status::OK();
173 }
174
175 boost::optional<repl::OpTime> opTime = rpc::ConfigServerMetadata::get(opCtx).getOpTime();
176 if (opTime) {
177 if (!AuthorizationSession::get(opCtx->getClient())
178 ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
179 ActionType::internal)) {
180 return Status(ErrorCodes::Unauthorized, "Unauthorized to update config opTime");
181 }
182
183 Grid::get(opCtx)->advanceConfigOpTime(opCtx, *opTime, "request from");
184 }
185
186 return Status::OK();
187 }
188
getChunkSplitter()189 ChunkSplitter* ShardingState::getChunkSplitter() {
190 return _chunkSplitter.get();
191 }
192
initiateChunkSplitter()193 void ShardingState::initiateChunkSplitter() {
194 _chunkSplitter->initiateChunkSplitter();
195 }
196
interruptChunkSplitter()197 void ShardingState::interruptChunkSplitter() {
198 _chunkSplitter->interruptChunkSplitter();
199 }
200
setGlobalInitMethodForTest(GlobalInitFunc func)201 void ShardingState::setGlobalInitMethodForTest(GlobalInitFunc func) {
202 _globalInit = func;
203 }
204
onStaleShardVersion(OperationContext * opCtx,const NamespaceString & nss,const ChunkVersion & expectedVersion)205 Status ShardingState::onStaleShardVersion(OperationContext* opCtx,
206 const NamespaceString& nss,
207 const ChunkVersion& expectedVersion) {
208 invariant(!opCtx->getClient()->isInDirectClient());
209 invariant(!opCtx->lockState()->isLocked());
210 invariant(enabled());
211
212 LOG(2) << "metadata refresh requested for " << nss.ns() << " at shard version "
213 << expectedVersion;
214
215 ShardingStatistics::get(opCtx).countStaleConfigErrors.addAndFetch(1);
216
217 // Ensure any ongoing migrations have completed
218 auto& oss = OperationShardingState::get(opCtx);
219 oss.waitForMigrationCriticalSectionSignal(opCtx);
220
221 const auto collectionShardVersion = [&] {
222 // Fast path - check if the requested version is at a higher version than the current
223 // metadata version or a different epoch before verifying against config server
224 AutoGetCollection autoColl(opCtx, nss, MODE_IS);
225 const auto currentMetadata = CollectionShardingState::get(opCtx, nss)->getMetadata();
226 if (currentMetadata) {
227 return currentMetadata->getShardVersion();
228 }
229
230 return ChunkVersion::UNSHARDED();
231 }();
232
233 if (collectionShardVersion.epoch() == expectedVersion.epoch() &&
234 collectionShardVersion >= expectedVersion) {
235 // Don't need to remotely reload if we're in the same epoch and the requested version is
236 // smaller than the one we know about. This means that the remote side is behind.
237 return Status::OK();
238 }
239
240 try {
241 _refreshMetadata(opCtx, nss);
242 return Status::OK();
243 } catch (const DBException& ex) {
244 log() << "Failed to refresh metadata for collection" << nss << causedBy(redact(ex));
245 return ex.toStatus();
246 }
247 }
248
refreshMetadataNow(OperationContext * opCtx,const NamespaceString & nss,ChunkVersion * latestShardVersion)249 Status ShardingState::refreshMetadataNow(OperationContext* opCtx,
250 const NamespaceString& nss,
251 ChunkVersion* latestShardVersion) {
252 try {
253 *latestShardVersion = _refreshMetadata(opCtx, nss);
254 return Status::OK();
255 } catch (const DBException& ex) {
256 return ex.toStatus();
257 }
258 }
259
260 // NOTE: This method will be called inside a database lock so it should never take any database
261 // locks, perform I/O, or any long running operations.
initializeFromShardIdentity(OperationContext * opCtx,const ShardIdentityType & shardIdentity)262 Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx,
263 const ShardIdentityType& shardIdentity) {
264 invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
265 invariant(opCtx->lockState()->isLocked());
266
267 Status validationStatus = shardIdentity.validate();
268 if (!validationStatus.isOK()) {
269 return Status(
270 validationStatus.code(),
271 str::stream()
272 << "Invalid shard identity document found when initializing sharding state: "
273 << validationStatus.reason());
274 }
275
276 log() << "initializing sharding state with: " << shardIdentity;
277
278 stdx::unique_lock<stdx::mutex> lk(_mutex);
279
280 auto configSvrConnStr = shardIdentity.getConfigsvrConnString();
281
282 if (enabled()) {
283 invariant(!_shardName.empty());
284 fassert(40372, _shardName == shardIdentity.getShardName());
285
286 auto prevConfigsvrConnStr =
287 Grid::get(opCtx)->shardRegistry()->getConfigServerConnectionString();
288 invariant(prevConfigsvrConnStr.type() == ConnectionString::SET);
289 fassert(40373, prevConfigsvrConnStr.getSetName() == configSvrConnStr.getSetName());
290
291 invariant(_clusterId.isSet());
292 fassert(40374, _clusterId == shardIdentity.getClusterId());
293
294 return Status::OK();
295 }
296
297 if (_getInitializationState() == InitializationState::kError) {
298 return {ErrorCodes::ManualInterventionRequired,
299 str::stream() << "Server's sharding metadata manager failed to initialize and will "
300 "remain in this state until the instance is manually reset"
301 << causedBy(_initializationStatus)};
302 }
303
304 ShardedConnectionInfo::addHook(opCtx->getServiceContext());
305
306 try {
307 Status status = _globalInit(opCtx, configSvrConnStr, generateDistLockProcessId(opCtx));
308 if (status.isOK()) {
309 ReplicaSetMonitor::setSynchronousConfigChangeHook(
310 &ShardRegistry::replicaSetChangeShardRegistryUpdateHook);
311 ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB);
312
313 // Determine primary/secondary/standalone state in order to properly initialize sharding
314 // components.
315 auto replCoord = repl::ReplicationCoordinator::get(opCtx);
316 bool isReplSet =
317 replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
318 bool isStandaloneOrPrimary =
319 !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() ==
320 repl::MemberState::RS_PRIMARY);
321
322 CatalogCacheLoader::get(opCtx).initializeReplicaSetRole(isStandaloneOrPrimary);
323
324 _chunkSplitter->setReplicaSetMode(isStandaloneOrPrimary);
325
326 Grid::get(opCtx)->setShardingInitialized();
327
328 log() << "initialized sharding components for "
329 << (isStandaloneOrPrimary ? "primary" : "secondary") << " node.";
330 _setInitializationState(InitializationState::kInitialized);
331 } else {
332 log() << "failed to initialize sharding components" << causedBy(status);
333 _initializationStatus = status;
334 _setInitializationState(InitializationState::kError);
335 }
336 _shardName = shardIdentity.getShardName();
337 _clusterId = shardIdentity.getClusterId();
338
339 return status;
340 } catch (const DBException& ex) {
341 auto errorStatus = ex.toStatus();
342 _initializationStatus = errorStatus;
343 _setInitializationState(InitializationState::kError);
344 return errorStatus;
345 }
346
347 MONGO_UNREACHABLE;
348 }
349
_getInitializationState() const350 ShardingState::InitializationState ShardingState::_getInitializationState() const {
351 return static_cast<InitializationState>(_initializationState.load());
352 }
353
_setInitializationState(InitializationState newState)354 void ShardingState::_setInitializationState(InitializationState newState) {
355 _initializationState.store(static_cast<uint32_t>(newState));
356 }
357
initializeShardingAwarenessIfNeeded(OperationContext * opCtx)358 StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationContext* opCtx) {
359 invariant(!opCtx->lockState()->isLocked());
360
361 // In sharded readOnly mode, we ignore the shardIdentity document on disk and instead *require*
362 // a shardIdentity document to be passed through --overrideShardIdentity.
363 if (storageGlobalParams.readOnly) {
364 if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
365 if (serverGlobalParams.overrideShardIdentity.isEmpty()) {
366 return {ErrorCodes::InvalidOptions,
367 "If started with --shardsvr in queryableBackupMode, a shardIdentity "
368 "document must be provided through --overrideShardIdentity"};
369 }
370 auto swOverrideShardIdentity =
371 ShardIdentityType::fromBSON(serverGlobalParams.overrideShardIdentity);
372 if (!swOverrideShardIdentity.isOK()) {
373 return swOverrideShardIdentity.getStatus();
374 }
375 {
376 // Global lock is required to call initializeFromShardIdenetity().
377 Lock::GlobalWrite lk(opCtx);
378 auto status =
379 initializeFromShardIdentity(opCtx, swOverrideShardIdentity.getValue());
380 if (!status.isOK()) {
381 return status;
382 }
383 }
384 return true;
385 } else {
386 // Error if --overrideShardIdentity is used but *not* started with --shardsvr.
387 if (!serverGlobalParams.overrideShardIdentity.isEmpty()) {
388 return {
389 ErrorCodes::InvalidOptions,
390 str::stream()
391 << "Not started with --shardsvr, but a shardIdentity document was provided "
392 "through --overrideShardIdentity: "
393 << serverGlobalParams.overrideShardIdentity};
394 }
395 return false;
396 }
397 }
398 // In sharded *non*-readOnly mode, error if --overrideShardIdentity is provided. Use the
399 // shardIdentity document on disk if one exists, but it is okay if no shardIdentity document is
400 // provided at all (sharding awareness will be initialized when a shardIdentity document is
401 // inserted).
402 else {
403 if (!serverGlobalParams.overrideShardIdentity.isEmpty()) {
404 return {
405 ErrorCodes::InvalidOptions,
406 str::stream() << "--overrideShardIdentity is only allowed in sharded "
407 "queryableBackupMode. If not in queryableBackupMode, you can edit "
408 "the shardIdentity document by starting the server *without* "
409 "--shardsvr, manually updating the shardIdentity document in the "
410 << NamespaceString::kServerConfigurationNamespace.toString()
411 << " collection, and restarting the server with --shardsvr."};
412 }
413
414 // Load the shardIdentity document from disk.
415 BSONObj shardIdentityBSON;
416 bool foundShardIdentity = false;
417 try {
418 AutoGetCollection autoColl(
419 opCtx, NamespaceString::kServerConfigurationNamespace, MODE_IS);
420 foundShardIdentity = Helpers::findOne(opCtx,
421 autoColl.getCollection(),
422 BSON("_id" << ShardIdentityType::IdName),
423 shardIdentityBSON);
424 } catch (const DBException& ex) {
425 return ex.toStatus();
426 }
427
428 if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
429 if (!foundShardIdentity) {
430 warning() << "Started with --shardsvr, but no shardIdentity document was found on "
431 "disk in "
432 << NamespaceString::kServerConfigurationNamespace
433 << ". This most likely means this server has not yet been added to a "
434 "sharded cluster.";
435 return false;
436 }
437
438 invariant(!shardIdentityBSON.isEmpty());
439
440 auto swShardIdentity = ShardIdentityType::fromBSON(shardIdentityBSON);
441 if (!swShardIdentity.isOK()) {
442 return swShardIdentity.getStatus();
443 }
444 {
445 // Global lock is required to call initializeFromShardIdenetity().
446 Lock::GlobalWrite lk(opCtx);
447 auto status = initializeFromShardIdentity(opCtx, swShardIdentity.getValue());
448 if (!status.isOK()) {
449 return status;
450 }
451 }
452 return true;
453 } else {
454 // Warn if a shardIdentity document is found on disk but *not* started with --shardsvr.
455 if (!shardIdentityBSON.isEmpty()) {
456 warning() << "Not started with --shardsvr, but a shardIdentity document was found "
457 "on disk in "
458 << NamespaceString::kServerConfigurationNamespace << ": "
459 << shardIdentityBSON;
460 }
461 return false;
462 }
463 }
464 }
465
_refreshMetadata(OperationContext * opCtx,const NamespaceString & nss)466 ChunkVersion ShardingState::_refreshMetadata(OperationContext* opCtx, const NamespaceString& nss) {
467 invariant(!opCtx->lockState()->isLocked());
468 invariant(enabled());
469
470 const ShardId shardId = getShardName();
471
472 uassert(ErrorCodes::NotYetInitialized,
473 str::stream() << "Cannot refresh metadata for " << nss.ns()
474 << " before shard name has been set",
475 shardId.isValid());
476
477 const auto routingInfo = uassertStatusOK(
478 Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, nss));
479 const auto cm = routingInfo.cm();
480
481 if (!cm) {
482 // No chunk manager, so unsharded.
483
484 // Exclusive collection lock needed since we're now changing the metadata
485 AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
486
487 auto css = CollectionShardingState::get(opCtx, nss);
488 css->refreshMetadata(opCtx, nullptr);
489
490 return ChunkVersion::UNSHARDED();
491 }
492
493 {
494 AutoGetCollection autoColl(opCtx, nss, MODE_IS);
495 auto css = CollectionShardingState::get(opCtx, nss);
496
497 // We already have newer version
498 if (css->getMetadata() &&
499 css->getMetadata()->getCollVersion().epoch() == cm->getVersion().epoch() &&
500 css->getMetadata()->getCollVersion() >= cm->getVersion()) {
501 LOG(1) << "Skipping refresh of metadata for " << nss << " "
502 << css->getMetadata()->getCollVersion() << " with an older " << cm->getVersion();
503 return css->getMetadata()->getShardVersion();
504 }
505 }
506
507 // Exclusive collection lock needed since we're now changing the metadata
508 AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
509
510 auto css = CollectionShardingState::get(opCtx, nss);
511
512 // We already have newer version
513 if (css->getMetadata() &&
514 css->getMetadata()->getCollVersion().epoch() == cm->getVersion().epoch() &&
515 css->getMetadata()->getCollVersion() >= cm->getVersion()) {
516 LOG(1) << "Skipping refresh of metadata for " << nss << " "
517 << css->getMetadata()->getCollVersion() << " with an older " << cm->getVersion();
518 return css->getMetadata()->getShardVersion();
519 }
520
521 std::unique_ptr<CollectionMetadata> newCollectionMetadata =
522 stdx::make_unique<CollectionMetadata>(cm, shardId);
523
524 css->refreshMetadata(opCtx, std::move(newCollectionMetadata));
525
526 return css->getMetadata()->getShardVersion();
527 }
528
registerDonateChunk(const MoveChunkRequest & args)529 StatusWith<ScopedRegisterDonateChunk> ShardingState::registerDonateChunk(
530 const MoveChunkRequest& args) {
531 return _activeMigrationsRegistry.registerDonateChunk(args);
532 }
533
registerReceiveChunk(const NamespaceString & nss,const ChunkRange & chunkRange,const ShardId & fromShardId)534 StatusWith<ScopedRegisterReceiveChunk> ShardingState::registerReceiveChunk(
535 const NamespaceString& nss, const ChunkRange& chunkRange, const ShardId& fromShardId) {
536 return _activeMigrationsRegistry.registerReceiveChunk(nss, chunkRange, fromShardId);
537 }
538
getActiveDonateChunkNss()539 boost::optional<NamespaceString> ShardingState::getActiveDonateChunkNss() {
540 return _activeMigrationsRegistry.getActiveDonateChunkNss();
541 }
542
getActiveMigrationStatusReport(OperationContext * opCtx)543 BSONObj ShardingState::getActiveMigrationStatusReport(OperationContext* opCtx) {
544 return _activeMigrationsRegistry.getActiveMigrationStatusReport(opCtx);
545 }
546
appendInfo(OperationContext * opCtx,BSONObjBuilder & builder)547 void ShardingState::appendInfo(OperationContext* opCtx, BSONObjBuilder& builder) {
548 const bool isEnabled = enabled();
549 builder.appendBool("enabled", isEnabled);
550 if (!isEnabled)
551 return;
552
553 stdx::lock_guard<stdx::mutex> lk(_mutex);
554
555 builder.append("configServer",
556 Grid::get(opCtx)->shardRegistry()->getConfigServerConnectionString().toString());
557 builder.append("shardName", _shardName);
558 builder.append("clusterId", _clusterId);
559 }
560
needCollectionMetadata(OperationContext * opCtx,const string & ns)561 bool ShardingState::needCollectionMetadata(OperationContext* opCtx, const string& ns) {
562 if (!enabled())
563 return false;
564
565 Client* client = opCtx->getClient();
566
567 // Shard version information received from mongos may either by attached to the Client or
568 // directly to the OperationContext.
569 return ShardedConnectionInfo::get(client, false) ||
570 OperationShardingState::get(opCtx).hasShardVersion();
571 }
572
updateShardIdentityConfigString(OperationContext * opCtx,const std::string & newConnectionString)573 Status ShardingState::updateShardIdentityConfigString(OperationContext* opCtx,
574 const std::string& newConnectionString) {
575 BSONObj updateObj(ShardIdentityType::createConfigServerUpdateObject(newConnectionString));
576
577 UpdateRequest updateReq(NamespaceString::kServerConfigurationNamespace);
578 updateReq.setQuery(BSON("_id" << ShardIdentityType::IdName));
579 updateReq.setUpdates(updateObj);
580 UpdateLifecycleImpl updateLifecycle(NamespaceString::kServerConfigurationNamespace);
581 updateReq.setLifecycle(&updateLifecycle);
582
583 try {
584 AutoGetOrCreateDb autoDb(
585 opCtx, NamespaceString::kServerConfigurationNamespace.db(), MODE_X);
586
587 auto result = update(opCtx, autoDb.getDb(), updateReq);
588 if (result.numMatched == 0) {
589 warning() << "failed to update config string of shard identity document because "
590 << "it does not exist. This shard could have been removed from the cluster";
591 } else {
592 LOG(2) << "Updated config server connection string in shardIdentity document to"
593 << newConnectionString;
594 }
595 } catch (const DBException& exception) {
596 return exception.toStatus();
597 }
598
599 return Status::OK();
600 }
601
getRangeDeleterTaskExecutor()602 executor::TaskExecutor* ShardingState::getRangeDeleterTaskExecutor() {
603 stdx::lock_guard<stdx::mutex> lk(_rangeDeleterExecutor.lock);
604 if (_rangeDeleterExecutor.taskExecutor.get() == nullptr) {
605 static const char kExecName[] = "NetworkInterfaceCollectionRangeDeleter-TaskExecutor";
606 auto net = executor::makeNetworkInterface(kExecName);
607 auto pool = stdx::make_unique<executor::NetworkInterfaceThreadPool>(net.get());
608 _rangeDeleterExecutor.taskExecutor =
609 stdx::make_unique<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
610 _rangeDeleterExecutor.taskExecutor->startup();
611 }
612 return _rangeDeleterExecutor.taskExecutor.get();
613 }
614
~RangeDeleterExecutor()615 ShardingState::RangeDeleterExecutor::~RangeDeleterExecutor() {
616 if (taskExecutor) {
617 taskExecutor->shutdown();
618 taskExecutor->join();
619 }
620 }
621
622 } // namespace mongo
623