1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/s/catalog/sharding_catalog_client_impl.h"
36
37 #include <iomanip>
38 #include <pcrecpp.h>
39
40 #include "mongo/base/status.h"
41 #include "mongo/base/status_with.h"
42 #include "mongo/bson/bsonobjbuilder.h"
43 #include "mongo/bson/util/bson_extract.h"
44 #include "mongo/client/read_preference.h"
45 #include "mongo/client/remote_command_targeter.h"
46 #include "mongo/client/replica_set_monitor.h"
47 #include "mongo/db/audit.h"
48 #include "mongo/db/client.h"
49 #include "mongo/db/commands.h"
50 #include "mongo/db/namespace_string.h"
51 #include "mongo/db/operation_context.h"
52 #include "mongo/db/repl/optime.h"
53 #include "mongo/db/repl/read_concern_args.h"
54 #include "mongo/db/s/type_shard_identity.h"
55 #include "mongo/executor/network_interface.h"
56 #include "mongo/executor/task_executor.h"
57 #include "mongo/rpc/get_status_from_command_result.h"
58 #include "mongo/rpc/metadata/repl_set_metadata.h"
59 #include "mongo/s/catalog/config_server_version.h"
60 #include "mongo/s/catalog/dist_lock_manager.h"
61 #include "mongo/s/catalog/type_changelog.h"
62 #include "mongo/s/catalog/type_chunk.h"
63 #include "mongo/s/catalog/type_collection.h"
64 #include "mongo/s/catalog/type_config_version.h"
65 #include "mongo/s/catalog/type_database.h"
66 #include "mongo/s/catalog/type_shard.h"
67 #include "mongo/s/catalog/type_tags.h"
68 #include "mongo/s/client/shard.h"
69 #include "mongo/s/client/shard_connection.h"
70 #include "mongo/s/client/shard_registry.h"
71 #include "mongo/s/grid.h"
72 #include "mongo/s/set_shard_version_request.h"
73 #include "mongo/s/shard_key_pattern.h"
74 #include "mongo/s/shard_util.h"
75 #include "mongo/s/write_ops/batched_command_request.h"
76 #include "mongo/s/write_ops/batched_command_response.h"
77 #include "mongo/util/assert_util.h"
78 #include "mongo/util/fail_point_service.h"
79 #include "mongo/util/log.h"
80 #include "mongo/util/mongoutils/str.h"
81 #include "mongo/util/net/hostandport.h"
82 #include "mongo/util/time_support.h"
83
84 namespace mongo {
85
86 MONGO_FP_DECLARE(failApplyChunkOps);
87
88 using repl::OpTime;
89 using std::set;
90 using std::shared_ptr;
91 using std::string;
92 using std::unique_ptr;
93 using std::vector;
94 using str::stream;
95
96 namespace {
97
98 const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
99 const ReadPreferenceSetting kConfigPrimaryPreferredSelector(ReadPreference::PrimaryPreferred,
100 TagSet{});
101 const int kMaxReadRetry = 3;
102 const int kMaxWriteRetry = 3;
103
104 const std::string kActionLogCollectionName("actionlog");
105 const int kActionLogCollectionSizeMB = 20 * 1024 * 1024;
106
107 const std::string kChangeLogCollectionName("changelog");
108 const int kChangeLogCollectionSizeMB = 200 * 1024 * 1024;
109
110 const NamespaceString kSettingsNamespace("config", "settings");
111
toBatchError(const Status & status,BatchedCommandResponse * response)112 void toBatchError(const Status& status, BatchedCommandResponse* response) {
113 response->clear();
114 response->setErrCode(status.code());
115 response->setErrMessage(status.reason());
116 response->setOk(false);
117 }
118
119 } // namespace
120
ShardingCatalogClientImpl(std::unique_ptr<DistLockManager> distLockManager)121 ShardingCatalogClientImpl::ShardingCatalogClientImpl(
122 std::unique_ptr<DistLockManager> distLockManager)
123 : _distLockManager(std::move(distLockManager)) {}
124
125 ShardingCatalogClientImpl::~ShardingCatalogClientImpl() = default;
126
startup()127 void ShardingCatalogClientImpl::startup() {
128 stdx::lock_guard<stdx::mutex> lk(_mutex);
129 if (_started) {
130 return;
131 }
132
133 _started = true;
134 _distLockManager->startUp();
135 }
136
shutDown(OperationContext * opCtx)137 void ShardingCatalogClientImpl::shutDown(OperationContext* opCtx) {
138 LOG(1) << "ShardingCatalogClientImpl::shutDown() called.";
139 {
140 stdx::lock_guard<stdx::mutex> lk(_mutex);
141 _inShutdown = true;
142 }
143
144 invariant(_distLockManager);
145 _distLockManager->shutDown(opCtx);
146 }
147
updateShardingCatalogEntryForCollection(OperationContext * opCtx,const std::string & collNs,const CollectionType & coll,const bool upsert)148 Status ShardingCatalogClientImpl::updateShardingCatalogEntryForCollection(
149 OperationContext* opCtx,
150 const std::string& collNs,
151 const CollectionType& coll,
152 const bool upsert) {
153 fassert(28634, coll.validate());
154
155 auto status = _updateConfigDocument(opCtx,
156 CollectionType::ConfigNS,
157 BSON(CollectionType::fullNs(collNs)),
158 coll.toBSON(),
159 upsert,
160 ShardingCatalogClient::kMajorityWriteConcern);
161 if (!status.isOK()) {
162 return {status.getStatus().code(),
163 str::stream() << "Collection metadata write failed due to "
164 << status.getStatus().reason()};
165 }
166
167 return Status::OK();
168 }
169
updateDatabase(OperationContext * opCtx,const std::string & dbName,const DatabaseType & db)170 Status ShardingCatalogClientImpl::updateDatabase(OperationContext* opCtx,
171 const std::string& dbName,
172 const DatabaseType& db) {
173 fassert(28616, db.validate());
174
175 auto status = updateConfigDocument(opCtx,
176 DatabaseType::ConfigNS,
177 BSON(DatabaseType::name(dbName)),
178 db.toBSON(),
179 true,
180 ShardingCatalogClient::kMajorityWriteConcern);
181 if (!status.isOK()) {
182 return {status.getStatus().code(),
183 str::stream() << "Database metadata write failed due to "
184 << status.getStatus().reason()};
185 }
186
187 return Status::OK();
188 }
189
logAction(OperationContext * opCtx,const std::string & what,const std::string & ns,const BSONObj & detail)190 Status ShardingCatalogClientImpl::logAction(OperationContext* opCtx,
191 const std::string& what,
192 const std::string& ns,
193 const BSONObj& detail) {
194 if (_actionLogCollectionCreated.load() == 0) {
195 Status result = _createCappedConfigCollection(opCtx,
196 kActionLogCollectionName,
197 kActionLogCollectionSizeMB,
198 ShardingCatalogClient::kMajorityWriteConcern);
199 if (result.isOK()) {
200 _actionLogCollectionCreated.store(1);
201 } else {
202 log() << "couldn't create config.actionlog collection:" << causedBy(result);
203 return result;
204 }
205 }
206
207 return _log(opCtx,
208 kActionLogCollectionName,
209 what,
210 ns,
211 detail,
212 ShardingCatalogClient::kMajorityWriteConcern);
213 }
214
logChange(OperationContext * opCtx,const std::string & what,const std::string & ns,const BSONObj & detail,const WriteConcernOptions & writeConcern)215 Status ShardingCatalogClientImpl::logChange(OperationContext* opCtx,
216 const std::string& what,
217 const std::string& ns,
218 const BSONObj& detail,
219 const WriteConcernOptions& writeConcern) {
220 invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer ||
221 writeConcern.wMode == WriteConcernOptions::kMajority);
222 if (_changeLogCollectionCreated.load() == 0) {
223 Status result = _createCappedConfigCollection(
224 opCtx, kChangeLogCollectionName, kChangeLogCollectionSizeMB, writeConcern);
225 if (result.isOK()) {
226 _changeLogCollectionCreated.store(1);
227 } else {
228 log() << "couldn't create config.changelog collection:" << causedBy(result);
229 return result;
230 }
231 }
232
233 return _log(opCtx, kChangeLogCollectionName, what, ns, detail, writeConcern);
234 }
235
_log(OperationContext * opCtx,const StringData & logCollName,const std::string & what,const std::string & operationNS,const BSONObj & detail,const WriteConcernOptions & writeConcern)236 Status ShardingCatalogClientImpl::_log(OperationContext* opCtx,
237 const StringData& logCollName,
238 const std::string& what,
239 const std::string& operationNS,
240 const BSONObj& detail,
241 const WriteConcernOptions& writeConcern) {
242 Date_t now = Grid::get(opCtx)->getNetwork()->now();
243 const std::string hostName = Grid::get(opCtx)->getNetwork()->getHostName();
244 const string changeId = str::stream() << hostName << "-" << now.toString() << "-" << OID::gen();
245
246 ChangeLogType changeLog;
247 changeLog.setChangeId(changeId);
248 changeLog.setServer(hostName);
249 changeLog.setClientAddr(opCtx->getClient()->clientAddress(true));
250 changeLog.setTime(now);
251 changeLog.setNS(operationNS);
252 changeLog.setWhat(what);
253 changeLog.setDetails(detail);
254
255 BSONObj changeLogBSON = changeLog.toBSON();
256 log() << "about to log metadata event into " << logCollName << ": " << redact(changeLogBSON);
257
258 const NamespaceString nss("config", logCollName);
259 Status result = insertConfigDocument(opCtx, nss.ns(), changeLogBSON, writeConcern);
260
261 if (!result.isOK()) {
262 warning() << "Error encountered while logging config change with ID [" << changeId
263 << "] into collection " << logCollName << ": " << redact(result);
264 }
265
266 return result;
267 }
268
getDatabase(OperationContext * opCtx,const std::string & dbName,repl::ReadConcernLevel readConcernLevel)269 StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientImpl::getDatabase(
270 OperationContext* opCtx, const std::string& dbName, repl::ReadConcernLevel readConcernLevel) {
271 if (!NamespaceString::validDBName(dbName, NamespaceString::DollarInDbNameBehavior::Allow)) {
272 return {ErrorCodes::InvalidNamespace, stream() << dbName << " is not a valid db name"};
273 }
274
275 // The admin database is always hosted on the config server.
276 if (dbName == "admin") {
277 DatabaseType dbt;
278 dbt.setName(dbName);
279 dbt.setSharded(false);
280 dbt.setPrimary(ShardRegistry::kConfigServerShardId);
281
282 return repl::OpTimeWith<DatabaseType>(dbt);
283 }
284
285 // The config database's primary shard is always config, and it is always sharded.
286 if (dbName == "config") {
287 DatabaseType dbt;
288 dbt.setName(dbName);
289 dbt.setSharded(true);
290 dbt.setPrimary(ShardRegistry::kConfigServerShardId);
291
292 return repl::OpTimeWith<DatabaseType>(dbt);
293 }
294
295 auto result = _fetchDatabaseMetadata(opCtx, dbName, kConfigReadSelector, readConcernLevel);
296 if (result == ErrorCodes::NamespaceNotFound) {
297 // If we failed to find the database metadata on the 'nearest' config server, try again
298 // against the primary, in case the database was recently created.
299 result = _fetchDatabaseMetadata(
300 opCtx, dbName, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, readConcernLevel);
301 if (!result.isOK() && (result != ErrorCodes::NamespaceNotFound)) {
302 return {result.getStatus().code(),
303 str::stream() << "Could not confirm non-existence of database " << dbName
304 << " due to "
305 << result.getStatus().reason()};
306 }
307 }
308
309 return result;
310 }
311
_fetchDatabaseMetadata(OperationContext * opCtx,const std::string & dbName,const ReadPreferenceSetting & readPref,repl::ReadConcernLevel readConcernLevel)312 StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientImpl::_fetchDatabaseMetadata(
313 OperationContext* opCtx,
314 const std::string& dbName,
315 const ReadPreferenceSetting& readPref,
316 repl::ReadConcernLevel readConcernLevel) {
317 dassert(dbName != "admin" && dbName != "config");
318
319 auto findStatus = _exhaustiveFindOnConfig(opCtx,
320 readPref,
321 readConcernLevel,
322 NamespaceString(DatabaseType::ConfigNS),
323 BSON(DatabaseType::name(dbName)),
324 BSONObj(),
325 boost::none);
326 if (!findStatus.isOK()) {
327 return findStatus.getStatus();
328 }
329
330 const auto& docsWithOpTime = findStatus.getValue();
331 if (docsWithOpTime.value.empty()) {
332 return {ErrorCodes::NamespaceNotFound, stream() << "database " << dbName << " not found"};
333 }
334
335 invariant(docsWithOpTime.value.size() == 1);
336
337 auto parseStatus = DatabaseType::fromBSON(docsWithOpTime.value.front());
338 if (!parseStatus.isOK()) {
339 return parseStatus.getStatus();
340 }
341
342 return repl::OpTimeWith<DatabaseType>(parseStatus.getValue(), docsWithOpTime.opTime);
343 }
344
getCollection(OperationContext * opCtx,const std::string & collNs)345 StatusWith<repl::OpTimeWith<CollectionType>> ShardingCatalogClientImpl::getCollection(
346 OperationContext* opCtx, const std::string& collNs) {
347 auto statusFind = _exhaustiveFindOnConfig(opCtx,
348 kConfigReadSelector,
349 repl::ReadConcernLevel::kMajorityReadConcern,
350 NamespaceString(CollectionType::ConfigNS),
351 BSON(CollectionType::fullNs(collNs)),
352 BSONObj(),
353 1);
354 if (!statusFind.isOK()) {
355 return statusFind.getStatus();
356 }
357
358 const auto& retOpTimePair = statusFind.getValue();
359 const auto& retVal = retOpTimePair.value;
360 if (retVal.empty()) {
361 return Status(ErrorCodes::NamespaceNotFound,
362 stream() << "collection " << collNs << " not found");
363 }
364
365 invariant(retVal.size() == 1);
366
367 auto parseStatus = CollectionType::fromBSON(retVal.front());
368 if (!parseStatus.isOK()) {
369 return parseStatus.getStatus();
370 }
371
372 auto collType = parseStatus.getValue();
373 if (collType.getDropped()) {
374 return Status(ErrorCodes::NamespaceNotFound,
375 stream() << "collection " << collNs << " was dropped");
376 }
377
378 return repl::OpTimeWith<CollectionType>(collType, retOpTimePair.opTime);
379 }
380
getCollections(OperationContext * opCtx,const std::string * dbName,std::vector<CollectionType> * collections,OpTime * opTime)381 Status ShardingCatalogClientImpl::getCollections(OperationContext* opCtx,
382 const std::string* dbName,
383 std::vector<CollectionType>* collections,
384 OpTime* opTime) {
385 BSONObjBuilder b;
386 if (dbName) {
387 invariant(!dbName->empty());
388 b.appendRegex(CollectionType::fullNs(),
389 string(str::stream() << "^" << pcrecpp::RE::QuoteMeta(*dbName) << "\\."));
390 }
391
392 auto findStatus = _exhaustiveFindOnConfig(opCtx,
393 kConfigReadSelector,
394 repl::ReadConcernLevel::kMajorityReadConcern,
395 NamespaceString(CollectionType::ConfigNS),
396 b.obj(),
397 BSONObj(),
398 boost::none); // no limit
399 if (!findStatus.isOK()) {
400 return findStatus.getStatus();
401 }
402
403 const auto& docsOpTimePair = findStatus.getValue();
404
405 for (const BSONObj& obj : docsOpTimePair.value) {
406 const auto collectionResult = CollectionType::fromBSON(obj);
407 if (!collectionResult.isOK()) {
408 collections->clear();
409 return {ErrorCodes::FailedToParse,
410 str::stream() << "error while parsing " << CollectionType::ConfigNS
411 << " document: "
412 << obj
413 << " : "
414 << collectionResult.getStatus().toString()};
415 }
416
417 collections->push_back(collectionResult.getValue());
418 }
419
420 if (opTime) {
421 *opTime = docsOpTimePair.opTime;
422 }
423
424 return Status::OK();
425 }
426
dropCollection(OperationContext * opCtx,const NamespaceString & ns)427 Status ShardingCatalogClientImpl::dropCollection(OperationContext* opCtx,
428 const NamespaceString& ns) {
429 logChange(opCtx,
430 "dropCollection.start",
431 ns.ns(),
432 BSONObj(),
433 ShardingCatalogClientImpl::kMajorityWriteConcern)
434 .ignore();
435
436 auto shardsStatus = getAllShards(opCtx, repl::ReadConcernLevel::kMajorityReadConcern);
437 if (!shardsStatus.isOK()) {
438 return shardsStatus.getStatus();
439 }
440 vector<ShardType> allShards = std::move(shardsStatus.getValue().value);
441
442 LOG(1) << "dropCollection " << ns << " started";
443
444 const auto dropCommandBSON = [opCtx, &ns] {
445 BSONObjBuilder builder;
446 builder.append("drop", ns.coll());
447
448 if (!opCtx->getWriteConcern().usedDefault) {
449 builder.append(WriteConcernOptions::kWriteConcernField,
450 opCtx->getWriteConcern().toBSON());
451 }
452
453 return builder.obj();
454 }();
455
456 std::map<std::string, BSONObj> errors;
457 auto* const shardRegistry = Grid::get(opCtx)->shardRegistry();
458
459 for (const auto& shardEntry : allShards) {
460 auto swShard = shardRegistry->getShard(opCtx, shardEntry.getName());
461 if (!swShard.isOK()) {
462 return swShard.getStatus();
463 }
464
465 const auto& shard = swShard.getValue();
466
467 auto swDropResult = shard->runCommandWithFixedRetryAttempts(
468 opCtx,
469 ReadPreferenceSetting{ReadPreference::PrimaryOnly},
470 ns.db().toString(),
471 dropCommandBSON,
472 Shard::RetryPolicy::kIdempotent);
473
474 if (!swDropResult.isOK()) {
475 return {swDropResult.getStatus().code(),
476 str::stream() << swDropResult.getStatus().reason() << " at "
477 << shardEntry.getName()};
478 }
479
480 auto& dropResult = swDropResult.getValue();
481
482 auto dropStatus = std::move(dropResult.commandStatus);
483 auto wcStatus = std::move(dropResult.writeConcernStatus);
484 if (!dropStatus.isOK() || !wcStatus.isOK()) {
485 if (dropStatus.code() == ErrorCodes::NamespaceNotFound && wcStatus.isOK()) {
486 // Generally getting NamespaceNotFound is okay to ignore as it simply means that
487 // the collection has already been dropped or doesn't exist on this shard.
488 // If, however, we get NamespaceNotFound but also have a write concern error then we
489 // can't confirm whether the fact that the namespace doesn't exist is actually
490 // committed. Thus we must still fail on NamespaceNotFound if there is also a write
491 // concern error. This can happen if we call drop, it succeeds but with a write
492 // concern error, then we retry the drop.
493 continue;
494 }
495
496 errors.emplace(shardEntry.getHost(), std::move(dropResult.response));
497 }
498 }
499
500 if (!errors.empty()) {
501 StringBuilder sb;
502 sb << "Dropping collection failed on the following hosts: ";
503
504 for (auto it = errors.cbegin(); it != errors.cend(); ++it) {
505 if (it != errors.cbegin()) {
506 sb << ", ";
507 }
508
509 sb << it->first << ": " << it->second;
510 }
511
512 return {ErrorCodes::OperationFailed, sb.str()};
513 }
514
515 LOG(1) << "dropCollection " << ns << " shard data deleted";
516
517 // Remove chunk data
518 Status result = removeConfigDocuments(opCtx,
519 ChunkType::ConfigNS,
520 BSON(ChunkType::ns(ns.ns())),
521 ShardingCatalogClient::kMajorityWriteConcern);
522 if (!result.isOK()) {
523 return result;
524 }
525
526 LOG(1) << "dropCollection " << ns << " chunk data deleted";
527
528 // Mark the collection as dropped
529 CollectionType coll;
530 coll.setNs(ns);
531 coll.setDropped(true);
532 coll.setEpoch(ChunkVersion::DROPPED().epoch());
533 coll.setUpdatedAt(Grid::get(opCtx)->getNetwork()->now());
534
535 const bool upsert = false;
536 result = updateShardingCatalogEntryForCollection(opCtx, ns.ns(), coll, upsert);
537 if (!result.isOK()) {
538 return result;
539 }
540
541 LOG(1) << "dropCollection " << ns << " collection marked as dropped";
542
543 for (const auto& shardEntry : allShards) {
544 auto swShard = shardRegistry->getShard(opCtx, shardEntry.getName());
545 if (!swShard.isOK()) {
546 return swShard.getStatus();
547 }
548
549 const auto& shard = swShard.getValue();
550
551 SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist(
552 shardRegistry->getConfigServerConnectionString(),
553 shardEntry.getName(),
554 fassertStatusOK(28781, ConnectionString::parse(shardEntry.getHost())),
555 ns,
556 ChunkVersion::DROPPED(),
557 true);
558
559 auto ssvResult = shard->runCommandWithFixedRetryAttempts(
560 opCtx,
561 ReadPreferenceSetting{ReadPreference::PrimaryOnly},
562 "admin",
563 ssv.toBSON(),
564 Shard::RetryPolicy::kIdempotent);
565
566 if (!ssvResult.isOK()) {
567 return ssvResult.getStatus();
568 }
569
570 auto ssvStatus = std::move(ssvResult.getValue().commandStatus);
571 if (!ssvStatus.isOK()) {
572 return ssvStatus;
573 }
574
575 auto unsetShardingStatus = shard->runCommandWithFixedRetryAttempts(
576 opCtx,
577 ReadPreferenceSetting{ReadPreference::PrimaryOnly},
578 "admin",
579 BSON("unsetSharding" << 1),
580 Shard::RetryPolicy::kIdempotent);
581
582 if (!unsetShardingStatus.isOK()) {
583 return unsetShardingStatus.getStatus();
584 }
585
586 auto unsetShardingResult = std::move(unsetShardingStatus.getValue().commandStatus);
587 if (!unsetShardingResult.isOK()) {
588 return unsetShardingResult;
589 }
590 }
591
592 LOG(1) << "dropCollection " << ns << " completed";
593
594 logChange(opCtx,
595 "dropCollection",
596 ns.ns(),
597 BSONObj(),
598 ShardingCatalogClientImpl::kMajorityWriteConcern)
599 .ignore();
600
601 return Status::OK();
602 }
603
getGlobalSettings(OperationContext * opCtx,StringData key)604 StatusWith<BSONObj> ShardingCatalogClientImpl::getGlobalSettings(OperationContext* opCtx,
605 StringData key) {
606 auto findStatus = _exhaustiveFindOnConfig(opCtx,
607 kConfigReadSelector,
608 repl::ReadConcernLevel::kMajorityReadConcern,
609 kSettingsNamespace,
610 BSON("_id" << key),
611 BSONObj(),
612 1);
613 if (!findStatus.isOK()) {
614 return findStatus.getStatus();
615 }
616
617 const auto& docs = findStatus.getValue().value;
618 if (docs.empty()) {
619 return {ErrorCodes::NoMatchingDocument,
620 str::stream() << "can't find settings document with key: " << key};
621 }
622
623 invariant(docs.size() == 1);
624 return docs.front();
625 }
626
getConfigVersion(OperationContext * opCtx,repl::ReadConcernLevel readConcern)627 StatusWith<VersionType> ShardingCatalogClientImpl::getConfigVersion(
628 OperationContext* opCtx, repl::ReadConcernLevel readConcern) {
629 auto findStatus = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
630 opCtx,
631 kConfigReadSelector,
632 readConcern,
633 NamespaceString(VersionType::ConfigNS),
634 BSONObj(),
635 BSONObj(),
636 boost::none /* no limit */);
637 if (!findStatus.isOK()) {
638 return findStatus.getStatus();
639 }
640
641 auto queryResults = findStatus.getValue().docs;
642
643 if (queryResults.size() > 1) {
644 return {ErrorCodes::TooManyMatchingDocuments,
645 str::stream() << "should only have 1 document in " << VersionType::ConfigNS};
646 }
647
648 if (queryResults.empty()) {
649 VersionType versionInfo;
650 versionInfo.setMinCompatibleVersion(UpgradeHistory_EmptyVersion);
651 versionInfo.setCurrentVersion(UpgradeHistory_EmptyVersion);
652 versionInfo.setClusterId(OID{});
653 return versionInfo;
654 }
655
656 BSONObj versionDoc = queryResults.front();
657 auto versionTypeResult = VersionType::fromBSON(versionDoc);
658 if (!versionTypeResult.isOK()) {
659 return {versionTypeResult.getStatus().code(),
660 str::stream() << "Unable to parse config.version document " << versionDoc
661 << " due to "
662 << versionTypeResult.getStatus().reason()};
663 }
664
665 auto validationStatus = versionTypeResult.getValue().validate();
666 if (!validationStatus.isOK()) {
667 return Status(validationStatus.code(),
668 str::stream() << "Unable to validate config.version document " << versionDoc
669 << " due to "
670 << validationStatus.reason());
671 }
672
673 return versionTypeResult.getValue();
674 }
675
getDatabasesForShard(OperationContext * opCtx,const ShardId & shardId,vector<string> * dbs)676 Status ShardingCatalogClientImpl::getDatabasesForShard(OperationContext* opCtx,
677 const ShardId& shardId,
678 vector<string>* dbs) {
679 auto findStatus = _exhaustiveFindOnConfig(opCtx,
680 kConfigReadSelector,
681 repl::ReadConcernLevel::kMajorityReadConcern,
682 NamespaceString(DatabaseType::ConfigNS),
683 BSON(DatabaseType::primary(shardId.toString())),
684 BSONObj(),
685 boost::none); // no limit
686 if (!findStatus.isOK()) {
687 return findStatus.getStatus();
688 }
689
690 for (const BSONObj& obj : findStatus.getValue().value) {
691 string dbName;
692 Status status = bsonExtractStringField(obj, DatabaseType::name(), &dbName);
693 if (!status.isOK()) {
694 dbs->clear();
695 return status;
696 }
697
698 dbs->push_back(dbName);
699 }
700
701 return Status::OK();
702 }
703
getChunks(OperationContext * opCtx,const BSONObj & query,const BSONObj & sort,boost::optional<int> limit,vector<ChunkType> * chunks,OpTime * opTime,repl::ReadConcernLevel readConcern)704 Status ShardingCatalogClientImpl::getChunks(OperationContext* opCtx,
705 const BSONObj& query,
706 const BSONObj& sort,
707 boost::optional<int> limit,
708 vector<ChunkType>* chunks,
709 OpTime* opTime,
710 repl::ReadConcernLevel readConcern) {
711 invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer ||
712 readConcern == repl::ReadConcernLevel::kMajorityReadConcern);
713 chunks->clear();
714
715 // Convert boost::optional<int> to boost::optional<long long>.
716 auto longLimit = limit ? boost::optional<long long>(*limit) : boost::none;
717 auto findStatus = _exhaustiveFindOnConfig(opCtx,
718 kConfigReadSelector,
719 readConcern,
720 NamespaceString(ChunkType::ConfigNS),
721 query,
722 sort,
723 longLimit);
724 if (!findStatus.isOK()) {
725 return {findStatus.getStatus().code(),
726 str::stream() << "Failed to load chunks due to "
727 << findStatus.getStatus().reason()};
728 }
729
730 const auto& chunkDocsOpTimePair = findStatus.getValue();
731 for (const BSONObj& obj : chunkDocsOpTimePair.value) {
732 auto chunkRes = ChunkType::fromConfigBSON(obj);
733 if (!chunkRes.isOK()) {
734 chunks->clear();
735 return {chunkRes.getStatus().code(),
736 stream() << "Failed to parse chunk with id " << obj[ChunkType::name()]
737 << " due to "
738 << chunkRes.getStatus().reason()};
739 }
740
741 chunks->push_back(chunkRes.getValue());
742 }
743
744 if (opTime) {
745 *opTime = chunkDocsOpTimePair.opTime;
746 }
747
748 return Status::OK();
749 }
750
getTagsForCollection(OperationContext * opCtx,const std::string & collectionNs,std::vector<TagsType> * tags)751 Status ShardingCatalogClientImpl::getTagsForCollection(OperationContext* opCtx,
752 const std::string& collectionNs,
753 std::vector<TagsType>* tags) {
754 tags->clear();
755
756 auto findStatus = _exhaustiveFindOnConfig(opCtx,
757 kConfigReadSelector,
758 repl::ReadConcernLevel::kMajorityReadConcern,
759 NamespaceString(TagsType::ConfigNS),
760 BSON(TagsType::ns(collectionNs)),
761 BSON(TagsType::min() << 1),
762 boost::none); // no limit
763 if (!findStatus.isOK()) {
764 return {findStatus.getStatus().code(),
765 str::stream() << "Failed to load tags due to " << findStatus.getStatus().reason()};
766 }
767
768 const auto& tagDocsOpTimePair = findStatus.getValue();
769 for (const BSONObj& obj : tagDocsOpTimePair.value) {
770 auto tagRes = TagsType::fromBSON(obj);
771 if (!tagRes.isOK()) {
772 tags->clear();
773 return {tagRes.getStatus().code(),
774 str::stream() << "Failed to parse tag with id " << obj[TagsType::tag()]
775 << " due to "
776 << tagRes.getStatus().toString()};
777 }
778
779 tags->push_back(tagRes.getValue());
780 }
781
782 return Status::OK();
783 }
784
getAllShards(OperationContext * opCtx,repl::ReadConcernLevel readConcern)785 StatusWith<repl::OpTimeWith<std::vector<ShardType>>> ShardingCatalogClientImpl::getAllShards(
786 OperationContext* opCtx, repl::ReadConcernLevel readConcern) {
787 std::vector<ShardType> shards;
788 auto findStatus = _exhaustiveFindOnConfig(opCtx,
789 kConfigReadSelector,
790 readConcern,
791 NamespaceString(ShardType::ConfigNS),
792 BSONObj(), // no query filter
793 BSONObj(), // no sort
794 boost::none); // no limit
795 if (!findStatus.isOK()) {
796 return findStatus.getStatus();
797 }
798
799 for (const BSONObj& doc : findStatus.getValue().value) {
800 auto shardRes = ShardType::fromBSON(doc);
801 if (!shardRes.isOK()) {
802 return {shardRes.getStatus().code(),
803 stream() << "Failed to parse shard document " << doc << " due to "
804 << shardRes.getStatus().reason()};
805 }
806
807 Status validateStatus = shardRes.getValue().validate();
808 if (!validateStatus.isOK()) {
809 return {validateStatus.code(),
810 stream() << "Failed to validate shard document " << doc << " due to "
811 << validateStatus.reason()};
812 }
813
814 shards.push_back(shardRes.getValue());
815 }
816
817 return repl::OpTimeWith<std::vector<ShardType>>{std::move(shards),
818 findStatus.getValue().opTime};
819 }
820
runUserManagementWriteCommand(OperationContext * opCtx,const std::string & commandName,const std::string & dbname,const BSONObj & cmdObj,BSONObjBuilder * result)821 bool ShardingCatalogClientImpl::runUserManagementWriteCommand(OperationContext* opCtx,
822 const std::string& commandName,
823 const std::string& dbname,
824 const BSONObj& cmdObj,
825 BSONObjBuilder* result) {
826 BSONObj cmdToRun = cmdObj;
827 {
828 // Make sure that if the command has a write concern that it is w:1 or w:majority, and
829 // convert w:1 or no write concern to w:majority before sending.
830 WriteConcernOptions writeConcern;
831 writeConcern.reset();
832
833 BSONElement writeConcernElement = cmdObj[WriteConcernOptions::kWriteConcernField];
834 bool initialCmdHadWriteConcern = !writeConcernElement.eoo();
835 if (initialCmdHadWriteConcern) {
836 Status status = writeConcern.parse(writeConcernElement.Obj());
837 if (!status.isOK()) {
838 return Command::appendCommandStatus(*result, status);
839 }
840
841 if (!(writeConcern.wNumNodes == 1 ||
842 writeConcern.wMode == WriteConcernOptions::kMajority)) {
843 return Command::appendCommandStatus(
844 *result,
845 {ErrorCodes::InvalidOptions,
846 str::stream() << "Invalid replication write concern. User management write "
847 "commands may only use w:1 or w:'majority', got: "
848 << writeConcern.toBSON()});
849 }
850 }
851
852 writeConcern.wMode = WriteConcernOptions::kMajority;
853 writeConcern.wNumNodes = 0;
854
855 BSONObjBuilder modifiedCmd;
856 if (!initialCmdHadWriteConcern) {
857 modifiedCmd.appendElements(cmdObj);
858 } else {
859 BSONObjIterator cmdObjIter(cmdObj);
860 while (cmdObjIter.more()) {
861 BSONElement e = cmdObjIter.next();
862 if (WriteConcernOptions::kWriteConcernField == e.fieldName()) {
863 continue;
864 }
865 modifiedCmd.append(e);
866 }
867 }
868 modifiedCmd.append(WriteConcernOptions::kWriteConcernField, writeConcern.toBSON());
869 cmdToRun = modifiedCmd.obj();
870 }
871
872 auto response =
873 Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
874 opCtx,
875 ReadPreferenceSetting{ReadPreference::PrimaryOnly},
876 dbname,
877 cmdToRun,
878 Shard::kDefaultConfigCommandTimeout,
879 Shard::RetryPolicy::kNotIdempotent);
880
881 if (!response.isOK()) {
882 return Command::appendCommandStatus(*result, response.getStatus());
883 }
884 if (!response.getValue().commandStatus.isOK()) {
885 return Command::appendCommandStatus(*result, response.getValue().commandStatus);
886 }
887 if (!response.getValue().writeConcernStatus.isOK()) {
888 return Command::appendCommandStatus(*result, response.getValue().writeConcernStatus);
889 }
890
891 Command::filterCommandReplyForPassthrough(response.getValue().response, result);
892 return true;
893 }
894
runReadCommandForTest(OperationContext * opCtx,const std::string & dbname,const BSONObj & cmdObj,BSONObjBuilder * result)895 bool ShardingCatalogClientImpl::runReadCommandForTest(OperationContext* opCtx,
896 const std::string& dbname,
897 const BSONObj& cmdObj,
898 BSONObjBuilder* result) {
899 BSONObjBuilder cmdBuilder;
900 cmdBuilder.appendElements(cmdObj);
901 _appendReadConcern(&cmdBuilder);
902
903 auto resultStatus =
904 Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
905 opCtx, kConfigReadSelector, dbname, cmdBuilder.done(), Shard::RetryPolicy::kIdempotent);
906 if (resultStatus.isOK()) {
907 result->appendElements(resultStatus.getValue().response);
908 return resultStatus.getValue().commandStatus.isOK();
909 }
910
911 return Command::appendCommandStatus(*result, resultStatus.getStatus());
912 }
913
runUserManagementReadCommand(OperationContext * opCtx,const std::string & dbname,const BSONObj & cmdObj,BSONObjBuilder * result)914 bool ShardingCatalogClientImpl::runUserManagementReadCommand(OperationContext* opCtx,
915 const std::string& dbname,
916 const BSONObj& cmdObj,
917 BSONObjBuilder* result) {
918 auto resultStatus =
919 Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
920 opCtx,
921 kConfigPrimaryPreferredSelector,
922 dbname,
923 cmdObj,
924 Shard::kDefaultConfigCommandTimeout,
925 Shard::RetryPolicy::kIdempotent);
926 if (resultStatus.isOK()) {
927 Command::filterCommandReplyForPassthrough(resultStatus.getValue().response, result);
928 return resultStatus.getValue().commandStatus.isOK();
929 }
930
931 return Command::appendCommandStatus(*result, resultStatus.getStatus());
932 }
933
Status ShardingCatalogClientImpl::applyChunkOpsDeprecated(OperationContext* opCtx,
                                                          const BSONArray& updateOps,
                                                          const BSONArray& preCondition,
                                                          const std::string& nss,
                                                          const ChunkVersion& lastChunkVersion,
                                                          const WriteConcernOptions& writeConcern,
                                                          repl::ReadConcernLevel readConcern) {
    // Unless we are running on the config server itself, both majority read and majority
    // write concern are required so the post-failure verification query below can observe
    // the committed result of the applyOps.
    invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer ||
              (readConcern == repl::ReadConcernLevel::kMajorityReadConcern &&
               writeConcern.wMode == WriteConcernOptions::kMajority));
    // Apply all chunk updates atomically, guarded by 'preCondition' so the command fails
    // if the chunk metadata changed underneath us.
    BSONObj cmd = BSON("applyOps" << updateOps << "preCondition" << preCondition
                                  << WriteConcernOptions::kWriteConcernField
                                  << writeConcern.toBSON());

    auto response =
        Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
            opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            "config",
            cmd,
            Shard::RetryPolicy::kIdempotent);

    if (!response.isOK()) {
        return response.getStatus();
    }

    // Prefer the command error if there was one; otherwise report the write concern error
    // (which may itself be OK).
    Status status = response.getValue().commandStatus.isOK()
        ? std::move(response.getValue().writeConcernStatus)
        : std::move(response.getValue().commandStatus);

    // TODO (Dianna) This fail point needs to be reexamined when CommitChunkMigration is in:
    // migrations will no longer be able to exercise it, so split or merge will need to do so.
    // SERVER-22659.
    if (MONGO_FAIL_POINT(failApplyChunkOps)) {
        status = Status(ErrorCodes::InternalError, "Failpoint 'failApplyChunkOps' generated error");
    }

    if (!status.isOK()) {
        string errMsg;

        // This could be a blip in the network connectivity. Check if the commit request made it.
        //
        // If all the updates were successfully written to the chunks collection, the last
        // document in the list of updates should be returned from a query to the chunks
        // collection. The last chunk can be identified by namespace and version number.

        warning() << "chunk operation commit failed and metadata will be revalidated"
                  << causedBy(redact(status));

        // Look for the chunk in this shard whose version got bumped. We assume that if that
        // mod made it to the config server, then applyOps was successful.
        std::vector<ChunkType> newestChunk;
        BSONObjBuilder query;
        lastChunkVersion.addToBSON(query, ChunkType::lastmod());
        query.append(ChunkType::ns(), nss);
        Status chunkStatus =
            getChunks(opCtx, query.obj(), BSONObj(), 1, &newestChunk, nullptr, readConcern);

        if (!chunkStatus.isOK()) {
            // Could not even run the verification query; report both failures to the caller.
            errMsg = str::stream() << "getChunks function failed, unable to validate chunk "
                                   << "operation metadata: " << chunkStatus.toString()
                                   << ". applyChunkOpsDeprecated failed to get confirmation "
                                   << "of commit. Unable to save chunk ops. Command: " << cmd
                                   << ". Result: " << response.getValue().response;
        } else if (!newestChunk.empty()) {
            // The bumped chunk version is visible, so the applyOps actually committed even
            // though the command reported failure; treat the operation as a success.
            invariant(newestChunk.size() == 1);
            return Status::OK();
        } else {
            // The expected chunk version is absent, so the commit definitively did not happen.
            errMsg = str::stream() << "chunk operation commit failed: version "
                                   << lastChunkVersion.toString()
                                   << " doesn't exist in namespace: " << nss
                                   << ". Unable to save chunk ops. Command: " << cmd
                                   << ". Result: " << response.getValue().response;
        }

        return status.withContext(errMsg);
    }

    return Status::OK();
}
1014
getDistLockManager()1015 DistLockManager* ShardingCatalogClientImpl::getDistLockManager() {
1016 invariant(_distLockManager);
1017 return _distLockManager.get();
1018 }
1019
writeConfigServerDirect(OperationContext * opCtx,const BatchedCommandRequest & batchRequest,BatchedCommandResponse * batchResponse)1020 void ShardingCatalogClientImpl::writeConfigServerDirect(OperationContext* opCtx,
1021 const BatchedCommandRequest& batchRequest,
1022 BatchedCommandResponse* batchResponse) {
1023 auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
1024 *batchResponse = configShard->runBatchWriteCommand(opCtx,
1025 Shard::kDefaultConfigCommandTimeout,
1026 batchRequest,
1027 Shard::RetryPolicy::kNotIdempotent);
1028 }
1029
Status ShardingCatalogClientImpl::insertConfigDocument(OperationContext* opCtx,
                                                       const std::string& ns,
                                                       const BSONObj& doc,
                                                       const WriteConcernOptions& writeConcern) {
    const NamespaceString nss(ns);
    invariant(nss.db() == NamespaceString::kAdminDb || nss.db() == NamespaceString::kConfigDb);

    // The document must carry an explicit _id: on a retried insert that fails with
    // DuplicateKey we fetch the existing document by this _id to decide whether the
    // first attempt actually succeeded.
    const BSONElement idField = doc.getField("_id");
    invariant(!idField.eoo());

    BatchedCommandRequest request([&] {
        write_ops::Insert insertOp(nss);
        insertOp.setDocuments({doc});
        return insertOp;
    }());
    request.setWriteConcern(writeConcern.toBSON());

    auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
    // Retry manually (kNoRetry on the inner call) so that DuplicateKey on a retry can be
    // disambiguated below instead of being swallowed by the generic retry machinery.
    for (int retry = 1; retry <= kMaxWriteRetry; retry++) {
        auto response = configShard->runBatchWriteCommand(
            opCtx, Shard::kDefaultConfigCommandTimeout, request, Shard::RetryPolicy::kNoRetry);

        Status status = response.toStatus();

        if (retry < kMaxWriteRetry &&
            configShard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent)) {
            // Pretend like the operation is idempotent because we're handling DuplicateKey errors
            // specially
            continue;
        }

        // If we get DuplicateKey error on the first attempt to insert, this definitively means that
        // we are trying to insert the same entry a second time, so error out. If it happens on a
        // retry attempt though, it is not clear whether we are actually inserting a duplicate key
        // or it is because we failed to wait for write concern on the first attempt. In order to
        // differentiate, fetch the entry and check.
        if (retry > 1 && status == ErrorCodes::DuplicateKey) {
            LOG(1) << "Insert retry failed because of duplicate key error, rechecking.";

            auto fetchDuplicate =
                _exhaustiveFindOnConfig(opCtx,
                                        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                                        repl::ReadConcernLevel::kMajorityReadConcern,
                                        nss,
                                        idField.wrap(),
                                        BSONObj(),
                                        boost::none);
            if (!fetchDuplicate.isOK()) {
                return fetchDuplicate.getStatus();
            }

            auto existingDocs = fetchDuplicate.getValue().value;
            if (existingDocs.empty()) {
                // The duplicate disappeared, so some concurrent operation must have removed it;
                // surface the original DuplicateKey error with an explanation.
                return {ErrorCodes::DuplicateKey,
                        stream() << "DuplicateKey error was returned after a retry attempt, but no "
                                    "documents were found. This means a concurrent change occurred "
                                    "together with the retries. Original error was "
                                 << status.toString()};
            }

            invariant(existingDocs.size() == 1);

            BSONObj existing = std::move(existingDocs.front());
            if (existing.woCompare(doc) == 0) {
                // Documents match, so treat the operation as success
                return Status::OK();
            }
        }

        return status;
    }

    // The loop always returns from its final iteration.
    MONGO_UNREACHABLE;
}
1104
updateConfigDocument(OperationContext * opCtx,const string & ns,const BSONObj & query,const BSONObj & update,bool upsert,const WriteConcernOptions & writeConcern)1105 StatusWith<bool> ShardingCatalogClientImpl::updateConfigDocument(
1106 OperationContext* opCtx,
1107 const string& ns,
1108 const BSONObj& query,
1109 const BSONObj& update,
1110 bool upsert,
1111 const WriteConcernOptions& writeConcern) {
1112 return _updateConfigDocument(opCtx, ns, query, update, upsert, writeConcern);
1113 }
1114
_updateConfigDocument(OperationContext * opCtx,const string & ns,const BSONObj & query,const BSONObj & update,bool upsert,const WriteConcernOptions & writeConcern)1115 StatusWith<bool> ShardingCatalogClientImpl::_updateConfigDocument(
1116 OperationContext* opCtx,
1117 const string& ns,
1118 const BSONObj& query,
1119 const BSONObj& update,
1120 bool upsert,
1121 const WriteConcernOptions& writeConcern) {
1122 const NamespaceString nss(ns);
1123 invariant(nss.db() == "config");
1124
1125 const BSONElement idField = query.getField("_id");
1126 invariant(!idField.eoo());
1127
1128 BatchedCommandRequest request([&] {
1129 write_ops::Update updateOp(nss);
1130 updateOp.setUpdates({[&] {
1131 write_ops::UpdateOpEntry entry;
1132 entry.setQ(query);
1133 entry.setU(update);
1134 entry.setUpsert(upsert);
1135 entry.setMulti(false);
1136 return entry;
1137 }()});
1138 return updateOp;
1139 }());
1140 request.setWriteConcern(writeConcern.toBSON());
1141
1142 auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
1143 auto response = configShard->runBatchWriteCommand(
1144 opCtx, Shard::kDefaultConfigCommandTimeout, request, Shard::RetryPolicy::kIdempotent);
1145
1146 Status status = response.toStatus();
1147 if (!status.isOK()) {
1148 return status;
1149 }
1150
1151 const auto nSelected = response.getN();
1152 invariant(nSelected == 0 || nSelected == 1);
1153 return (nSelected == 1);
1154 }
1155
removeConfigDocuments(OperationContext * opCtx,const string & ns,const BSONObj & query,const WriteConcernOptions & writeConcern)1156 Status ShardingCatalogClientImpl::removeConfigDocuments(OperationContext* opCtx,
1157 const string& ns,
1158 const BSONObj& query,
1159 const WriteConcernOptions& writeConcern) {
1160 const NamespaceString nss(ns);
1161 invariant(nss.db() == "config");
1162
1163 BatchedCommandRequest request([&] {
1164 write_ops::Delete deleteOp(nss);
1165 deleteOp.setDeletes({[&] {
1166 write_ops::DeleteOpEntry entry;
1167 entry.setQ(query);
1168 entry.setMulti(true);
1169 return entry;
1170 }()});
1171 return deleteOp;
1172 }());
1173 request.setWriteConcern(writeConcern.toBSON());
1174
1175 auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
1176 auto response = configShard->runBatchWriteCommand(
1177 opCtx, Shard::kDefaultConfigCommandTimeout, request, Shard::RetryPolicy::kIdempotent);
1178 return response.toStatus();
1179 }
1180
_createCappedConfigCollection(OperationContext * opCtx,StringData collName,int cappedSize,const WriteConcernOptions & writeConcern)1181 Status ShardingCatalogClientImpl::_createCappedConfigCollection(
1182 OperationContext* opCtx,
1183 StringData collName,
1184 int cappedSize,
1185 const WriteConcernOptions& writeConcern) {
1186 BSONObj createCmd = BSON("create" << collName << "capped" << true << "size" << cappedSize
1187 << WriteConcernOptions::kWriteConcernField
1188 << writeConcern.toBSON());
1189
1190 auto result =
1191 Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
1192 opCtx,
1193 ReadPreferenceSetting{ReadPreference::PrimaryOnly},
1194 "config",
1195 createCmd,
1196 Shard::kDefaultConfigCommandTimeout,
1197 Shard::RetryPolicy::kIdempotent);
1198
1199 if (!result.isOK()) {
1200 return result.getStatus();
1201 }
1202
1203 if (!result.getValue().commandStatus.isOK()) {
1204 if (result.getValue().commandStatus == ErrorCodes::NamespaceExists) {
1205 if (result.getValue().writeConcernStatus.isOK()) {
1206 return Status::OK();
1207 } else {
1208 return result.getValue().writeConcernStatus;
1209 }
1210 } else {
1211 return result.getValue().commandStatus;
1212 }
1213 }
1214
1215 return result.getValue().writeConcernStatus;
1216 }
1217
_exhaustiveFindOnConfig(OperationContext * opCtx,const ReadPreferenceSetting & readPref,const repl::ReadConcernLevel & readConcern,const NamespaceString & nss,const BSONObj & query,const BSONObj & sort,boost::optional<long long> limit)1218 StatusWith<repl::OpTimeWith<vector<BSONObj>>> ShardingCatalogClientImpl::_exhaustiveFindOnConfig(
1219 OperationContext* opCtx,
1220 const ReadPreferenceSetting& readPref,
1221 const repl::ReadConcernLevel& readConcern,
1222 const NamespaceString& nss,
1223 const BSONObj& query,
1224 const BSONObj& sort,
1225 boost::optional<long long> limit) {
1226 auto response = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
1227 opCtx, readPref, readConcern, nss, query, sort, limit);
1228 if (!response.isOK()) {
1229 return response.getStatus();
1230 }
1231
1232 return repl::OpTimeWith<vector<BSONObj>>(std::move(response.getValue().docs),
1233 response.getValue().opTime);
1234 }
1235
_appendReadConcern(BSONObjBuilder * builder)1236 void ShardingCatalogClientImpl::_appendReadConcern(BSONObjBuilder* builder) {
1237 repl::ReadConcernArgs readConcern(grid.configOpTime(),
1238 repl::ReadConcernLevel::kMajorityReadConcern);
1239 readConcern.appendInfo(builder);
1240 }
1241
getNewKeys(OperationContext * opCtx,StringData purpose,const LogicalTime & newerThanThis,repl::ReadConcernLevel readConcernLevel)1242 StatusWith<std::vector<KeysCollectionDocument>> ShardingCatalogClientImpl::getNewKeys(
1243 OperationContext* opCtx,
1244 StringData purpose,
1245 const LogicalTime& newerThanThis,
1246 repl::ReadConcernLevel readConcernLevel) {
1247 auto config = Grid::get(opCtx)->shardRegistry()->getConfigShard();
1248
1249 BSONObjBuilder queryBuilder;
1250 queryBuilder.append("purpose", purpose);
1251 queryBuilder.append("expiresAt", BSON("$gt" << newerThanThis.asTimestamp()));
1252
1253 auto findStatus =
1254 config->exhaustiveFindOnConfig(opCtx,
1255 kConfigReadSelector,
1256 readConcernLevel,
1257 NamespaceString(KeysCollectionDocument::ConfigNS),
1258 queryBuilder.obj(),
1259 BSON("expiresAt" << 1),
1260 boost::none);
1261
1262 if (!findStatus.isOK()) {
1263 return findStatus.getStatus();
1264 }
1265
1266 const auto& keyDocs = findStatus.getValue().docs;
1267 std::vector<KeysCollectionDocument> keys;
1268 for (auto&& keyDoc : keyDocs) {
1269 auto parseStatus = KeysCollectionDocument::fromBSON(keyDoc);
1270 if (!parseStatus.isOK()) {
1271 return parseStatus.getStatus();
1272 }
1273
1274 keys.push_back(std::move(parseStatus.getValue()));
1275 }
1276
1277 return keys;
1278 }
1279
1280 } // namespace mongo
1281