/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication

#include "mongo/platform/basic.h"

#include "mongo/db/repl/storage_interface_impl.h"

#include <algorithm>
#include <boost/optional.hpp>
#include <limits>
#include <utility>

#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/catalog/coll_mod.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/catalog/index_create.h"
#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/delete.h"
#include "mongo/db/exec/update.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/keypattern.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/delete_request.h"
#include "mongo/db/ops/parsed_update.h"
#include "mongo/db/ops/update_request.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/collection_bulk_loader_impl.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/rollback_gen.h"
#include "mongo/db/service_context.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"

namespace mongo {
namespace repl {

const char StorageInterfaceImpl::kDefaultRollbackIdNamespace[] = "local.system.rollback.id";
const char StorageInterfaceImpl::kRollbackIdFieldName[] = "rollbackId";
const char StorageInterfaceImpl::kRollbackIdDocumentId[] = "rollbackId";

namespace {
using UniqueLock = stdx::unique_lock<stdx::mutex>;

const auto kIdIndexName = "_id_"_sd;

}  // namespace

StorageInterfaceImpl::StorageInterfaceImpl()
    : _rollbackIdNss(StorageInterfaceImpl::kDefaultRollbackIdNamespace) {}

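/**
 * Fetches the single rollback ID document by _id from the local rollback ID collection and
 * returns the stored rollback ID.
 */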
StatusWith<int> StorageInterfaceImpl::getRollbackID(OperationContext* opCtx) {
    BSONObjBuilder bob;
    bob.append("_id", kRollbackIdDocumentId);
    auto id = bob.obj();

    try {
        auto rbidDoc = findById(opCtx, _rollbackIdNss, id["_id"]);
        if (!rbidDoc.isOK()) {
            return rbidDoc.getStatus();
        }

        auto rbid = RollbackID::parse(IDLParserErrorContext("RollbackID"), rbidDoc.getValue());
        invariant(rbid.get_id() == kRollbackIdDocumentId);
        return rbid.getRollbackId();
    } catch (...) {
        return exceptionToStatus();
    }

    MONGO_UNREACHABLE;
}

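/**
 * Creates the local rollback ID collection and seeds it with a single document whose rollback ID
 * is 1. Returns the initial rollback ID on success.
 */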
StatusWith<int> StorageInterfaceImpl::initializeRollbackID(OperationContext* opCtx) {
    auto status = createCollection(opCtx, _rollbackIdNss, CollectionOptions());
    if (!status.isOK()) {
        return status;
    }

    RollbackID rbid;
    int initRBID = 1;
    rbid.set_id(kRollbackIdDocumentId);
    rbid.setRollbackId(initRBID);

    BSONObjBuilder bob;
    rbid.serialize(&bob);
    Timestamp noTimestamp;  // This write is not replicated.
    status = insertDocument(opCtx,
                            _rollbackIdNss,
                            TimestampedBSONObj{bob.done(), noTimestamp},
                            OpTime::kUninitializedTerm);
    if (status.isOK()) {
        return initRBID;
    } else {
        return status;
    }
}

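/**
 * Bumps the persisted rollback ID by one (wrapping back to 1 at the integer limit) and waits for
 * the write to become durable before returning the new value.
 */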
StatusWith<int> StorageInterfaceImpl::incrementRollbackID(OperationContext* opCtx) {
    // This is safe because this is only called during rollback, and you cannot have two
    // rollbacks at once.
    auto rbidSW = getRollbackID(opCtx);
    if (!rbidSW.isOK()) {
        return rbidSW;
    }

    // If we would go over the integer limit, reset the Rollback ID to 1.
    BSONObjBuilder updateBob;
    int newRBID = -1;
    if (rbidSW.getValue() == std::numeric_limits<int>::max()) {
        newRBID = 1;
        BSONObjBuilder setBob(updateBob.subobjStart("$set"));
        setBob.append(kRollbackIdFieldName, newRBID);
    } else {
        BSONObjBuilder incBob(updateBob.subobjStart("$inc"));
        incBob.append(kRollbackIdFieldName, 1);
        newRBID = rbidSW.getValue() + 1;
    }

    // Since the Rollback ID is in a singleton collection, we can fix the _id field.
    BSONObjBuilder bob;
    bob.append("_id", kRollbackIdDocumentId);
    auto id = bob.obj();
    Status status = upsertById(opCtx, _rollbackIdNss, id["_id"], updateBob.obj());

    // We wait until durable so that we are sure the Rollback ID is updated before rollback ends.
    if (status.isOK()) {
        opCtx->recoveryUnit()->waitUntilDurable();
        return newRBID;
    }
    return status;
}

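/**
 * Creates the target collection on a dedicated client and operation context and returns a
 * CollectionBulkLoader that owns the locks for the rest of the collection clone. For capped
 * collections, the indexes are created up front on the empty collection rather than being handed
 * to the loader.
 */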
StatusWith<std::unique_ptr<CollectionBulkLoader>>
StorageInterfaceImpl::createCollectionForBulkLoading(
    const NamespaceString& nss,
    const CollectionOptions& options,
    const BSONObj idIndexSpec,
    const std::vector<BSONObj>& secondaryIndexSpecs) {

    LOG(2) << "StorageInterfaceImpl::createCollectionForBulkLoading called for ns: " << nss.ns();

    class StashClient {
    public:
        StashClient() {
            if (Client::getCurrent()) {
                _stashedClient = Client::releaseCurrent();
            }
        }
        ~StashClient() {
            if (Client::getCurrent()) {
                Client::releaseCurrent();
            }
            if (_stashedClient) {
                Client::setCurrent(std::move(_stashedClient));
            }
        }

    private:
        ServiceContext::UniqueClient _stashedClient;
    } stash;
    Client::setCurrent(
        getGlobalServiceContext()->makeClient(str::stream() << nss.ns() << " loader"));
    auto opCtx = cc().makeOperationContext();

    documentValidationDisabled(opCtx.get()) = true;

    std::unique_ptr<AutoGetCollection> autoColl;
    // Retry if WCE.
    Status status = writeConflictRetry(opCtx.get(), "beginCollectionClone", nss.ns(), [&] {
        UnreplicatedWritesBlock uwb(opCtx.get());

        // Get locks and create the collection.
        AutoGetOrCreateDb db(opCtx.get(), nss.db(), MODE_X);
        AutoGetCollection coll(opCtx.get(), nss, MODE_IX);

        if (coll.getCollection()) {
            return Status(ErrorCodes::NamespaceExists,
                          str::stream() << "Collection " << nss.ns() << " already exists.");
        }
        {
            // Create the collection.
            WriteUnitOfWork wunit(opCtx.get());
            fassert(40332, db.getDb()->createCollection(opCtx.get(), nss.ns(), options, false));
            wunit.commit();
        }

        autoColl = stdx::make_unique<AutoGetCollection>(opCtx.get(), nss, MODE_IX);

        // Build empty capped indexes.  Capped indexes cannot be built by the MultiIndexBlock
        // because the cap might delete documents off the back while we are inserting them into
        // the front.
        if (options.capped) {
            WriteUnitOfWork wunit(opCtx.get());
            if (!idIndexSpec.isEmpty()) {
                auto status =
                    autoColl->getCollection()->getIndexCatalog()->createIndexOnEmptyCollection(
                        opCtx.get(), idIndexSpec);
                if (!status.getStatus().isOK()) {
                    return status.getStatus();
                }
            }
            for (auto&& spec : secondaryIndexSpecs) {
                auto status =
                    autoColl->getCollection()->getIndexCatalog()->createIndexOnEmptyCollection(
                        opCtx.get(), spec);
                if (!status.getStatus().isOK()) {
                    return status.getStatus();
                }
            }
            wunit.commit();
        }

        return Status::OK();
    });

    if (!status.isOK()) {
        return status;
    }

    // Move locks into loader, so it now controls their lifetime.
    auto loader =
        stdx::make_unique<CollectionBulkLoaderImpl>(Client::releaseCurrent(),
                                                    std::move(opCtx),
                                                    std::move(autoColl),
                                                    options.capped ? BSONObj() : idIndexSpec);

    status = loader->init(options.capped ? std::vector<BSONObj>() : secondaryIndexSpecs);
    if (!status.isOK()) {
        return status;
    }
    return {std::move(loader)};
}

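/**
 * Convenience wrapper that inserts a single timestamped document by delegating to
 * insertDocuments().
 */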
Status StorageInterfaceImpl::insertDocument(OperationContext* opCtx,
                                            const NamespaceString& nss,
                                            const TimestampedBSONObj& doc,
                                            long long term) {
    return insertDocuments(opCtx, nss, {InsertStatement(doc.obj, doc.timestamp, term)});
}

namespace {

/**
 * Returns Collection* from database RAII object.
 * Returns NamespaceNotFound if the database or collection does not exist.
 */
template <typename AutoGetCollectionType>
StatusWith<Collection*> getCollection(const AutoGetCollectionType& autoGetCollection,
                                      const NamespaceString& nss,
                                      const std::string& message) {
    if (!autoGetCollection.getDb()) {
        return {ErrorCodes::NamespaceNotFound,
                str::stream() << "Database [" << nss.db() << "] not found. " << message};
    }

    auto collection = autoGetCollection.getCollection();
    if (!collection) {
        return {ErrorCodes::NamespaceNotFound,
                str::stream() << "Collection [" << nss.ns() << "] not found. " << message};
    }

    return collection;
}

Status insertDocumentsSingleBatch(OperationContext* opCtx,
                                  const NamespaceString& nss,
                                  std::vector<InsertStatement>::const_iterator begin,
                                  std::vector<InsertStatement>::const_iterator end) {
    AutoGetCollection autoColl(opCtx, nss, MODE_IX);

    auto collectionResult =
        getCollection(autoColl, nss, "The collection must exist before inserting documents.");
    if (!collectionResult.isOK()) {
        return collectionResult.getStatus();
    }
    auto collection = collectionResult.getValue();

    WriteUnitOfWork wunit(opCtx);
    OpDebug* const nullOpDebug = nullptr;
    auto status = collection->insertDocuments(opCtx, begin, end, nullOpDebug, false);
    if (!status.isOK()) {
        return status;
    }
    wunit.commit();

    return Status::OK();
}

}  // namespace

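/**
 * Inserts the documents as one batch when possible and falls back to inserting them individually
 * (each inside its own write conflict retry loop) if the batched insert fails.
 */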
Status StorageInterfaceImpl::insertDocuments(OperationContext* opCtx,
                                             const NamespaceString& nss,
                                             const std::vector<InsertStatement>& docs) {
    if (docs.size() > 1U) {
        try {
            if (insertDocumentsSingleBatch(opCtx, nss, docs.cbegin(), docs.cend()).isOK()) {
                return Status::OK();
            }
        } catch (...) {
            // Ignore this failure and behave as if we never tried to do the combined batch insert.
            // The loop below will handle reporting any non-transient errors.
        }
    }

    // Insert the documents one at a time because inserting them as a single batch failed.
    for (auto it = docs.cbegin(); it != docs.cend(); ++it) {
        auto status =
            writeConflictRetry(opCtx, "StorageInterfaceImpl::insertDocuments", nss.ns(), [&] {
                auto status = insertDocumentsSingleBatch(opCtx, nss, it, it + 1);
                if (!status.isOK()) {
                    return status;
                }

                return Status::OK();
            });

        if (!status.isOK()) {
            return status;
        }
    }

    return Status::OK();
}

Status StorageInterfaceImpl::dropReplicatedDatabases(OperationContext* opCtx) {
    dropAllDatabasesExceptLocal(opCtx);
    return Status::OK();
}

Status StorageInterfaceImpl::createOplog(OperationContext* opCtx, const NamespaceString& nss) {
    mongo::repl::createOplog(opCtx, nss.ns(), true);
    return Status::OK();
}

StatusWith<size_t> StorageInterfaceImpl::getOplogMaxSize(OperationContext* opCtx,
                                                         const NamespaceString& nss) {
    AutoGetCollectionForReadCommand autoColl(opCtx, nss);
    auto collectionResult = getCollection(autoColl, nss, "Your oplog doesn't exist.");
    if (!collectionResult.isOK()) {
        return collectionResult.getStatus();
    }
    auto collection = collectionResult.getValue();

    const auto options = collection->getCatalogEntry()->getCollectionOptions(opCtx);
    if (!options.capped)
        return {ErrorCodes::BadValue, str::stream() << nss.ns() << " isn't capped"};

    return options.cappedSize;
}

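/**
 * Creates an empty collection with the given options, returning NamespaceExists if a collection
 * with that name is already present.
 */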
Status StorageInterfaceImpl::createCollection(OperationContext* opCtx,
                                              const NamespaceString& nss,
                                              const CollectionOptions& options) {
    return writeConflictRetry(opCtx, "StorageInterfaceImpl::createCollection", nss.ns(), [&] {
        AutoGetOrCreateDb databaseWriteGuard(opCtx, nss.db(), MODE_X);
        auto db = databaseWriteGuard.getDb();
        invariant(db);
        if (db->getCollection(opCtx, nss)) {
            return Status(ErrorCodes::NamespaceExists,
                          str::stream() << "Collection " << nss.ns() << " already exists.");
        }
        WriteUnitOfWork wuow(opCtx);
        try {
            auto coll = db->createCollection(opCtx, nss.ns(), options);
            invariant(coll);
        } catch (const AssertionException& ex) {
            return ex.toStatus();
        }
        wuow.commit();

        return Status::OK();
    });
}

Status StorageInterfaceImpl::dropCollection(OperationContext* opCtx, const NamespaceString& nss) {
    return writeConflictRetry(opCtx, "StorageInterfaceImpl::dropCollection", nss.ns(), [&] {
        AutoGetDb autoDB(opCtx, nss.db(), MODE_X);
        if (!autoDB.getDb()) {
            // Database does not exist - nothing to do.
            return Status::OK();
        }
        WriteUnitOfWork wunit(opCtx);
        const auto status = autoDB.getDb()->dropCollectionEvenIfSystem(opCtx, nss);
        if (!status.isOK()) {
            return status;
        }
        wunit.commit();
        return Status::OK();
    });
}

Status StorageInterfaceImpl::truncateCollection(OperationContext* opCtx,
                                                const NamespaceString& nss) {
    return writeConflictRetry(opCtx, "StorageInterfaceImpl::truncateCollection", nss.ns(), [&] {
        AutoGetCollection autoColl(opCtx, nss, MODE_X);
        auto collectionResult =
            getCollection(autoColl, nss, "The collection must exist before truncating.");
        if (!collectionResult.isOK()) {
            return collectionResult.getStatus();
        }
        auto collection = collectionResult.getValue();

        WriteUnitOfWork wunit(opCtx);
        const auto status = collection->truncate(opCtx);
        if (!status.isOK()) {
            return status;
        }
        wunit.commit();
        return Status::OK();
    });
}

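/**
 * Renames a collection within a single database and updates the UUID catalog entry for the
 * renamed collection. Cross-database renames are rejected with InvalidNamespace.
 */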
Status StorageInterfaceImpl::renameCollection(OperationContext* opCtx,
                                              const NamespaceString& fromNS,
                                              const NamespaceString& toNS,
                                              bool stayTemp) {
    if (fromNS.db() != toNS.db()) {
        return Status(ErrorCodes::InvalidNamespace,
                      str::stream() << "Cannot rename collection between databases. From NS: "
                                    << fromNS.ns()
                                    << "; to NS: "
                                    << toNS.ns());
    }

    return writeConflictRetry(opCtx, "StorageInterfaceImpl::renameCollection", fromNS.ns(), [&] {
        AutoGetDb autoDB(opCtx, fromNS.db(), MODE_X);
        if (!autoDB.getDb()) {
            return Status(ErrorCodes::NamespaceNotFound,
                          str::stream() << "Cannot rename collection from " << fromNS.ns() << " to "
                                        << toNS.ns()
                                        << ". Database "
                                        << fromNS.db()
                                        << " not found.");
        }
        WriteUnitOfWork wunit(opCtx);
        const auto status =
            autoDB.getDb()->renameCollection(opCtx, fromNS.ns(), toNS.ns(), stayTemp);
        if (!status.isOK()) {
            return status;
        }

        auto newColl = autoDB.getDb()->getCollection(opCtx, toNS);
        if (newColl->uuid()) {
            UUIDCatalog::get(opCtx).onRenameCollection(
                opCtx, [newColl] { return newColl; }, newColl->uuid().get());
        }
        wunit.commit();
        return status;
    });
}

namespace {

/**
 * Returns DeleteStageParams for a multi-delete that fetches and returns the deleted documents.
 */
DeleteStageParams makeDeleteStageParamsForDeleteDocuments() {
    DeleteStageParams deleteStageParams;
    deleteStageParams.isMulti = true;
    deleteStageParams.returnDeleted = true;
    return deleteStageParams;
}

/**
 * Shared implementation between findDocuments, deleteDocuments, and _findOrDeleteById.
 * _findOrDeleteById is used by findById and deleteById.
 */
enum class FindDeleteMode { kFind, kDelete };
StatusWith<std::vector<BSONObj>> _findOrDeleteDocuments(
    OperationContext* opCtx,
    const NamespaceString& nss,
    boost::optional<StringData> indexName,
    StorageInterface::ScanDirection scanDirection,
    const BSONObj& startKey,
    const BSONObj& endKey,
    BoundInclusion boundInclusion,
    std::size_t limit,
    FindDeleteMode mode) {
    auto isFind = mode == FindDeleteMode::kFind;
    auto opStr = isFind ? "StorageInterfaceImpl::find" : "StorageInterfaceImpl::delete";

    return writeConflictRetry(opCtx, opStr, nss.ns(), [&] {
        // We need to explicitly use this in a few places to help the type inference.  Use a
        // shorthand.
        using Result = StatusWith<std::vector<BSONObj>>;

        auto collectionAccessMode = isFind ? MODE_IS : MODE_IX;
        AutoGetCollection autoColl(opCtx, nss, collectionAccessMode);
        auto collectionResult = getCollection(
            autoColl, nss, str::stream() << "Unable to proceed with " << opStr << ".");
        if (!collectionResult.isOK()) {
            return Result(collectionResult.getStatus());
        }
        auto collection = collectionResult.getValue();

        auto isForward = scanDirection == StorageInterface::ScanDirection::kForward;
        auto direction = isForward ? InternalPlanner::FORWARD : InternalPlanner::BACKWARD;

        std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> planExecutor;
        if (!indexName) {
            if (!startKey.isEmpty()) {
                return Result(ErrorCodes::NoSuchKey,
                              "non-empty startKey not allowed for collection scan");
            }
            if (boundInclusion != BoundInclusion::kIncludeStartKeyOnly) {
                return Result(ErrorCodes::InvalidOptions,
                              "bound inclusion must be BoundInclusion::kIncludeStartKeyOnly for "
                              "collection scan");
            }
            // Use collection scan.
            planExecutor = isFind
                ? InternalPlanner::collectionScan(
                      opCtx, nss.ns(), collection, PlanExecutor::NO_YIELD, direction)
                : InternalPlanner::deleteWithCollectionScan(
                      opCtx,
                      collection,
                      makeDeleteStageParamsForDeleteDocuments(),
                      PlanExecutor::NO_YIELD,
                      direction);
        } else {
            // Use index scan.
            auto indexCatalog = collection->getIndexCatalog();
            invariant(indexCatalog);
            bool includeUnfinishedIndexes = false;
            IndexDescriptor* indexDescriptor =
                indexCatalog->findIndexByName(opCtx, *indexName, includeUnfinishedIndexes);
            if (!indexDescriptor) {
                return Result(ErrorCodes::IndexNotFound,
                              str::stream() << "Index not found, ns:" << nss.ns() << ", index: "
                                            << *indexName);
            }
            if (indexDescriptor->isPartial()) {
                return Result(ErrorCodes::IndexOptionsConflict,
                              str::stream()
                                  << "Partial index is not allowed for this operation, ns:"
                                  << nss.ns()
                                  << ", index: "
                                  << *indexName);
            }

            KeyPattern keyPattern(indexDescriptor->keyPattern());
            auto minKey = Helpers::toKeyFormat(keyPattern.extendRangeBound({}, false));
            auto maxKey = Helpers::toKeyFormat(keyPattern.extendRangeBound({}, true));
            auto bounds =
                isForward ? std::make_pair(minKey, maxKey) : std::make_pair(maxKey, minKey);
            if (!startKey.isEmpty()) {
                bounds.first = startKey;
            }
            if (!endKey.isEmpty()) {
                bounds.second = endKey;
            }
            planExecutor = isFind
                ? InternalPlanner::indexScan(opCtx,
                                             collection,
                                             indexDescriptor,
                                             bounds.first,
                                             bounds.second,
                                             boundInclusion,
                                             PlanExecutor::NO_YIELD,
                                             direction,
                                             InternalPlanner::IXSCAN_FETCH)
                : InternalPlanner::deleteWithIndexScan(opCtx,
                                                       collection,
                                                       makeDeleteStageParamsForDeleteDocuments(),
                                                       indexDescriptor,
                                                       bounds.first,
                                                       bounds.second,
                                                       boundInclusion,
                                                       PlanExecutor::NO_YIELD,
                                                       direction);
        }

        std::vector<BSONObj> docs;
        while (docs.size() < limit) {
            BSONObj doc;
            auto state = planExecutor->getNext(&doc, nullptr);
            if (PlanExecutor::ADVANCED == state) {
                docs.push_back(doc.getOwned());
            } else {
                invariant(PlanExecutor::IS_EOF == state);
                break;
            }
        }
        return Result(docs);
    });
}

StatusWith<BSONObj> _findOrDeleteById(OperationContext* opCtx,
                                      const NamespaceString& nss,
                                      const BSONElement& idKey,
                                      FindDeleteMode mode) {
    auto wrappedIdKey = idKey.wrap("");
    auto result = _findOrDeleteDocuments(opCtx,
                                         nss,
                                         kIdIndexName,
                                         StorageInterface::ScanDirection::kForward,
                                         wrappedIdKey,
                                         wrappedIdKey,
                                         BoundInclusion::kIncludeBothStartAndEndKeys,
                                         1U,
                                         mode);
    if (!result.isOK()) {
        return result.getStatus();
    }
    const auto& docs = result.getValue();
    if (docs.empty()) {
        return {ErrorCodes::NoSuchKey, str::stream() << "No document found with _id: " << idKey};
    }

    return docs.front();
}

}  // namespace

StatusWith<std::vector<BSONObj>> StorageInterfaceImpl::findDocuments(
    OperationContext* opCtx,
    const NamespaceString& nss,
    boost::optional<StringData> indexName,
    ScanDirection scanDirection,
    const BSONObj& startKey,
    BoundInclusion boundInclusion,
    std::size_t limit) {
    return _findOrDeleteDocuments(opCtx,
                                  nss,
                                  indexName,
                                  scanDirection,
                                  startKey,
                                  {},
                                  boundInclusion,
                                  limit,
                                  FindDeleteMode::kFind);
}

StatusWith<std::vector<BSONObj>> StorageInterfaceImpl::deleteDocuments(
    OperationContext* opCtx,
    const NamespaceString& nss,
    boost::optional<StringData> indexName,
    ScanDirection scanDirection,
    const BSONObj& startKey,
    BoundInclusion boundInclusion,
    std::size_t limit) {
    return _findOrDeleteDocuments(opCtx,
                                  nss,
                                  indexName,
                                  scanDirection,
                                  startKey,
                                  {},
                                  boundInclusion,
                                  limit,
                                  FindDeleteMode::kDelete);
}

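/**
 * Returns the only document in the collection. Asks for up to two documents so that it can
 * distinguish an empty collection (CollectionIsEmpty) from one with more than one document
 * (TooManyMatchingDocuments).
 */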
StatusWith<BSONObj> StorageInterfaceImpl::findSingleton(OperationContext* opCtx,
                                                        const NamespaceString& nss) {
    auto result = findDocuments(opCtx,
                                nss,
                                boost::none,  // Collection scan.
                                StorageInterface::ScanDirection::kForward,
                                {},  // Start at the beginning of the collection.
                                BoundInclusion::kIncludeStartKeyOnly,
                                2U);  // Ask for 2 documents to ensure it's a singleton.
    if (!result.isOK()) {
        return result.getStatus();
    }

    const auto& docs = result.getValue();
    if (docs.empty()) {
        return {ErrorCodes::CollectionIsEmpty,
                str::stream() << "No document found in namespace: " << nss.ns()};
    } else if (docs.size() != 1U) {
        return {ErrorCodes::TooManyMatchingDocuments,
                str::stream() << "More than singleton document found in namespace: " << nss.ns()};
    }

    return docs.front();
}

StatusWith<BSONObj> StorageInterfaceImpl::findById(OperationContext* opCtx,
                                                   const NamespaceString& nss,
                                                   const BSONElement& idKey) {
    return _findOrDeleteById(opCtx, nss, idKey, FindDeleteMode::kFind);
}

StatusWith<BSONObj> StorageInterfaceImpl::deleteById(OperationContext* opCtx,
                                                     const NamespaceString& nss,
                                                     const BSONElement& idKey) {
    return _findOrDeleteById(opCtx, nss, idKey, FindDeleteMode::kDelete);
}

namespace {

/**
 * Checks _id key passed to upsertById and returns a query document for UpdateRequest.
 */
StatusWith<BSONObj> makeUpsertQuery(const BSONElement& idKey) {
    auto query = BSON("_id" << idKey);

    // With the ID hack, only simple _id queries are allowed. Otherwise, UpdateStage will fail with
    // a fatal assertion.
    if (!CanonicalQuery::isSimpleIdQuery(query)) {
        return {ErrorCodes::InvalidIdField,
                str::stream() << "Unable to update document with a non-simple _id query: "
                              << query};
    }

    return query;
}

Status _updateWithQuery(OperationContext* opCtx, const UpdateRequest& request) {
    invariant(!request.isMulti());  // We only want to update one document for performance.
    invariant(!request.shouldReturnAnyDocs());
    invariant(PlanExecutor::NO_YIELD == request.getYieldPolicy());

    auto& nss = request.getNamespaceString();
    return writeConflictRetry(opCtx, "_updateWithQuery", nss.ns(), [&] {
        // ParsedUpdate needs to be inside the write conflict retry loop because it may create a
        // CanonicalQuery whose ownership will be transferred to the plan executor in
        // getExecutorUpdate().
        ParsedUpdate parsedUpdate(opCtx, &request);
        auto parsedUpdateStatus = parsedUpdate.parseRequest();
        if (!parsedUpdateStatus.isOK()) {
            return parsedUpdateStatus;
        }

        AutoGetCollection autoColl(opCtx, nss, MODE_IX);
        auto collectionResult = getCollection(
            autoColl,
            nss,
            str::stream() << "Unable to update documents in " << nss.ns() << " using query "
                          << request.getQuery());
        if (!collectionResult.isOK()) {
            return collectionResult.getStatus();
        }
        auto collection = collectionResult.getValue();

        auto planExecutorResult =
            mongo::getExecutorUpdate(opCtx, nullptr, collection, &parsedUpdate);
        if (!planExecutorResult.isOK()) {
            return planExecutorResult.getStatus();
        }
        auto planExecutor = std::move(planExecutorResult.getValue());

        return planExecutor->executePlan();
    });
}

}  // namespace

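/**
 * Upserts a document by _id using the ID hack plan (updateWithIdHack), which requires the
 * collection to have an _id index and the _id key to form a simple _id query.
 */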
Status StorageInterfaceImpl::upsertById(OperationContext* opCtx,
                                        const NamespaceString& nss,
                                        const BSONElement& idKey,
                                        const BSONObj& update) {
    // Validate and construct an _id query for UpdateRequest.
    // The _id key will be passed directly to IDHackStage.
    auto queryResult = makeUpsertQuery(idKey);
    if (!queryResult.isOK()) {
        return queryResult.getStatus();
    }
    auto query = queryResult.getValue();

    UpdateRequest request(nss);
    request.setQuery(query);
    request.setUpdates(update);
    request.setUpsert(true);
    invariant(!request.isMulti());  // This follows from using an exact _id query.
    invariant(!request.shouldReturnAnyDocs());
    invariant(PlanExecutor::NO_YIELD == request.getYieldPolicy());

    return writeConflictRetry(opCtx, "StorageInterfaceImpl::upsertById", nss.ns(), [&] {
        // ParsedUpdate needs to be inside the write conflict retry loop because it contains
        // the UpdateDriver whose state may be modified while we are applying the update.
        ParsedUpdate parsedUpdate(opCtx, &request);
        auto parsedUpdateStatus = parsedUpdate.parseRequest();
        if (!parsedUpdateStatus.isOK()) {
            return parsedUpdateStatus;
        }

        AutoGetCollection autoColl(opCtx, nss, MODE_IX);
        auto collectionResult = getCollection(autoColl, nss, "Unable to update document.");
        if (!collectionResult.isOK()) {
            return collectionResult.getStatus();
        }
        auto collection = collectionResult.getValue();

        // We're using the ID hack to perform the update so we have to disallow collections
        // without an _id index.
        auto descriptor = collection->getIndexCatalog()->findIdIndex(opCtx);
        if (!descriptor) {
            return Status(ErrorCodes::IndexNotFound,
                          "Unable to update document in a collection without an _id index.");
        }

        UpdateStageParams updateStageParams(
            parsedUpdate.getRequest(), parsedUpdate.getDriver(), nullptr);
        auto planExecutor = InternalPlanner::updateWithIdHack(opCtx,
                                                              collection,
                                                              updateStageParams,
                                                              descriptor,
                                                              idKey.wrap(""),
                                                              parsedUpdate.yieldPolicy());

        return planExecutor->executePlan();
    });
}

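/**
 * Upserts the single document in the collection with the given update, matching any existing
 * document via an empty query.
 */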
Status StorageInterfaceImpl::putSingleton(OperationContext* opCtx,
                                          const NamespaceString& nss,
                                          const BSONObj& update) {
    UpdateRequest request(nss);
    request.setQuery({});
    request.setUpdates(update);
    request.setUpsert(true);
    return _updateWithQuery(opCtx, request);
}

Status StorageInterfaceImpl::updateSingleton(OperationContext* opCtx,
                                             const NamespaceString& nss,
                                             const BSONObj& query,
                                             const BSONObj& update) {
    UpdateRequest request(nss);
    request.setQuery(query);
    request.setUpdates(update);
    invariant(!request.isUpsert());
    return _updateWithQuery(opCtx, request);
}

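/**
 * Deletes all documents matching the filter. The request runs in "god" mode so that internal
 * deletes from otherwise protected system collections are permitted.
 */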
Status StorageInterfaceImpl::deleteByFilter(OperationContext* opCtx,
                                            const NamespaceString& nss,
                                            const BSONObj& filter) {
    DeleteRequest request(nss);
    request.setQuery(filter);
    request.setMulti(true);
    request.setYieldPolicy(PlanExecutor::NO_YIELD);

    // This disables the isLegalClientSystemNS() check in getExecutorDelete() which is used to
    // disallow client deletes from unrecognized system collections.
    request.setGod();

    return writeConflictRetry(opCtx, "StorageInterfaceImpl::deleteByFilter", nss.ns(), [&] {
        // ParsedDelete needs to be inside the write conflict retry loop because it may create a
        // CanonicalQuery whose ownership will be transferred to the plan executor in
        // getExecutorDelete().
        ParsedDelete parsedDelete(opCtx, &request);
        auto parsedDeleteStatus = parsedDelete.parseRequest();
        if (!parsedDeleteStatus.isOK()) {
            return parsedDeleteStatus;
        }

        AutoGetCollection autoColl(opCtx, nss, MODE_IX);
        auto collectionResult = getCollection(
            autoColl,
            nss,
            str::stream() << "Unable to delete documents in " << nss.ns() << " using filter "
                          << filter);
        if (!collectionResult.isOK()) {
            return collectionResult.getStatus();
        }
        auto collection = collectionResult.getValue();

        auto planExecutorResult =
            mongo::getExecutorDelete(opCtx, nullptr, collection, &parsedDelete);
        if (!planExecutorResult.isOK()) {
            return planExecutorResult.getStatus();
        }
        auto planExecutor = std::move(planExecutorResult.getValue());

        return planExecutor->executePlan();
    });
}

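/**
 * Returns the total size in bytes of the documents in the collection, as reported by the storage
 * engine.
 */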
StatusWith<StorageInterface::CollectionSize> StorageInterfaceImpl::getCollectionSize(
    OperationContext* opCtx, const NamespaceString& nss) {
    AutoGetCollectionForRead autoColl(opCtx, nss);

    auto collectionResult =
        getCollection(autoColl, nss, "Unable to get total size of documents in collection.");
    if (!collectionResult.isOK()) {
        return collectionResult.getStatus();
    }
    auto collection = collectionResult.getValue();

    return collection->dataSize(opCtx);
}

StatusWith<StorageInterface::CollectionCount> StorageInterfaceImpl::getCollectionCount(
    OperationContext* opCtx, const NamespaceString& nss) {
    AutoGetCollectionForRead autoColl(opCtx, nss);

    auto collectionResult =
        getCollection(autoColl, nss, "Unable to get number of documents in collection.");
    if (!collectionResult.isOK()) {
        return collectionResult.getStatus();
    }
    auto collection = collectionResult.getValue();

    return collection->numRecords(opCtx);
}

StatusWith<OptionalCollectionUUID> StorageInterfaceImpl::getCollectionUUID(
    OperationContext* opCtx, const NamespaceString& nss) {
    AutoGetCollectionForRead autoColl(opCtx, nss);

    auto collectionResult = getCollection(
        autoColl, nss, str::stream() << "Unable to get UUID of " << nss.ns() << " collection.");
    if (!collectionResult.isOK()) {
        return collectionResult.getStatus();
    }
    auto collection = collectionResult.getValue();
    return collection->uuid();
}

Status StorageInterfaceImpl::upgradeUUIDSchemaVersionNonReplicated(OperationContext* opCtx) {
    return updateUUIDSchemaVersionNonReplicated(opCtx, true);
}

void StorageInterfaceImpl::setStableTimestamp(ServiceContext* serviceCtx, Timestamp snapshotName) {
    serviceCtx->getGlobalStorageEngine()->setStableTimestamp(snapshotName);
}

void StorageInterfaceImpl::setInitialDataTimestamp(ServiceContext* serviceCtx,
                                                   Timestamp snapshotName) {
    serviceCtx->getGlobalStorageEngine()->setInitialDataTimestamp(snapshotName);
}

Status StorageInterfaceImpl::recoverToStableTimestamp(ServiceContext* serviceCtx) {
    return serviceCtx->getGlobalStorageEngine()->recoverToStableTimestamp();
}

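/**
 * Validates the admin database during initial sync: if user documents exist, the auth schema
 * version document must be present and must contain a supported schema version.
 */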
Status StorageInterfaceImpl::isAdminDbValid(OperationContext* opCtx) {
    AutoGetDb autoDB(opCtx, "admin", MODE_X);
    auto adminDb = autoDB.getDb();
    if (!adminDb) {
        return Status::OK();
    }

    Collection* const usersCollection =
        adminDb->getCollection(opCtx, AuthorizationManager::usersCollectionNamespace);
    const bool hasUsers =
        usersCollection && !Helpers::findOne(opCtx, usersCollection, BSONObj(), false).isNull();
    Collection* const adminVersionCollection =
        adminDb->getCollection(opCtx, AuthorizationManager::versionCollectionNamespace);
    BSONObj authSchemaVersionDocument;
    if (!adminVersionCollection ||
        !Helpers::findOne(opCtx,
                          adminVersionCollection,
                          AuthorizationManager::versionDocumentQuery,
                          authSchemaVersionDocument)) {
        if (!hasUsers) {
            // It's OK to have no auth version document if there are no user documents.
            return Status::OK();
        }
        std::string msg = str::stream()
            << "During initial sync, found documents in "
            << AuthorizationManager::usersCollectionNamespace.ns()
            << " but could not find an auth schema version document in "
            << AuthorizationManager::versionCollectionNamespace.ns() << ".  "
            << "This indicates that the primary of this replica set was not successfully "
               "upgraded to schema version "
            << AuthorizationManager::schemaVersion26Final
            << ", which is the minimum supported schema version in this version of MongoDB";
        return {ErrorCodes::AuthSchemaIncompatible, msg};
    }
    long long foundSchemaVersion;
    Status status = bsonExtractIntegerField(authSchemaVersionDocument,
                                            AuthorizationManager::schemaVersionFieldName,
                                            &foundSchemaVersion);
    if (!status.isOK()) {
        std::string msg = str::stream()
            << "During initial sync, found malformed auth schema version document: "
            << status.toString() << "; document: " << authSchemaVersionDocument;
        return {ErrorCodes::AuthSchemaIncompatible, msg};
    }
    if ((foundSchemaVersion != AuthorizationManager::schemaVersion26Final) &&
        (foundSchemaVersion != AuthorizationManager::schemaVersion28SCRAM)) {
        std::string msg = str::stream()
            << "During initial sync, found auth schema version " << foundSchemaVersion
            << ", but this version of MongoDB only supports schema versions "
            << AuthorizationManager::schemaVersion26Final << " and "
            << AuthorizationManager::schemaVersion28SCRAM;
        return {ErrorCodes::AuthSchemaIncompatible, msg};
    }

    return Status::OK();
}

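/**
 * Blocks until all oplog entries written before the call are visible to forward scans of the
 * oplog.
 */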
void StorageInterfaceImpl::waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) {
    AutoGetCollection oplog(opCtx, NamespaceString::kRsOplogNamespace, MODE_IS);
    oplog.getCollection()->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx);
}

bool StorageInterfaceImpl::supportsDocLocking(ServiceContext* serviceCtx) const {
    return serviceCtx->getGlobalStorageEngine()->supportsDocLocking();
}

Timestamp StorageInterfaceImpl::getAllCommittedTimestamp(ServiceContext* serviceCtx) const {
    return serviceCtx->getGlobalStorageEngine()->getAllCommittedTimestamp();
}

}  // namespace repl
}  // namespace mongo