/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/s/catalog/sharding_catalog_client_impl.h"

#include <iomanip>
#include <pcrecpp.h>

#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/audit.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/executor/network_interface.h"
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/s/catalog/config_server_version.h"
#include "mongo/s/catalog/dist_lock_manager.h"
#include "mongo/s/catalog/type_changelog.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_config_version.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/set_shard_version_request.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/s/shard_util.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"

namespace mongo {

MONGO_FP_DECLARE(failApplyChunkOps);

using repl::OpTime;
using std::set;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using str::stream;

namespace {

const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
const ReadPreferenceSetting kConfigPrimaryPreferredSelector(ReadPreference::PrimaryPreferred,
                                                            TagSet{});
const int kMaxReadRetry = 3;
const int kMaxWriteRetry = 3;

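// Note: despite the "MB" suffix, these constants are raw byte counts (20MB and 200MB
// respectively). They are passed unchanged as the "size" argument (in bytes) of the capped
// collection create command assembled in _createCappedConfigCollection().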
const std::string kActionLogCollectionName("actionlog");
const int kActionLogCollectionSizeMB = 20 * 1024 * 1024;

const std::string kChangeLogCollectionName("changelog");
const int kChangeLogCollectionSizeMB = 200 * 1024 * 1024;

const NamespaceString kSettingsNamespace("config", "settings");

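// Converts 'status' into an error BatchedCommandResponse, i.e. ok:false plus the status'
// code and reason.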
void toBatchError(const Status& status, BatchedCommandResponse* response) {
    response->clear();
    response->setErrCode(status.code());
    response->setErrMessage(status.reason());
    response->setOk(false);
}

}  // namespace

ShardingCatalogClientImpl::ShardingCatalogClientImpl(
    std::unique_ptr<DistLockManager> distLockManager)
    : _distLockManager(std::move(distLockManager)) {}

ShardingCatalogClientImpl::~ShardingCatalogClientImpl() = default;

void ShardingCatalogClientImpl::startup() {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    if (_started) {
        return;
    }

    _started = true;
    _distLockManager->startUp();
}

void ShardingCatalogClientImpl::shutDown(OperationContext* opCtx) {
    LOG(1) << "ShardingCatalogClientImpl::shutDown() called.";
    {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        _inShutdown = true;
    }

    invariant(_distLockManager);
    _distLockManager->shutDown(opCtx);
}

Status ShardingCatalogClientImpl::updateShardingCatalogEntryForCollection(
    OperationContext* opCtx,
    const std::string& collNs,
    const CollectionType& coll,
    const bool upsert) {
    fassert(28634, coll.validate());

    auto status = _updateConfigDocument(opCtx,
                                        CollectionType::ConfigNS,
                                        BSON(CollectionType::fullNs(collNs)),
                                        coll.toBSON(),
                                        upsert,
                                        ShardingCatalogClient::kMajorityWriteConcern);
    if (!status.isOK()) {
        return {status.getStatus().code(),
                str::stream() << "Collection metadata write failed due to "
                              << status.getStatus().reason()};
    }

    return Status::OK();
}

Status ShardingCatalogClientImpl::updateDatabase(OperationContext* opCtx,
                                                 const std::string& dbName,
                                                 const DatabaseType& db) {
    fassert(28616, db.validate());

    auto status = updateConfigDocument(opCtx,
                                       DatabaseType::ConfigNS,
                                       BSON(DatabaseType::name(dbName)),
                                       db.toBSON(),
                                       true,
                                       ShardingCatalogClient::kMajorityWriteConcern);
    if (!status.isOK()) {
        return {status.getStatus().code(),
                str::stream() << "Database metadata write failed due to "
                              << status.getStatus().reason()};
    }

    return Status::OK();
}

Status ShardingCatalogClientImpl::logAction(OperationContext* opCtx,
                                            const std::string& what,
                                            const std::string& ns,
                                            const BSONObj& detail) {
    if (_actionLogCollectionCreated.load() == 0) {
        Status result = _createCappedConfigCollection(opCtx,
                                                      kActionLogCollectionName,
                                                      kActionLogCollectionSizeMB,
                                                      ShardingCatalogClient::kMajorityWriteConcern);
        if (result.isOK()) {
            _actionLogCollectionCreated.store(1);
        } else {
            log() << "couldn't create config.actionlog collection" << causedBy(result);
            return result;
        }
    }

    return _log(opCtx,
                kActionLogCollectionName,
                what,
                ns,
                detail,
                ShardingCatalogClient::kMajorityWriteConcern);
}

Status ShardingCatalogClientImpl::logChange(OperationContext* opCtx,
                                            const std::string& what,
                                            const std::string& ns,
                                            const BSONObj& detail,
                                            const WriteConcernOptions& writeConcern) {
    invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer ||
              writeConcern.wMode == WriteConcernOptions::kMajority);
    if (_changeLogCollectionCreated.load() == 0) {
        Status result = _createCappedConfigCollection(
            opCtx, kChangeLogCollectionName, kChangeLogCollectionSizeMB, writeConcern);
        if (result.isOK()) {
            _changeLogCollectionCreated.store(1);
        } else {
            log() << "couldn't create config.changelog collection" << causedBy(result);
            return result;
        }
    }

    return _log(opCtx, kChangeLogCollectionName, what, ns, detail, writeConcern);
}

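// Writes a single log document into config.<logCollName> describing 'what' happened to
// 'operationNS'. The document's changeId has the form "<hostName>-<time>-<OID>", e.g.
// "host1:27017-2018-05-01T12:00:00.000+0000-5ae...0f" (hostname and OID here are
// illustrative).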
Status ShardingCatalogClientImpl::_log(OperationContext* opCtx,
                                       const StringData& logCollName,
                                       const std::string& what,
                                       const std::string& operationNS,
                                       const BSONObj& detail,
                                       const WriteConcernOptions& writeConcern) {
    Date_t now = Grid::get(opCtx)->getNetwork()->now();
    const std::string hostName = Grid::get(opCtx)->getNetwork()->getHostName();
    const string changeId = str::stream() << hostName << "-" << now.toString() << "-" << OID::gen();

    ChangeLogType changeLog;
    changeLog.setChangeId(changeId);
    changeLog.setServer(hostName);
    changeLog.setClientAddr(opCtx->getClient()->clientAddress(true));
    changeLog.setTime(now);
    changeLog.setNS(operationNS);
    changeLog.setWhat(what);
    changeLog.setDetails(detail);

    BSONObj changeLogBSON = changeLog.toBSON();
    log() << "about to log metadata event into " << logCollName << ": " << redact(changeLogBSON);

    const NamespaceString nss("config", logCollName);
    Status result = insertConfigDocument(opCtx, nss.ns(), changeLogBSON, writeConcern);

    if (!result.isOK()) {
        warning() << "Error encountered while logging config change with ID [" << changeId
                  << "] into collection " << logCollName << ": " << redact(result);
    }

    return result;
}

StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientImpl::getDatabase(
    OperationContext* opCtx, const std::string& dbName, repl::ReadConcernLevel readConcernLevel) {
    if (!NamespaceString::validDBName(dbName, NamespaceString::DollarInDbNameBehavior::Allow)) {
        return {ErrorCodes::InvalidNamespace, stream() << dbName << " is not a valid db name"};
    }

    // The admin database is always hosted on the config server.
    if (dbName == "admin") {
        DatabaseType dbt;
        dbt.setName(dbName);
        dbt.setSharded(false);
        dbt.setPrimary(ShardRegistry::kConfigServerShardId);

        return repl::OpTimeWith<DatabaseType>(dbt);
    }

    // The config database's primary shard is always config, and it is always sharded.
    if (dbName == "config") {
        DatabaseType dbt;
        dbt.setName(dbName);
        dbt.setSharded(true);
        dbt.setPrimary(ShardRegistry::kConfigServerShardId);

        return repl::OpTimeWith<DatabaseType>(dbt);
    }

    auto result = _fetchDatabaseMetadata(opCtx, dbName, kConfigReadSelector, readConcernLevel);
    if (result == ErrorCodes::NamespaceNotFound) {
        // If we failed to find the database metadata on the 'nearest' config server, try again
        // against the primary, in case the database was recently created.
        result = _fetchDatabaseMetadata(
            opCtx, dbName, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, readConcernLevel);
        if (!result.isOK() && (result != ErrorCodes::NamespaceNotFound)) {
            return {result.getStatus().code(),
                    str::stream() << "Could not confirm non-existence of database " << dbName
                                  << " due to "
                                  << result.getStatus().reason()};
        }
    }

    return result;
}

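// Fetches the config.databases entry for 'dbName' using the given read preference. The
// 'admin' and 'config' databases are special-cased by getDatabase() above and must never
// reach here, hence the dassert.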
StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientImpl::_fetchDatabaseMetadata(
    OperationContext* opCtx,
    const std::string& dbName,
    const ReadPreferenceSetting& readPref,
    repl::ReadConcernLevel readConcernLevel) {
    dassert(dbName != "admin" && dbName != "config");

    auto findStatus = _exhaustiveFindOnConfig(opCtx,
                                              readPref,
                                              readConcernLevel,
                                              NamespaceString(DatabaseType::ConfigNS),
                                              BSON(DatabaseType::name(dbName)),
                                              BSONObj(),
                                              boost::none);
    if (!findStatus.isOK()) {
        return findStatus.getStatus();
    }

    const auto& docsWithOpTime = findStatus.getValue();
    if (docsWithOpTime.value.empty()) {
        return {ErrorCodes::NamespaceNotFound, stream() << "database " << dbName << " not found"};
    }

    invariant(docsWithOpTime.value.size() == 1);

    auto parseStatus = DatabaseType::fromBSON(docsWithOpTime.value.front());
    if (!parseStatus.isOK()) {
        return parseStatus.getStatus();
    }

    return repl::OpTimeWith<DatabaseType>(parseStatus.getValue(), docsWithOpTime.opTime);
}

StatusWith<repl::OpTimeWith<CollectionType>> ShardingCatalogClientImpl::getCollection(
    OperationContext* opCtx, const std::string& collNs) {
    auto statusFind = _exhaustiveFindOnConfig(opCtx,
                                              kConfigReadSelector,
                                              repl::ReadConcernLevel::kMajorityReadConcern,
                                              NamespaceString(CollectionType::ConfigNS),
                                              BSON(CollectionType::fullNs(collNs)),
                                              BSONObj(),
                                              1);
    if (!statusFind.isOK()) {
        return statusFind.getStatus();
    }

    const auto& retOpTimePair = statusFind.getValue();
    const auto& retVal = retOpTimePair.value;
    if (retVal.empty()) {
        return Status(ErrorCodes::NamespaceNotFound,
                      stream() << "collection " << collNs << " not found");
    }

    invariant(retVal.size() == 1);

    auto parseStatus = CollectionType::fromBSON(retVal.front());
    if (!parseStatus.isOK()) {
        return parseStatus.getStatus();
    }

    auto collType = parseStatus.getValue();
    if (collType.getDropped()) {
        return Status(ErrorCodes::NamespaceNotFound,
                      stream() << "collection " << collNs << " was dropped");
    }

    return repl::OpTimeWith<CollectionType>(collType, retOpTimePair.opTime);
}

Status ShardingCatalogClientImpl::getCollections(OperationContext* opCtx,
                                                 const std::string* dbName,
                                                 std::vector<CollectionType>* collections,
                                                 OpTime* opTime) {
    BSONObjBuilder b;
    if (dbName) {
        invariant(!dbName->empty());
        b.appendRegex(CollectionType::fullNs(),
                      string(str::stream() << "^" << pcrecpp::RE::QuoteMeta(*dbName) << "\\."));
    }

    auto findStatus = _exhaustiveFindOnConfig(opCtx,
                                              kConfigReadSelector,
                                              repl::ReadConcernLevel::kMajorityReadConcern,
                                              NamespaceString(CollectionType::ConfigNS),
                                              b.obj(),
                                              BSONObj(),
                                              boost::none);  // no limit
    if (!findStatus.isOK()) {
        return findStatus.getStatus();
    }

    const auto& docsOpTimePair = findStatus.getValue();

    for (const BSONObj& obj : docsOpTimePair.value) {
        const auto collectionResult = CollectionType::fromBSON(obj);
        if (!collectionResult.isOK()) {
            collections->clear();
            return {ErrorCodes::FailedToParse,
                    str::stream() << "error while parsing " << CollectionType::ConfigNS
                                  << " document: "
                                  << obj
                                  << " : "
                                  << collectionResult.getStatus().toString()};
        }

        collections->push_back(collectionResult.getValue());
    }

    if (opTime) {
        *opTime = docsOpTimePair.opTime;
    }

    return Status::OK();
}

Status ShardingCatalogClientImpl::dropCollection(OperationContext* opCtx,
                                                 const NamespaceString& ns) {
    logChange(opCtx,
              "dropCollection.start",
              ns.ns(),
              BSONObj(),
              ShardingCatalogClientImpl::kMajorityWriteConcern)
        .ignore();

    auto shardsStatus = getAllShards(opCtx, repl::ReadConcernLevel::kMajorityReadConcern);
    if (!shardsStatus.isOK()) {
        return shardsStatus.getStatus();
    }
    vector<ShardType> allShards = std::move(shardsStatus.getValue().value);

    LOG(1) << "dropCollection " << ns << " started";

    const auto dropCommandBSON = [opCtx, &ns] {
        BSONObjBuilder builder;
        builder.append("drop", ns.coll());

        if (!opCtx->getWriteConcern().usedDefault) {
            builder.append(WriteConcernOptions::kWriteConcernField,
                           opCtx->getWriteConcern().toBSON());
        }

        return builder.obj();
    }();

    std::map<std::string, BSONObj> errors;
    auto* const shardRegistry = Grid::get(opCtx)->shardRegistry();

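    // First, drop the collection on every shard, collecting per-shard failures so they can be
    // reported together afterwards.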
    for (const auto& shardEntry : allShards) {
        auto swShard = shardRegistry->getShard(opCtx, shardEntry.getName());
        if (!swShard.isOK()) {
            return swShard.getStatus();
        }

        const auto& shard = swShard.getValue();

        auto swDropResult = shard->runCommandWithFixedRetryAttempts(
            opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            ns.db().toString(),
            dropCommandBSON,
            Shard::RetryPolicy::kIdempotent);

        if (!swDropResult.isOK()) {
            return {swDropResult.getStatus().code(),
                    str::stream() << swDropResult.getStatus().reason() << " at "
                                  << shardEntry.getName()};
        }

        auto& dropResult = swDropResult.getValue();

        auto dropStatus = std::move(dropResult.commandStatus);
        auto wcStatus = std::move(dropResult.writeConcernStatus);
        if (!dropStatus.isOK() || !wcStatus.isOK()) {
            if (dropStatus.code() == ErrorCodes::NamespaceNotFound && wcStatus.isOK()) {
                // Generally getting NamespaceNotFound is okay to ignore as it simply means that
                // the collection has already been dropped or doesn't exist on this shard.
                // If, however, we get NamespaceNotFound but also have a write concern error then we
                // can't confirm whether the fact that the namespace doesn't exist is actually
                // committed.  Thus we must still fail on NamespaceNotFound if there is also a write
                // concern error. This can happen if we call drop, it succeeds but with a write
                // concern error, then we retry the drop.
                continue;
            }

            errors.emplace(shardEntry.getHost(), std::move(dropResult.response));
        }
    }

    if (!errors.empty()) {
        StringBuilder sb;
        sb << "Dropping collection failed on the following hosts: ";

        for (auto it = errors.cbegin(); it != errors.cend(); ++it) {
            if (it != errors.cbegin()) {
                sb << ", ";
            }

            sb << it->first << ": " << it->second;
        }

        return {ErrorCodes::OperationFailed, sb.str()};
    }

    LOG(1) << "dropCollection " << ns << " shard data deleted";

    // Remove chunk data
    Status result = removeConfigDocuments(opCtx,
                                          ChunkType::ConfigNS,
                                          BSON(ChunkType::ns(ns.ns())),
                                          ShardingCatalogClient::kMajorityWriteConcern);
    if (!result.isOK()) {
        return result;
    }

    LOG(1) << "dropCollection " << ns << " chunk data deleted";

    // Mark the collection as dropped
    CollectionType coll;
    coll.setNs(ns);
    coll.setDropped(true);
    coll.setEpoch(ChunkVersion::DROPPED().epoch());
    coll.setUpdatedAt(Grid::get(opCtx)->getNetwork()->now());

    const bool upsert = false;
    result = updateShardingCatalogEntryForCollection(opCtx, ns.ns(), coll, upsert);
    if (!result.isOK()) {
        return result;
    }

    LOG(1) << "dropCollection " << ns << " collection marked as dropped";

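    // Finally, refresh each shard's view of the collection: send setShardVersion with the
    // special DROPPED version so the shard discards its cached metadata, then unsetSharding.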
    for (const auto& shardEntry : allShards) {
        auto swShard = shardRegistry->getShard(opCtx, shardEntry.getName());
        if (!swShard.isOK()) {
            return swShard.getStatus();
        }

        const auto& shard = swShard.getValue();

        SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist(
            shardRegistry->getConfigServerConnectionString(),
            shardEntry.getName(),
            fassertStatusOK(28781, ConnectionString::parse(shardEntry.getHost())),
            ns,
            ChunkVersion::DROPPED(),
            true);

        auto ssvResult = shard->runCommandWithFixedRetryAttempts(
            opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            "admin",
            ssv.toBSON(),
            Shard::RetryPolicy::kIdempotent);

        if (!ssvResult.isOK()) {
            return ssvResult.getStatus();
        }

        auto ssvStatus = std::move(ssvResult.getValue().commandStatus);
        if (!ssvStatus.isOK()) {
            return ssvStatus;
        }

        auto unsetShardingStatus = shard->runCommandWithFixedRetryAttempts(
            opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            "admin",
            BSON("unsetSharding" << 1),
            Shard::RetryPolicy::kIdempotent);

        if (!unsetShardingStatus.isOK()) {
            return unsetShardingStatus.getStatus();
        }

        auto unsetShardingResult = std::move(unsetShardingStatus.getValue().commandStatus);
        if (!unsetShardingResult.isOK()) {
            return unsetShardingResult;
        }
    }

    LOG(1) << "dropCollection " << ns << " completed";

    logChange(opCtx,
              "dropCollection",
              ns.ns(),
              BSONObj(),
              ShardingCatalogClientImpl::kMajorityWriteConcern)
        .ignore();

    return Status::OK();
}

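// Returns the single config.settings document whose _id equals 'key' (for example the
// "chunksize" or "balancer" settings documents).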
StatusWith<BSONObj> ShardingCatalogClientImpl::getGlobalSettings(OperationContext* opCtx,
                                                                 StringData key) {
    auto findStatus = _exhaustiveFindOnConfig(opCtx,
                                              kConfigReadSelector,
                                              repl::ReadConcernLevel::kMajorityReadConcern,
                                              kSettingsNamespace,
                                              BSON("_id" << key),
                                              BSONObj(),
                                              1);
    if (!findStatus.isOK()) {
        return findStatus.getStatus();
    }

    const auto& docs = findStatus.getValue().value;
    if (docs.empty()) {
        return {ErrorCodes::NoMatchingDocument,
                str::stream() << "can't find settings document with key: " << key};
    }

    invariant(docs.size() == 1);
    return docs.front();
}

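// Reads the config.version document. An empty collection is not treated as an error: it
// yields a VersionType filled with UpgradeHistory_EmptyVersion and a zeroed clusterId,
// which callers can treat as a cluster whose config version has not been initialized yet.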
StatusWith<VersionType> ShardingCatalogClientImpl::getConfigVersion(
    OperationContext* opCtx, repl::ReadConcernLevel readConcern) {
    auto findStatus = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
        opCtx,
        kConfigReadSelector,
        readConcern,
        NamespaceString(VersionType::ConfigNS),
        BSONObj(),
        BSONObj(),
        boost::none /* no limit */);
    if (!findStatus.isOK()) {
        return findStatus.getStatus();
    }

    auto queryResults = findStatus.getValue().docs;

    if (queryResults.size() > 1) {
        return {ErrorCodes::TooManyMatchingDocuments,
                str::stream() << "should only have 1 document in " << VersionType::ConfigNS};
    }

    if (queryResults.empty()) {
        VersionType versionInfo;
        versionInfo.setMinCompatibleVersion(UpgradeHistory_EmptyVersion);
        versionInfo.setCurrentVersion(UpgradeHistory_EmptyVersion);
        versionInfo.setClusterId(OID{});
        return versionInfo;
    }

    BSONObj versionDoc = queryResults.front();
    auto versionTypeResult = VersionType::fromBSON(versionDoc);
    if (!versionTypeResult.isOK()) {
        return {versionTypeResult.getStatus().code(),
                str::stream() << "Unable to parse config.version document " << versionDoc
                              << " due to "
                              << versionTypeResult.getStatus().reason()};
    }

    auto validationStatus = versionTypeResult.getValue().validate();
    if (!validationStatus.isOK()) {
        return Status(validationStatus.code(),
                      str::stream() << "Unable to validate config.version document " << versionDoc
                                    << " due to "
                                    << validationStatus.reason());
    }

    return versionTypeResult.getValue();
}

Status ShardingCatalogClientImpl::getDatabasesForShard(OperationContext* opCtx,
                                                       const ShardId& shardId,
                                                       vector<string>* dbs) {
    auto findStatus = _exhaustiveFindOnConfig(opCtx,
                                              kConfigReadSelector,
                                              repl::ReadConcernLevel::kMajorityReadConcern,
                                              NamespaceString(DatabaseType::ConfigNS),
                                              BSON(DatabaseType::primary(shardId.toString())),
                                              BSONObj(),
                                              boost::none);  // no limit
    if (!findStatus.isOK()) {
        return findStatus.getStatus();
    }

    for (const BSONObj& obj : findStatus.getValue().value) {
        string dbName;
        Status status = bsonExtractStringField(obj, DatabaseType::name(), &dbName);
        if (!status.isOK()) {
            dbs->clear();
            return status;
        }

        dbs->push_back(dbName);
    }

    return Status::OK();
}

Status ShardingCatalogClientImpl::getChunks(OperationContext* opCtx,
                                            const BSONObj& query,
                                            const BSONObj& sort,
                                            boost::optional<int> limit,
                                            vector<ChunkType>* chunks,
                                            OpTime* opTime,
                                            repl::ReadConcernLevel readConcern) {
    invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer ||
              readConcern == repl::ReadConcernLevel::kMajorityReadConcern);
    chunks->clear();

    // Convert boost::optional<int> to boost::optional<long long>.
    auto longLimit = limit ? boost::optional<long long>(*limit) : boost::none;
    auto findStatus = _exhaustiveFindOnConfig(opCtx,
                                              kConfigReadSelector,
                                              readConcern,
                                              NamespaceString(ChunkType::ConfigNS),
                                              query,
                                              sort,
                                              longLimit);
    if (!findStatus.isOK()) {
        return {findStatus.getStatus().code(),
                str::stream() << "Failed to load chunks due to "
                              << findStatus.getStatus().reason()};
    }

    const auto& chunkDocsOpTimePair = findStatus.getValue();
    for (const BSONObj& obj : chunkDocsOpTimePair.value) {
        auto chunkRes = ChunkType::fromConfigBSON(obj);
        if (!chunkRes.isOK()) {
            chunks->clear();
            return {chunkRes.getStatus().code(),
                    stream() << "Failed to parse chunk with id " << obj[ChunkType::name()]
                             << " due to "
                             << chunkRes.getStatus().reason()};
        }

        chunks->push_back(chunkRes.getValue());
    }

    if (opTime) {
        *opTime = chunkDocsOpTimePair.opTime;
    }

    return Status::OK();
}

Status ShardingCatalogClientImpl::getTagsForCollection(OperationContext* opCtx,
                                                       const std::string& collectionNs,
                                                       std::vector<TagsType>* tags) {
    tags->clear();

    auto findStatus = _exhaustiveFindOnConfig(opCtx,
                                              kConfigReadSelector,
                                              repl::ReadConcernLevel::kMajorityReadConcern,
                                              NamespaceString(TagsType::ConfigNS),
                                              BSON(TagsType::ns(collectionNs)),
                                              BSON(TagsType::min() << 1),
                                              boost::none);  // no limit
    if (!findStatus.isOK()) {
        return {findStatus.getStatus().code(),
                str::stream() << "Failed to load tags due to " << findStatus.getStatus().reason()};
    }

    const auto& tagDocsOpTimePair = findStatus.getValue();
    for (const BSONObj& obj : tagDocsOpTimePair.value) {
        auto tagRes = TagsType::fromBSON(obj);
        if (!tagRes.isOK()) {
            tags->clear();
            return {tagRes.getStatus().code(),
                    str::stream() << "Failed to parse tag with id " << obj[TagsType::tag()]
                                  << " due to "
                                  << tagRes.getStatus().toString()};
        }

        tags->push_back(tagRes.getValue());
    }

    return Status::OK();
}

StatusWith<repl::OpTimeWith<std::vector<ShardType>>> ShardingCatalogClientImpl::getAllShards(
    OperationContext* opCtx, repl::ReadConcernLevel readConcern) {
    std::vector<ShardType> shards;
    auto findStatus = _exhaustiveFindOnConfig(opCtx,
                                              kConfigReadSelector,
                                              readConcern,
                                              NamespaceString(ShardType::ConfigNS),
                                              BSONObj(),     // no query filter
                                              BSONObj(),     // no sort
                                              boost::none);  // no limit
    if (!findStatus.isOK()) {
        return findStatus.getStatus();
    }

    for (const BSONObj& doc : findStatus.getValue().value) {
        auto shardRes = ShardType::fromBSON(doc);
        if (!shardRes.isOK()) {
            return {shardRes.getStatus().code(),
                    stream() << "Failed to parse shard document " << doc << " due to "
                             << shardRes.getStatus().reason()};
        }

        Status validateStatus = shardRes.getValue().validate();
        if (!validateStatus.isOK()) {
            return {validateStatus.code(),
                    stream() << "Failed to validate shard document " << doc << " due to "
                             << validateStatus.reason()};
        }

        shards.push_back(shardRes.getValue());
    }

    return repl::OpTimeWith<std::vector<ShardType>>{std::move(shards),
                                                    findStatus.getValue().opTime};
}

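// Runs a user management write command (e.g. createUser) against the config server primary.
// Only w:1 or w:'majority' write concerns are accepted, and the command is always forwarded
// with w:'majority' so that user changes are durable across the config server replica set.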
bool ShardingCatalogClientImpl::runUserManagementWriteCommand(OperationContext* opCtx,
                                                              const std::string& commandName,
                                                              const std::string& dbname,
                                                              const BSONObj& cmdObj,
                                                              BSONObjBuilder* result) {
    BSONObj cmdToRun = cmdObj;
    {
        // Make sure that if the command has a write concern that it is w:1 or w:majority, and
        // convert w:1 or no write concern to w:majority before sending.
        WriteConcernOptions writeConcern;
        writeConcern.reset();

        BSONElement writeConcernElement = cmdObj[WriteConcernOptions::kWriteConcernField];
        bool initialCmdHadWriteConcern = !writeConcernElement.eoo();
        if (initialCmdHadWriteConcern) {
            Status status = writeConcern.parse(writeConcernElement.Obj());
            if (!status.isOK()) {
                return Command::appendCommandStatus(*result, status);
            }

            if (!(writeConcern.wNumNodes == 1 ||
                  writeConcern.wMode == WriteConcernOptions::kMajority)) {
                return Command::appendCommandStatus(
                    *result,
                    {ErrorCodes::InvalidOptions,
                     str::stream() << "Invalid replication write concern. User management write "
                                      "commands may only use w:1 or w:'majority', got: "
                                   << writeConcern.toBSON()});
            }
        }

        writeConcern.wMode = WriteConcernOptions::kMajority;
        writeConcern.wNumNodes = 0;

        BSONObjBuilder modifiedCmd;
        if (!initialCmdHadWriteConcern) {
            modifiedCmd.appendElements(cmdObj);
        } else {
            BSONObjIterator cmdObjIter(cmdObj);
            while (cmdObjIter.more()) {
                BSONElement e = cmdObjIter.next();
                if (WriteConcernOptions::kWriteConcernField == e.fieldName()) {
                    continue;
                }
                modifiedCmd.append(e);
            }
        }
        modifiedCmd.append(WriteConcernOptions::kWriteConcernField, writeConcern.toBSON());
        cmdToRun = modifiedCmd.obj();
    }

    auto response =
        Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
            opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            dbname,
            cmdToRun,
            Shard::kDefaultConfigCommandTimeout,
            Shard::RetryPolicy::kNotIdempotent);

    if (!response.isOK()) {
        return Command::appendCommandStatus(*result, response.getStatus());
    }
    if (!response.getValue().commandStatus.isOK()) {
        return Command::appendCommandStatus(*result, response.getValue().commandStatus);
    }
    if (!response.getValue().writeConcernStatus.isOK()) {
        return Command::appendCommandStatus(*result, response.getValue().writeConcernStatus);
    }

    Command::filterCommandReplyForPassthrough(response.getValue().response, result);
    return true;
}

bool ShardingCatalogClientImpl::runReadCommandForTest(OperationContext* opCtx,
                                                      const std::string& dbname,
                                                      const BSONObj& cmdObj,
                                                      BSONObjBuilder* result) {
    BSONObjBuilder cmdBuilder;
    cmdBuilder.appendElements(cmdObj);
    _appendReadConcern(&cmdBuilder);

    auto resultStatus =
        Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
            opCtx, kConfigReadSelector, dbname, cmdBuilder.done(), Shard::RetryPolicy::kIdempotent);
    if (resultStatus.isOK()) {
        result->appendElements(resultStatus.getValue().response);
        return resultStatus.getValue().commandStatus.isOK();
    }

    return Command::appendCommandStatus(*result, resultStatus.getStatus());
}

bool ShardingCatalogClientImpl::runUserManagementReadCommand(OperationContext* opCtx,
                                                             const std::string& dbname,
                                                             const BSONObj& cmdObj,
                                                             BSONObjBuilder* result) {
    auto resultStatus =
        Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
            opCtx,
            kConfigPrimaryPreferredSelector,
            dbname,
            cmdObj,
            Shard::kDefaultConfigCommandTimeout,
            Shard::RetryPolicy::kIdempotent);
    if (resultStatus.isOK()) {
        Command::filterCommandReplyForPassthrough(resultStatus.getValue().response, result);
        return resultStatus.getValue().commandStatus.isOK();
    }

    return Command::appendCommandStatus(*result, resultStatus.getStatus());
}

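// Applies a series of chunk metadata updates on the config server as a single applyOps
// command. The command sent has the following shape (illustrative):
//
//   { applyOps: [ <updateOps> ],
//     preCondition: [ <preCondition> ],
//     writeConcern: { w: "majority", ... } }
//
// If the command's outcome is unknown (e.g. a network blip), the chunk carrying
// 'lastChunkVersion' is looked up afterwards to decide whether the commit actually happened.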
Status ShardingCatalogClientImpl::applyChunkOpsDeprecated(OperationContext* opCtx,
                                                          const BSONArray& updateOps,
                                                          const BSONArray& preCondition,
                                                          const std::string& nss,
                                                          const ChunkVersion& lastChunkVersion,
                                                          const WriteConcernOptions& writeConcern,
                                                          repl::ReadConcernLevel readConcern) {
    invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer ||
              (readConcern == repl::ReadConcernLevel::kMajorityReadConcern &&
               writeConcern.wMode == WriteConcernOptions::kMajority));
    BSONObj cmd = BSON("applyOps" << updateOps << "preCondition" << preCondition
                                  << WriteConcernOptions::kWriteConcernField
                                  << writeConcern.toBSON());

    auto response =
        Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
            opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            "config",
            cmd,
            Shard::RetryPolicy::kIdempotent);

    if (!response.isOK()) {
        return response.getStatus();
    }

    Status status = response.getValue().commandStatus.isOK()
        ? std::move(response.getValue().writeConcernStatus)
        : std::move(response.getValue().commandStatus);

    // TODO (Dianna) This fail point needs to be reexamined when CommitChunkMigration is in:
    // migrations will no longer be able to exercise it, so split or merge will need to do so.
    // SERVER-22659.
    if (MONGO_FAIL_POINT(failApplyChunkOps)) {
        status = Status(ErrorCodes::InternalError, "Failpoint 'failApplyChunkOps' generated error");
    }

    if (!status.isOK()) {
        string errMsg;

        // This could be a blip in the network connectivity. Check if the commit request made it.
        //
        // If all the updates were successfully written to the chunks collection, the last
        // document in the list of updates should be returned from a query to the chunks
        // collection. The last chunk can be identified by namespace and version number.

        warning() << "chunk operation commit failed and metadata will be revalidated"
                  << causedBy(redact(status));

        // Look for the chunk in this shard whose version got bumped. We assume that if that
        // mod made it to the config server, then applyOps was successful.
        std::vector<ChunkType> newestChunk;
        BSONObjBuilder query;
        lastChunkVersion.addToBSON(query, ChunkType::lastmod());
        query.append(ChunkType::ns(), nss);
        Status chunkStatus =
            getChunks(opCtx, query.obj(), BSONObj(), 1, &newestChunk, nullptr, readConcern);

        if (!chunkStatus.isOK()) {
            errMsg = str::stream() << "getChunks function failed, unable to validate chunk "
                                   << "operation metadata: " << chunkStatus.toString()
                                   << ". applyChunkOpsDeprecated failed to get confirmation "
                                   << "of commit. Unable to save chunk ops. Command: " << cmd
                                   << ". Result: " << response.getValue().response;
        } else if (!newestChunk.empty()) {
            invariant(newestChunk.size() == 1);
            return Status::OK();
        } else {
            errMsg = str::stream() << "chunk operation commit failed: version "
                                   << lastChunkVersion.toString()
                                   << " doesn't exist in namespace: " << nss
                                   << ". Unable to save chunk ops. Command: " << cmd
                                   << ". Result: " << response.getValue().response;
        }

        return status.withContext(errMsg);
    }

    return Status::OK();
}

DistLockManager* ShardingCatalogClientImpl::getDistLockManager() {
    invariant(_distLockManager);
    return _distLockManager.get();
}

void ShardingCatalogClientImpl::writeConfigServerDirect(OperationContext* opCtx,
                                                        const BatchedCommandRequest& batchRequest,
                                                        BatchedCommandResponse* batchResponse) {
    auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
    *batchResponse = configShard->runBatchWriteCommand(opCtx,
                                                       Shard::kDefaultConfigCommandTimeout,
                                                       batchRequest,
                                                       Shard::RetryPolicy::kNotIdempotent);
}

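// Inserts 'doc' into the given config collection, retrying up to kMaxWriteRetry times on
// retriable errors. A DuplicateKey error on a retry attempt is not necessarily a failure:
// the first attempt may have succeeded but failed only to wait for write concern, so the
// existing document is fetched and compared against 'doc' before deciding (see below).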
Status ShardingCatalogClientImpl::insertConfigDocument(OperationContext* opCtx,
                                                       const std::string& ns,
                                                       const BSONObj& doc,
                                                       const WriteConcernOptions& writeConcern) {
    const NamespaceString nss(ns);
    invariant(nss.db() == NamespaceString::kAdminDb || nss.db() == NamespaceString::kConfigDb);

    const BSONElement idField = doc.getField("_id");
    invariant(!idField.eoo());

    BatchedCommandRequest request([&] {
        write_ops::Insert insertOp(nss);
        insertOp.setDocuments({doc});
        return insertOp;
    }());
    request.setWriteConcern(writeConcern.toBSON());

    auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
    for (int retry = 1; retry <= kMaxWriteRetry; retry++) {
        auto response = configShard->runBatchWriteCommand(
            opCtx, Shard::kDefaultConfigCommandTimeout, request, Shard::RetryPolicy::kNoRetry);

        Status status = response.toStatus();

        if (retry < kMaxWriteRetry &&
            configShard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent)) {
            // Pretend like the operation is idempotent because we're handling DuplicateKey errors
            // specially
            continue;
        }

        // If we get DuplicateKey error on the first attempt to insert, this definitively means that
        // we are trying to insert the same entry a second time, so error out. If it happens on a
        // retry attempt though, it is not clear whether we are actually inserting a duplicate key
        // or it is because we failed to wait for write concern on the first attempt. In order to
        // differentiate, fetch the entry and check.
        if (retry > 1 && status == ErrorCodes::DuplicateKey) {
            LOG(1) << "Insert retry failed because of duplicate key error, rechecking.";

            auto fetchDuplicate =
                _exhaustiveFindOnConfig(opCtx,
                                        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                                        repl::ReadConcernLevel::kMajorityReadConcern,
                                        nss,
                                        idField.wrap(),
                                        BSONObj(),
                                        boost::none);
            if (!fetchDuplicate.isOK()) {
                return fetchDuplicate.getStatus();
            }

            auto existingDocs = fetchDuplicate.getValue().value;
            if (existingDocs.empty()) {
                return {ErrorCodes::DuplicateKey,
                        stream() << "DuplicateKey error was returned after a retry attempt, but no "
                                    "documents were found. This means a concurrent change occurred "
                                    "together with the retries. Original error was "
                                 << status.toString()};
            }

            invariant(existingDocs.size() == 1);

            BSONObj existing = std::move(existingDocs.front());
            if (existing.woCompare(doc) == 0) {
                // Documents match, so treat the operation as success
                return Status::OK();
            }
        }

        return status;
    }

    MONGO_UNREACHABLE;
}

StatusWith<bool> ShardingCatalogClientImpl::updateConfigDocument(
    OperationContext* opCtx,
    const string& ns,
    const BSONObj& query,
    const BSONObj& update,
    bool upsert,
    const WriteConcernOptions& writeConcern) {
    return _updateConfigDocument(opCtx, ns, query, update, upsert, writeConcern);
}

StatusWith<bool> ShardingCatalogClientImpl::_updateConfigDocument(
    OperationContext* opCtx,
    const string& ns,
    const BSONObj& query,
    const BSONObj& update,
    bool upsert,
    const WriteConcernOptions& writeConcern) {
    const NamespaceString nss(ns);
    invariant(nss.db() == "config");

    const BSONElement idField = query.getField("_id");
    invariant(!idField.eoo());

    BatchedCommandRequest request([&] {
        write_ops::Update updateOp(nss);
        updateOp.setUpdates({[&] {
            write_ops::UpdateOpEntry entry;
            entry.setQ(query);
            entry.setU(update);
            entry.setUpsert(upsert);
            entry.setMulti(false);
            return entry;
        }()});
        return updateOp;
    }());
    request.setWriteConcern(writeConcern.toBSON());

    auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
    auto response = configShard->runBatchWriteCommand(
        opCtx, Shard::kDefaultConfigCommandTimeout, request, Shard::RetryPolicy::kIdempotent);

    Status status = response.toStatus();
    if (!status.isOK()) {
        return status;
    }

    const auto nSelected = response.getN();
    invariant(nSelected == 0 || nSelected == 1);
    return (nSelected == 1);
}

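// Removes all documents matching 'query' from the given config collection (a multi-delete
// issued with the supplied write concern).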
Status ShardingCatalogClientImpl::removeConfigDocuments(OperationContext* opCtx,
                                                        const string& ns,
                                                        const BSONObj& query,
                                                        const WriteConcernOptions& writeConcern) {
    const NamespaceString nss(ns);
    invariant(nss.db() == "config");

    BatchedCommandRequest request([&] {
        write_ops::Delete deleteOp(nss);
        deleteOp.setDeletes({[&] {
            write_ops::DeleteOpEntry entry;
            entry.setQ(query);
            entry.setMulti(true);
            return entry;
        }()});
        return deleteOp;
    }());
    request.setWriteConcern(writeConcern.toBSON());

    auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
    auto response = configShard->runBatchWriteCommand(
        opCtx, Shard::kDefaultConfigCommandTimeout, request, Shard::RetryPolicy::kIdempotent);
    return response.toStatus();
}

Status ShardingCatalogClientImpl::_createCappedConfigCollection(
    OperationContext* opCtx,
    StringData collName,
    int cappedSize,
    const WriteConcernOptions& writeConcern) {
    BSONObj createCmd = BSON("create" << collName << "capped" << true << "size" << cappedSize
                                      << WriteConcernOptions::kWriteConcernField
                                      << writeConcern.toBSON());

    auto result =
        Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
            opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            "config",
            createCmd,
            Shard::kDefaultConfigCommandTimeout,
            Shard::RetryPolicy::kIdempotent);

    if (!result.isOK()) {
        return result.getStatus();
    }

    if (!result.getValue().commandStatus.isOK()) {
        if (result.getValue().commandStatus == ErrorCodes::NamespaceExists) {
            if (result.getValue().writeConcernStatus.isOK()) {
                return Status::OK();
            } else {
                return result.getValue().writeConcernStatus;
            }
        } else {
            return result.getValue().commandStatus;
        }
    }

    return result.getValue().writeConcernStatus;
}

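// Thin wrapper around Shard::exhaustiveFindOnConfig() on the config shard that repackages
// the result as an OpTimeWith<vector<BSONObj>> for the callers above.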
StatusWith<repl::OpTimeWith<vector<BSONObj>>> ShardingCatalogClientImpl::_exhaustiveFindOnConfig(
    OperationContext* opCtx,
    const ReadPreferenceSetting& readPref,
    const repl::ReadConcernLevel& readConcern,
    const NamespaceString& nss,
    const BSONObj& query,
    const BSONObj& sort,
    boost::optional<long long> limit) {
    auto response = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
        opCtx, readPref, readConcern, nss, query, sort, limit);
    if (!response.isOK()) {
        return response.getStatus();
    }

    return repl::OpTimeWith<vector<BSONObj>>(std::move(response.getValue().docs),
                                             response.getValue().opTime);
}

void ShardingCatalogClientImpl::_appendReadConcern(BSONObjBuilder* builder) {
    repl::ReadConcernArgs readConcern(grid.configOpTime(),
                                      repl::ReadConcernLevel::kMajorityReadConcern);
    readConcern.appendInfo(builder);
}

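// Returns all keys-collection documents for the given 'purpose' whose expiresAt is strictly
// greater than 'newerThanThis', sorted by ascending expiration time.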
StatusWith<std::vector<KeysCollectionDocument>> ShardingCatalogClientImpl::getNewKeys(
    OperationContext* opCtx,
    StringData purpose,
    const LogicalTime& newerThanThis,
    repl::ReadConcernLevel readConcernLevel) {
    auto config = Grid::get(opCtx)->shardRegistry()->getConfigShard();

    BSONObjBuilder queryBuilder;
    queryBuilder.append("purpose", purpose);
    queryBuilder.append("expiresAt", BSON("$gt" << newerThanThis.asTimestamp()));

    auto findStatus =
        config->exhaustiveFindOnConfig(opCtx,
                                       kConfigReadSelector,
                                       readConcernLevel,
                                       NamespaceString(KeysCollectionDocument::ConfigNS),
                                       queryBuilder.obj(),
                                       BSON("expiresAt" << 1),
                                       boost::none);

    if (!findStatus.isOK()) {
        return findStatus.getStatus();
    }

    const auto& keyDocs = findStatus.getValue().docs;
    std::vector<KeysCollectionDocument> keys;
    for (auto&& keyDoc : keyDocs) {
        auto parseStatus = KeysCollectionDocument::fromBSON(keyDoc);
        if (!parseStatus.isOK()) {
            return parseStatus.getStatus();
        }

        keys.push_back(std::move(parseStatus.getValue()));
    }

    return keys;
}

}  // namespace mongo