1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/db/repl/storage_interface_impl.h"
36
37 #include <algorithm>
38 #include <boost/optional.hpp>
39 #include <utility>
40
41 #include "mongo/base/status.h"
42 #include "mongo/base/status_with.h"
43 #include "mongo/base/string_data.h"
44 #include "mongo/bson/bsonobj.h"
45 #include "mongo/bson/bsonobjbuilder.h"
46 #include "mongo/bson/util/bson_extract.h"
47 #include "mongo/db/auth/authorization_manager.h"
48 #include "mongo/db/catalog/coll_mod.h"
49 #include "mongo/db/catalog/collection.h"
50 #include "mongo/db/catalog/collection_catalog_entry.h"
51 #include "mongo/db/catalog/database.h"
52 #include "mongo/db/catalog/document_validation.h"
53 #include "mongo/db/catalog/index_catalog.h"
54 #include "mongo/db/catalog/index_create.h"
55 #include "mongo/db/catalog/uuid_catalog.h"
56 #include "mongo/db/client.h"
57 #include "mongo/db/concurrency/d_concurrency.h"
58 #include "mongo/db/concurrency/write_conflict_exception.h"
59 #include "mongo/db/curop.h"
60 #include "mongo/db/db_raii.h"
61 #include "mongo/db/dbhelpers.h"
62 #include "mongo/db/exec/delete.h"
63 #include "mongo/db/exec/update.h"
64 #include "mongo/db/jsobj.h"
65 #include "mongo/db/keypattern.h"
66 #include "mongo/db/operation_context.h"
67 #include "mongo/db/ops/delete_request.h"
68 #include "mongo/db/ops/parsed_update.h"
69 #include "mongo/db/ops/update_request.h"
70 #include "mongo/db/query/get_executor.h"
71 #include "mongo/db/query/internal_plans.h"
72 #include "mongo/db/repl/collection_bulk_loader_impl.h"
73 #include "mongo/db/repl/oplog.h"
74 #include "mongo/db/repl/replication_coordinator.h"
75 #include "mongo/db/repl/replication_coordinator_global.h"
76 #include "mongo/db/repl/rollback_gen.h"
77 #include "mongo/db/service_context.h"
78 #include "mongo/util/assert_util.h"
79 #include "mongo/util/log.h"
80 #include "mongo/util/mongoutils/str.h"
81
82 namespace mongo {
83 namespace repl {
84
85 const char StorageInterfaceImpl::kDefaultRollbackIdNamespace[] = "local.system.rollback.id";
86 const char StorageInterfaceImpl::kRollbackIdFieldName[] = "rollbackId";
87 const char StorageInterfaceImpl::kRollbackIdDocumentId[] = "rollbackId";
88
89 namespace {
90 using UniqueLock = stdx::unique_lock<stdx::mutex>;
91
92 const auto kIdIndexName = "_id_"_sd;
93
94 } // namespace
95
StorageInterfaceImpl()96 StorageInterfaceImpl::StorageInterfaceImpl()
97 : _rollbackIdNss(StorageInterfaceImpl::kDefaultRollbackIdNamespace) {}
98
getRollbackID(OperationContext * opCtx)99 StatusWith<int> StorageInterfaceImpl::getRollbackID(OperationContext* opCtx) {
100 BSONObjBuilder bob;
101 bob.append("_id", kRollbackIdDocumentId);
102 auto id = bob.obj();
103
104 try {
105 auto rbidDoc = findById(opCtx, _rollbackIdNss, id["_id"]);
106 if (!rbidDoc.isOK()) {
107 return rbidDoc.getStatus();
108 }
109
110 auto rbid = RollbackID::parse(IDLParserErrorContext("RollbackID"), rbidDoc.getValue());
111 invariant(rbid.get_id() == kRollbackIdDocumentId);
112 return rbid.getRollbackId();
113 } catch (...) {
114 return exceptionToStatus();
115 }
116
117 MONGO_UNREACHABLE;
118 }
119
initializeRollbackID(OperationContext * opCtx)120 StatusWith<int> StorageInterfaceImpl::initializeRollbackID(OperationContext* opCtx) {
121 auto status = createCollection(opCtx, _rollbackIdNss, CollectionOptions());
122 if (!status.isOK()) {
123 return status;
124 }
125
126 RollbackID rbid;
127 int initRBID = 1;
128 rbid.set_id(kRollbackIdDocumentId);
129 rbid.setRollbackId(initRBID);
130
131 BSONObjBuilder bob;
132 rbid.serialize(&bob);
133 Timestamp noTimestamp; // This write is not replicated.
134 status = insertDocument(opCtx,
135 _rollbackIdNss,
136 TimestampedBSONObj{bob.done(), noTimestamp},
137 OpTime::kUninitializedTerm);
138 if (status.isOK()) {
139 return initRBID;
140 } else {
141 return status;
142 }
143 }
144
incrementRollbackID(OperationContext * opCtx)145 StatusWith<int> StorageInterfaceImpl::incrementRollbackID(OperationContext* opCtx) {
146 // This is safe because this is only called during rollback, and you can not have two
147 // rollbacks at once.
148 auto rbidSW = getRollbackID(opCtx);
149 if (!rbidSW.isOK()) {
150 return rbidSW;
151 }
152
153 // If we would go over the integer limit, reset the Rollback ID to 1.
154 BSONObjBuilder updateBob;
155 int newRBID = -1;
156 if (rbidSW.getValue() == std::numeric_limits<int>::max()) {
157 newRBID = 1;
158 BSONObjBuilder setBob(updateBob.subobjStart("$set"));
159 setBob.append(kRollbackIdFieldName, newRBID);
160 } else {
161 BSONObjBuilder incBob(updateBob.subobjStart("$inc"));
162 incBob.append(kRollbackIdFieldName, 1);
163 newRBID = rbidSW.getValue() + 1;
164 }
165
166 // Since the Rollback ID is in a singleton collection, we can fix the _id field.
167 BSONObjBuilder bob;
168 bob.append("_id", kRollbackIdDocumentId);
169 auto id = bob.obj();
170 Status status = upsertById(opCtx, _rollbackIdNss, id["_id"], updateBob.obj());
171
172 // We wait until durable so that we are sure the Rollback ID is updated before rollback ends.
173 if (status.isOK()) {
174 opCtx->recoveryUnit()->waitUntilDurable();
175 return newRBID;
176 }
177 return status;
178 }
179
180 StatusWith<std::unique_ptr<CollectionBulkLoader>>
createCollectionForBulkLoading(const NamespaceString & nss,const CollectionOptions & options,const BSONObj idIndexSpec,const std::vector<BSONObj> & secondaryIndexSpecs)181 StorageInterfaceImpl::createCollectionForBulkLoading(
182 const NamespaceString& nss,
183 const CollectionOptions& options,
184 const BSONObj idIndexSpec,
185 const std::vector<BSONObj>& secondaryIndexSpecs) {
186
187 LOG(2) << "StorageInterfaceImpl::createCollectionForBulkLoading called for ns: " << nss.ns();
188
189 class StashClient {
190 public:
191 StashClient() {
192 if (Client::getCurrent()) {
193 _stashedClient = Client::releaseCurrent();
194 }
195 }
196 ~StashClient() {
197 if (Client::getCurrent()) {
198 Client::releaseCurrent();
199 }
200 if (_stashedClient) {
201 Client::setCurrent(std::move(_stashedClient));
202 }
203 }
204
205 private:
206 ServiceContext::UniqueClient _stashedClient;
207 } stash;
208 Client::setCurrent(
209 getGlobalServiceContext()->makeClient(str::stream() << nss.ns() << " loader"));
210 auto opCtx = cc().makeOperationContext();
211
212 documentValidationDisabled(opCtx.get()) = true;
213
214 std::unique_ptr<AutoGetCollection> autoColl;
215 // Retry if WCE.
216 Status status = writeConflictRetry(opCtx.get(), "beginCollectionClone", nss.ns(), [&] {
217 UnreplicatedWritesBlock uwb(opCtx.get());
218
219 // Get locks and create the collection.
220 AutoGetOrCreateDb db(opCtx.get(), nss.db(), MODE_X);
221 AutoGetCollection coll(opCtx.get(), nss, MODE_IX);
222
223 if (coll.getCollection()) {
224 return Status(ErrorCodes::NamespaceExists,
225 str::stream() << "Collection " << nss.ns() << " already exists.");
226 }
227 {
228 // Create the collection.
229 WriteUnitOfWork wunit(opCtx.get());
230 fassert(40332, db.getDb()->createCollection(opCtx.get(), nss.ns(), options, false));
231 wunit.commit();
232 }
233
234 autoColl = stdx::make_unique<AutoGetCollection>(opCtx.get(), nss, MODE_IX);
235
236 // Build empty capped indexes. Capped indexes cannot be built by the MultiIndexBlock
237 // because the cap might delete documents off the back while we are inserting them into
238 // the front.
239 if (options.capped) {
240 WriteUnitOfWork wunit(opCtx.get());
241 if (!idIndexSpec.isEmpty()) {
242 auto status =
243 autoColl->getCollection()->getIndexCatalog()->createIndexOnEmptyCollection(
244 opCtx.get(), idIndexSpec);
245 if (!status.getStatus().isOK()) {
246 return status.getStatus();
247 }
248 }
249 for (auto&& spec : secondaryIndexSpecs) {
250 auto status =
251 autoColl->getCollection()->getIndexCatalog()->createIndexOnEmptyCollection(
252 opCtx.get(), spec);
253 if (!status.getStatus().isOK()) {
254 return status.getStatus();
255 }
256 }
257 wunit.commit();
258 }
259
260 return Status::OK();
261 });
262
263 if (!status.isOK()) {
264 return status;
265 }
266
267 // Move locks into loader, so it now controls their lifetime.
268 auto loader =
269 stdx::make_unique<CollectionBulkLoaderImpl>(Client::releaseCurrent(),
270 std::move(opCtx),
271 std::move(autoColl),
272 options.capped ? BSONObj() : idIndexSpec);
273
274 status = loader->init(options.capped ? std::vector<BSONObj>() : secondaryIndexSpecs);
275 if (!status.isOK()) {
276 return status;
277 }
278 return {std::move(loader)};
279 }
280
insertDocument(OperationContext * opCtx,const NamespaceString & nss,const TimestampedBSONObj & doc,long long term)281 Status StorageInterfaceImpl::insertDocument(OperationContext* opCtx,
282 const NamespaceString& nss,
283 const TimestampedBSONObj& doc,
284 long long term) {
285 return insertDocuments(opCtx, nss, {InsertStatement(doc.obj, doc.timestamp, term)});
286 }
287
288 namespace {
289
290 /**
291 * Returns Collection* from database RAII object.
292 * Returns NamespaceNotFound if the database or collection does not exist.
293 */
294 template <typename AutoGetCollectionType>
getCollection(const AutoGetCollectionType & autoGetCollection,const NamespaceString & nss,const std::string & message)295 StatusWith<Collection*> getCollection(const AutoGetCollectionType& autoGetCollection,
296 const NamespaceString& nss,
297 const std::string& message) {
298 if (!autoGetCollection.getDb()) {
299 return {ErrorCodes::NamespaceNotFound,
300 str::stream() << "Database [" << nss.db() << "] not found. " << message};
301 }
302
303 auto collection = autoGetCollection.getCollection();
304 if (!collection) {
305 return {ErrorCodes::NamespaceNotFound,
306 str::stream() << "Collection [" << nss.ns() << "] not found. " << message};
307 }
308
309 return collection;
310 }
311
insertDocumentsSingleBatch(OperationContext * opCtx,const NamespaceString & nss,std::vector<InsertStatement>::const_iterator begin,std::vector<InsertStatement>::const_iterator end)312 Status insertDocumentsSingleBatch(OperationContext* opCtx,
313 const NamespaceString& nss,
314 std::vector<InsertStatement>::const_iterator begin,
315 std::vector<InsertStatement>::const_iterator end) {
316 AutoGetCollection autoColl(opCtx, nss, MODE_IX);
317
318 auto collectionResult =
319 getCollection(autoColl, nss, "The collection must exist before inserting documents.");
320 if (!collectionResult.isOK()) {
321 return collectionResult.getStatus();
322 }
323 auto collection = collectionResult.getValue();
324
325 WriteUnitOfWork wunit(opCtx);
326 OpDebug* const nullOpDebug = nullptr;
327 auto status = collection->insertDocuments(opCtx, begin, end, nullOpDebug, false);
328 if (!status.isOK()) {
329 return status;
330 }
331 wunit.commit();
332
333 return Status::OK();
334 }
335
336 } // namespace
337
insertDocuments(OperationContext * opCtx,const NamespaceString & nss,const std::vector<InsertStatement> & docs)338 Status StorageInterfaceImpl::insertDocuments(OperationContext* opCtx,
339 const NamespaceString& nss,
340 const std::vector<InsertStatement>& docs) {
341 if (docs.size() > 1U) {
342 try {
343 if (insertDocumentsSingleBatch(opCtx, nss, docs.cbegin(), docs.cend()).isOK()) {
344 return Status::OK();
345 }
346 } catch (...) {
347 // Ignore this failure and behave as-if we never tried to do the combined batch insert.
348 // The loop below will handle reporting any non-transient errors.
349 }
350 }
351
352 // Try to insert the batch one-at-a-time because the batch failed all-at-once inserting.
353 for (auto it = docs.cbegin(); it != docs.cend(); ++it) {
354 auto status =
355 writeConflictRetry(opCtx, "StorageInterfaceImpl::insertDocuments", nss.ns(), [&] {
356 auto status = insertDocumentsSingleBatch(opCtx, nss, it, it + 1);
357 if (!status.isOK()) {
358 return status;
359 }
360
361 return Status::OK();
362 });
363
364 if (!status.isOK()) {
365 return status;
366 }
367 }
368
369 return Status::OK();
370 }
371
dropReplicatedDatabases(OperationContext * opCtx)372 Status StorageInterfaceImpl::dropReplicatedDatabases(OperationContext* opCtx) {
373 dropAllDatabasesExceptLocal(opCtx);
374 return Status::OK();
375 }
376
createOplog(OperationContext * opCtx,const NamespaceString & nss)377 Status StorageInterfaceImpl::createOplog(OperationContext* opCtx, const NamespaceString& nss) {
378 mongo::repl::createOplog(opCtx, nss.ns(), true);
379 return Status::OK();
380 }
381
getOplogMaxSize(OperationContext * opCtx,const NamespaceString & nss)382 StatusWith<size_t> StorageInterfaceImpl::getOplogMaxSize(OperationContext* opCtx,
383 const NamespaceString& nss) {
384 AutoGetCollectionForReadCommand autoColl(opCtx, nss);
385 auto collectionResult = getCollection(autoColl, nss, "Your oplog doesn't exist.");
386 if (!collectionResult.isOK()) {
387 return collectionResult.getStatus();
388 }
389 auto collection = collectionResult.getValue();
390
391 const auto options = collection->getCatalogEntry()->getCollectionOptions(opCtx);
392 if (!options.capped)
393 return {ErrorCodes::BadValue, str::stream() << nss.ns() << " isn't capped"};
394
395 return options.cappedSize;
396 }
397
createCollection(OperationContext * opCtx,const NamespaceString & nss,const CollectionOptions & options)398 Status StorageInterfaceImpl::createCollection(OperationContext* opCtx,
399 const NamespaceString& nss,
400 const CollectionOptions& options) {
401 return writeConflictRetry(opCtx, "StorageInterfaceImpl::createCollection", nss.ns(), [&] {
402 AutoGetOrCreateDb databaseWriteGuard(opCtx, nss.db(), MODE_X);
403 auto db = databaseWriteGuard.getDb();
404 invariant(db);
405 if (db->getCollection(opCtx, nss)) {
406 return Status(ErrorCodes::NamespaceExists,
407 str::stream() << "Collection " << nss.ns() << " already exists.");
408 }
409 WriteUnitOfWork wuow(opCtx);
410 try {
411 auto coll = db->createCollection(opCtx, nss.ns(), options);
412 invariant(coll);
413 } catch (const AssertionException& ex) {
414 return ex.toStatus();
415 }
416 wuow.commit();
417
418 return Status::OK();
419 });
420 }
421
dropCollection(OperationContext * opCtx,const NamespaceString & nss)422 Status StorageInterfaceImpl::dropCollection(OperationContext* opCtx, const NamespaceString& nss) {
423 return writeConflictRetry(opCtx, "StorageInterfaceImpl::dropCollection", nss.ns(), [&] {
424 AutoGetDb autoDB(opCtx, nss.db(), MODE_X);
425 if (!autoDB.getDb()) {
426 // Database does not exist - nothing to do.
427 return Status::OK();
428 }
429 WriteUnitOfWork wunit(opCtx);
430 const auto status = autoDB.getDb()->dropCollectionEvenIfSystem(opCtx, nss);
431 if (!status.isOK()) {
432 return status;
433 }
434 wunit.commit();
435 return Status::OK();
436 });
437 }
438
truncateCollection(OperationContext * opCtx,const NamespaceString & nss)439 Status StorageInterfaceImpl::truncateCollection(OperationContext* opCtx,
440 const NamespaceString& nss) {
441 return writeConflictRetry(opCtx, "StorageInterfaceImpl::truncateCollection", nss.ns(), [&] {
442 AutoGetCollection autoColl(opCtx, nss, MODE_X);
443 auto collectionResult =
444 getCollection(autoColl, nss, "The collection must exist before truncating.");
445 if (!collectionResult.isOK()) {
446 return collectionResult.getStatus();
447 }
448 auto collection = collectionResult.getValue();
449
450 WriteUnitOfWork wunit(opCtx);
451 const auto status = collection->truncate(opCtx);
452 if (!status.isOK()) {
453 return status;
454 }
455 wunit.commit();
456 return Status::OK();
457 });
458 }
459
renameCollection(OperationContext * opCtx,const NamespaceString & fromNS,const NamespaceString & toNS,bool stayTemp)460 Status StorageInterfaceImpl::renameCollection(OperationContext* opCtx,
461 const NamespaceString& fromNS,
462 const NamespaceString& toNS,
463 bool stayTemp) {
464 if (fromNS.db() != toNS.db()) {
465 return Status(ErrorCodes::InvalidNamespace,
466 str::stream() << "Cannot rename collection between databases. From NS: "
467 << fromNS.ns()
468 << "; to NS: "
469 << toNS.ns());
470 }
471
472 return writeConflictRetry(opCtx, "StorageInterfaceImpl::renameCollection", fromNS.ns(), [&] {
473 AutoGetDb autoDB(opCtx, fromNS.db(), MODE_X);
474 if (!autoDB.getDb()) {
475 return Status(ErrorCodes::NamespaceNotFound,
476 str::stream() << "Cannot rename collection from " << fromNS.ns() << " to "
477 << toNS.ns()
478 << ". Database "
479 << fromNS.db()
480 << " not found.");
481 }
482 WriteUnitOfWork wunit(opCtx);
483 const auto status =
484 autoDB.getDb()->renameCollection(opCtx, fromNS.ns(), toNS.ns(), stayTemp);
485 if (!status.isOK()) {
486 return status;
487 }
488
489 auto newColl = autoDB.getDb()->getCollection(opCtx, toNS);
490 if (newColl->uuid()) {
491 UUIDCatalog::get(opCtx).onRenameCollection(
492 opCtx, [newColl] { return newColl; }, newColl->uuid().get());
493 }
494 wunit.commit();
495 return status;
496 });
497 }
498
499 namespace {
500
501 /**
502 * Returns DeleteStageParams for deleteOne with fetch.
503 */
makeDeleteStageParamsForDeleteDocuments()504 DeleteStageParams makeDeleteStageParamsForDeleteDocuments() {
505 DeleteStageParams deleteStageParams;
506 deleteStageParams.isMulti = true;
507 deleteStageParams.returnDeleted = true;
508 return deleteStageParams;
509 }
510
511 /**
512 * Shared implementation between findDocuments, deleteDocuments, and _findOrDeleteById.
513 * _findOrDeleteById is used by findById, and deleteById.
514 */
515 enum class FindDeleteMode { kFind, kDelete };
_findOrDeleteDocuments(OperationContext * opCtx,const NamespaceString & nss,boost::optional<StringData> indexName,StorageInterface::ScanDirection scanDirection,const BSONObj & startKey,const BSONObj & endKey,BoundInclusion boundInclusion,std::size_t limit,FindDeleteMode mode)516 StatusWith<std::vector<BSONObj>> _findOrDeleteDocuments(
517 OperationContext* opCtx,
518 const NamespaceString& nss,
519 boost::optional<StringData> indexName,
520 StorageInterface::ScanDirection scanDirection,
521 const BSONObj& startKey,
522 const BSONObj& endKey,
523 BoundInclusion boundInclusion,
524 std::size_t limit,
525 FindDeleteMode mode) {
526 auto isFind = mode == FindDeleteMode::kFind;
527 auto opStr = isFind ? "StorageInterfaceImpl::find" : "StorageInterfaceImpl::delete";
528
529
530 return writeConflictRetry(opCtx, opStr, nss.ns(), [&] {
531 // We need to explicitly use this in a few places to help the type inference. Use a
532 // shorthand.
533 using Result = StatusWith<std::vector<BSONObj>>;
534
535 auto collectionAccessMode = isFind ? MODE_IS : MODE_IX;
536 AutoGetCollection autoColl(opCtx, nss, collectionAccessMode);
537 auto collectionResult = getCollection(
538 autoColl, nss, str::stream() << "Unable to proceed with " << opStr << ".");
539 if (!collectionResult.isOK()) {
540 return Result(collectionResult.getStatus());
541 }
542 auto collection = collectionResult.getValue();
543
544 auto isForward = scanDirection == StorageInterface::ScanDirection::kForward;
545 auto direction = isForward ? InternalPlanner::FORWARD : InternalPlanner::BACKWARD;
546
547 std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> planExecutor;
548 if (!indexName) {
549 if (!startKey.isEmpty()) {
550 return Result(ErrorCodes::NoSuchKey,
551 "non-empty startKey not allowed for collection scan");
552 }
553 if (boundInclusion != BoundInclusion::kIncludeStartKeyOnly) {
554 return Result(ErrorCodes::InvalidOptions,
555 "bound inclusion must be BoundInclusion::kIncludeStartKeyOnly for "
556 "collection scan");
557 }
558 // Use collection scan.
559 planExecutor = isFind
560 ? InternalPlanner::collectionScan(
561 opCtx, nss.ns(), collection, PlanExecutor::NO_YIELD, direction)
562 : InternalPlanner::deleteWithCollectionScan(
563 opCtx,
564 collection,
565 makeDeleteStageParamsForDeleteDocuments(),
566 PlanExecutor::NO_YIELD,
567 direction);
568 } else {
569 // Use index scan.
570 auto indexCatalog = collection->getIndexCatalog();
571 invariant(indexCatalog);
572 bool includeUnfinishedIndexes = false;
573 IndexDescriptor* indexDescriptor =
574 indexCatalog->findIndexByName(opCtx, *indexName, includeUnfinishedIndexes);
575 if (!indexDescriptor) {
576 return Result(ErrorCodes::IndexNotFound,
577 str::stream() << "Index not found, ns:" << nss.ns() << ", index: "
578 << *indexName);
579 }
580 if (indexDescriptor->isPartial()) {
581 return Result(ErrorCodes::IndexOptionsConflict,
582 str::stream()
583 << "Partial index is not allowed for this operation, ns:"
584 << nss.ns()
585 << ", index: "
586 << *indexName);
587 }
588
589 KeyPattern keyPattern(indexDescriptor->keyPattern());
590 auto minKey = Helpers::toKeyFormat(keyPattern.extendRangeBound({}, false));
591 auto maxKey = Helpers::toKeyFormat(keyPattern.extendRangeBound({}, true));
592 auto bounds =
593 isForward ? std::make_pair(minKey, maxKey) : std::make_pair(maxKey, minKey);
594 if (!startKey.isEmpty()) {
595 bounds.first = startKey;
596 }
597 if (!endKey.isEmpty()) {
598 bounds.second = endKey;
599 }
600 planExecutor = isFind
601 ? InternalPlanner::indexScan(opCtx,
602 collection,
603 indexDescriptor,
604 bounds.first,
605 bounds.second,
606 boundInclusion,
607 PlanExecutor::NO_YIELD,
608 direction,
609 InternalPlanner::IXSCAN_FETCH)
610 : InternalPlanner::deleteWithIndexScan(opCtx,
611 collection,
612 makeDeleteStageParamsForDeleteDocuments(),
613 indexDescriptor,
614 bounds.first,
615 bounds.second,
616 boundInclusion,
617 PlanExecutor::NO_YIELD,
618 direction);
619 }
620
621 std::vector<BSONObj> docs;
622 while (docs.size() < limit) {
623 BSONObj doc;
624 auto state = planExecutor->getNext(&doc, nullptr);
625 if (PlanExecutor::ADVANCED == state) {
626 docs.push_back(doc.getOwned());
627 } else {
628 invariant(PlanExecutor::IS_EOF == state);
629 break;
630 }
631 }
632 return Result(docs);
633 });
634 }
635
_findOrDeleteById(OperationContext * opCtx,const NamespaceString & nss,const BSONElement & idKey,FindDeleteMode mode)636 StatusWith<BSONObj> _findOrDeleteById(OperationContext* opCtx,
637 const NamespaceString& nss,
638 const BSONElement& idKey,
639 FindDeleteMode mode) {
640 auto wrappedIdKey = idKey.wrap("");
641 auto result = _findOrDeleteDocuments(opCtx,
642 nss,
643 kIdIndexName,
644 StorageInterface::ScanDirection::kForward,
645 wrappedIdKey,
646 wrappedIdKey,
647 BoundInclusion::kIncludeBothStartAndEndKeys,
648 1U,
649 mode);
650 if (!result.isOK()) {
651 return result.getStatus();
652 }
653 const auto& docs = result.getValue();
654 if (docs.empty()) {
655 return {ErrorCodes::NoSuchKey, str::stream() << "No document found with _id: " << idKey};
656 }
657
658 return docs.front();
659 }
660
661 } // namespace
662
findDocuments(OperationContext * opCtx,const NamespaceString & nss,boost::optional<StringData> indexName,ScanDirection scanDirection,const BSONObj & startKey,BoundInclusion boundInclusion,std::size_t limit)663 StatusWith<std::vector<BSONObj>> StorageInterfaceImpl::findDocuments(
664 OperationContext* opCtx,
665 const NamespaceString& nss,
666 boost::optional<StringData> indexName,
667 ScanDirection scanDirection,
668 const BSONObj& startKey,
669 BoundInclusion boundInclusion,
670 std::size_t limit) {
671 return _findOrDeleteDocuments(opCtx,
672 nss,
673 indexName,
674 scanDirection,
675 startKey,
676 {},
677 boundInclusion,
678 limit,
679 FindDeleteMode::kFind);
680 }
681
deleteDocuments(OperationContext * opCtx,const NamespaceString & nss,boost::optional<StringData> indexName,ScanDirection scanDirection,const BSONObj & startKey,BoundInclusion boundInclusion,std::size_t limit)682 StatusWith<std::vector<BSONObj>> StorageInterfaceImpl::deleteDocuments(
683 OperationContext* opCtx,
684 const NamespaceString& nss,
685 boost::optional<StringData> indexName,
686 ScanDirection scanDirection,
687 const BSONObj& startKey,
688 BoundInclusion boundInclusion,
689 std::size_t limit) {
690 return _findOrDeleteDocuments(opCtx,
691 nss,
692 indexName,
693 scanDirection,
694 startKey,
695 {},
696 boundInclusion,
697 limit,
698 FindDeleteMode::kDelete);
699 }
700
findSingleton(OperationContext * opCtx,const NamespaceString & nss)701 StatusWith<BSONObj> StorageInterfaceImpl::findSingleton(OperationContext* opCtx,
702 const NamespaceString& nss) {
703 auto result = findDocuments(opCtx,
704 nss,
705 boost::none, // Collection scan.
706 StorageInterface::ScanDirection::kForward,
707 {}, // Start at the beginning of the collection.
708 BoundInclusion::kIncludeStartKeyOnly,
709 2U); // Ask for 2 documents to ensure it's a singleton.
710 if (!result.isOK()) {
711 return result.getStatus();
712 }
713
714 const auto& docs = result.getValue();
715 if (docs.empty()) {
716 return {ErrorCodes::CollectionIsEmpty,
717 str::stream() << "No document found in namespace: " << nss.ns()};
718 } else if (docs.size() != 1U) {
719 return {ErrorCodes::TooManyMatchingDocuments,
720 str::stream() << "More than singleton document found in namespace: " << nss.ns()};
721 }
722
723 return docs.front();
724 }
725
findById(OperationContext * opCtx,const NamespaceString & nss,const BSONElement & idKey)726 StatusWith<BSONObj> StorageInterfaceImpl::findById(OperationContext* opCtx,
727 const NamespaceString& nss,
728 const BSONElement& idKey) {
729 return _findOrDeleteById(opCtx, nss, idKey, FindDeleteMode::kFind);
730 }
731
deleteById(OperationContext * opCtx,const NamespaceString & nss,const BSONElement & idKey)732 StatusWith<BSONObj> StorageInterfaceImpl::deleteById(OperationContext* opCtx,
733 const NamespaceString& nss,
734 const BSONElement& idKey) {
735 return _findOrDeleteById(opCtx, nss, idKey, FindDeleteMode::kDelete);
736 }
737
738 namespace {
739
740 /**
741 * Checks _id key passed to upsertById and returns a query document for UpdateRequest.
742 */
makeUpsertQuery(const BSONElement & idKey)743 StatusWith<BSONObj> makeUpsertQuery(const BSONElement& idKey) {
744 auto query = BSON("_id" << idKey);
745
746 // With the ID hack, only simple _id queries are allowed. Otherwise, UpdateStage will fail with
747 // a fatal assertion.
748 if (!CanonicalQuery::isSimpleIdQuery(query)) {
749 return {ErrorCodes::InvalidIdField,
750 str::stream() << "Unable to update document with a non-simple _id query: "
751 << query};
752 }
753
754 return query;
755 }
756
_updateWithQuery(OperationContext * opCtx,const UpdateRequest & request)757 Status _updateWithQuery(OperationContext* opCtx, const UpdateRequest& request) {
758 invariant(!request.isMulti()); // We only want to update one document for performance.
759 invariant(!request.shouldReturnAnyDocs());
760 invariant(PlanExecutor::NO_YIELD == request.getYieldPolicy());
761
762 auto& nss = request.getNamespaceString();
763 return writeConflictRetry(opCtx, "_updateWithQuery", nss.ns(), [&] {
764 // ParsedUpdate needs to be inside the write conflict retry loop because it may create a
765 // CanonicalQuery whose ownership will be transferred to the plan executor in
766 // getExecutorUpdate().
767 ParsedUpdate parsedUpdate(opCtx, &request);
768 auto parsedUpdateStatus = parsedUpdate.parseRequest();
769 if (!parsedUpdateStatus.isOK()) {
770 return parsedUpdateStatus;
771 }
772
773 AutoGetCollection autoColl(opCtx, nss, MODE_IX);
774 auto collectionResult = getCollection(
775 autoColl,
776 nss,
777 str::stream() << "Unable to update documents in " << nss.ns() << " using query "
778 << request.getQuery());
779 if (!collectionResult.isOK()) {
780 return collectionResult.getStatus();
781 }
782 auto collection = collectionResult.getValue();
783
784 auto planExecutorResult =
785 mongo::getExecutorUpdate(opCtx, nullptr, collection, &parsedUpdate);
786 if (!planExecutorResult.isOK()) {
787 return planExecutorResult.getStatus();
788 }
789 auto planExecutor = std::move(planExecutorResult.getValue());
790
791 return planExecutor->executePlan();
792 });
793 }
794
795 } // namespace
796
upsertById(OperationContext * opCtx,const NamespaceString & nss,const BSONElement & idKey,const BSONObj & update)797 Status StorageInterfaceImpl::upsertById(OperationContext* opCtx,
798 const NamespaceString& nss,
799 const BSONElement& idKey,
800 const BSONObj& update) {
801 // Validate and construct an _id query for UpdateResult.
802 // The _id key will be passed directly to IDHackStage.
803 auto queryResult = makeUpsertQuery(idKey);
804 if (!queryResult.isOK()) {
805 return queryResult.getStatus();
806 }
807 auto query = queryResult.getValue();
808
809 UpdateRequest request(nss);
810 request.setQuery(query);
811 request.setUpdates(update);
812 request.setUpsert(true);
813 invariant(!request.isMulti()); // This follows from using an exact _id query.
814 invariant(!request.shouldReturnAnyDocs());
815 invariant(PlanExecutor::NO_YIELD == request.getYieldPolicy());
816
817 return writeConflictRetry(opCtx, "StorageInterfaceImpl::upsertById", nss.ns(), [&] {
818 // ParsedUpdate needs to be inside the write conflict retry loop because it contains
819 // the UpdateDriver whose state may be modified while we are applying the update.
820 ParsedUpdate parsedUpdate(opCtx, &request);
821 auto parsedUpdateStatus = parsedUpdate.parseRequest();
822 if (!parsedUpdateStatus.isOK()) {
823 return parsedUpdateStatus;
824 }
825
826 AutoGetCollection autoColl(opCtx, nss, MODE_IX);
827 auto collectionResult = getCollection(autoColl, nss, "Unable to update document.");
828 if (!collectionResult.isOK()) {
829 return collectionResult.getStatus();
830 }
831 auto collection = collectionResult.getValue();
832
833 // We're using the ID hack to perform the update so we have to disallow collections
834 // without an _id index.
835 auto descriptor = collection->getIndexCatalog()->findIdIndex(opCtx);
836 if (!descriptor) {
837 return Status(ErrorCodes::IndexNotFound,
838 "Unable to update document in a collection without an _id index.");
839 }
840
841 UpdateStageParams updateStageParams(
842 parsedUpdate.getRequest(), parsedUpdate.getDriver(), nullptr);
843 auto planExecutor = InternalPlanner::updateWithIdHack(opCtx,
844 collection,
845 updateStageParams,
846 descriptor,
847 idKey.wrap(""),
848 parsedUpdate.yieldPolicy());
849
850 return planExecutor->executePlan();
851 });
852 }
853
putSingleton(OperationContext * opCtx,const NamespaceString & nss,const BSONObj & update)854 Status StorageInterfaceImpl::putSingleton(OperationContext* opCtx,
855 const NamespaceString& nss,
856 const BSONObj& update) {
857 UpdateRequest request(nss);
858 request.setQuery({});
859 request.setUpdates(update);
860 request.setUpsert(true);
861 return _updateWithQuery(opCtx, request);
862 }
863
updateSingleton(OperationContext * opCtx,const NamespaceString & nss,const BSONObj & query,const BSONObj & update)864 Status StorageInterfaceImpl::updateSingleton(OperationContext* opCtx,
865 const NamespaceString& nss,
866 const BSONObj& query,
867 const BSONObj& update) {
868 UpdateRequest request(nss);
869 request.setQuery(query);
870 request.setUpdates(update);
871 invariant(!request.isUpsert());
872 return _updateWithQuery(opCtx, request);
873 }
874
deleteByFilter(OperationContext * opCtx,const NamespaceString & nss,const BSONObj & filter)875 Status StorageInterfaceImpl::deleteByFilter(OperationContext* opCtx,
876 const NamespaceString& nss,
877 const BSONObj& filter) {
878 DeleteRequest request(nss);
879 request.setQuery(filter);
880 request.setMulti(true);
881 request.setYieldPolicy(PlanExecutor::NO_YIELD);
882
883 // This disables the isLegalClientSystemNS() check in getExecutorDelete() which is used to
884 // disallow client deletes from unrecognized system collections.
885 request.setGod();
886
887 return writeConflictRetry(opCtx, "StorageInterfaceImpl::deleteByFilter", nss.ns(), [&] {
888 // ParsedDelete needs to be inside the write conflict retry loop because it may create a
889 // CanonicalQuery whose ownership will be transferred to the plan executor in
890 // getExecutorDelete().
891 ParsedDelete parsedDelete(opCtx, &request);
892 auto parsedDeleteStatus = parsedDelete.parseRequest();
893 if (!parsedDeleteStatus.isOK()) {
894 return parsedDeleteStatus;
895 }
896
897 AutoGetCollection autoColl(opCtx, nss, MODE_IX);
898 auto collectionResult = getCollection(
899 autoColl,
900 nss,
901 str::stream() << "Unable to delete documents in " << nss.ns() << " using filter "
902 << filter);
903 if (!collectionResult.isOK()) {
904 return collectionResult.getStatus();
905 }
906 auto collection = collectionResult.getValue();
907
908 auto planExecutorResult =
909 mongo::getExecutorDelete(opCtx, nullptr, collection, &parsedDelete);
910 if (!planExecutorResult.isOK()) {
911 return planExecutorResult.getStatus();
912 }
913 auto planExecutor = std::move(planExecutorResult.getValue());
914
915 return planExecutor->executePlan();
916 });
917 }
918
getCollectionSize(OperationContext * opCtx,const NamespaceString & nss)919 StatusWith<StorageInterface::CollectionSize> StorageInterfaceImpl::getCollectionSize(
920 OperationContext* opCtx, const NamespaceString& nss) {
921 AutoGetCollectionForRead autoColl(opCtx, nss);
922
923 auto collectionResult =
924 getCollection(autoColl, nss, "Unable to get total size of documents in collection.");
925 if (!collectionResult.isOK()) {
926 return collectionResult.getStatus();
927 }
928 auto collection = collectionResult.getValue();
929
930 return collection->dataSize(opCtx);
931 }
932
getCollectionCount(OperationContext * opCtx,const NamespaceString & nss)933 StatusWith<StorageInterface::CollectionCount> StorageInterfaceImpl::getCollectionCount(
934 OperationContext* opCtx, const NamespaceString& nss) {
935 AutoGetCollectionForRead autoColl(opCtx, nss);
936
937 auto collectionResult =
938 getCollection(autoColl, nss, "Unable to get number of documents in collection.");
939 if (!collectionResult.isOK()) {
940 return collectionResult.getStatus();
941 }
942 auto collection = collectionResult.getValue();
943
944 return collection->numRecords(opCtx);
945 }
946
getCollectionUUID(OperationContext * opCtx,const NamespaceString & nss)947 StatusWith<OptionalCollectionUUID> StorageInterfaceImpl::getCollectionUUID(
948 OperationContext* opCtx, const NamespaceString& nss) {
949 AutoGetCollectionForRead autoColl(opCtx, nss);
950
951 auto collectionResult = getCollection(
952 autoColl, nss, str::stream() << "Unable to get UUID of " << nss.ns() << " collection.");
953 if (!collectionResult.isOK()) {
954 return collectionResult.getStatus();
955 }
956 auto collection = collectionResult.getValue();
957 return collection->uuid();
958 }
959
upgradeUUIDSchemaVersionNonReplicated(OperationContext * opCtx)960 Status StorageInterfaceImpl::upgradeUUIDSchemaVersionNonReplicated(OperationContext* opCtx) {
961 return updateUUIDSchemaVersionNonReplicated(opCtx, true);
962 }
963
setStableTimestamp(ServiceContext * serviceCtx,Timestamp snapshotName)964 void StorageInterfaceImpl::setStableTimestamp(ServiceContext* serviceCtx, Timestamp snapshotName) {
965 serviceCtx->getGlobalStorageEngine()->setStableTimestamp(snapshotName);
966 }
967
setInitialDataTimestamp(ServiceContext * serviceCtx,Timestamp snapshotName)968 void StorageInterfaceImpl::setInitialDataTimestamp(ServiceContext* serviceCtx,
969 Timestamp snapshotName) {
970 serviceCtx->getGlobalStorageEngine()->setInitialDataTimestamp(snapshotName);
971 }
972
recoverToStableTimestamp(ServiceContext * serviceCtx)973 Status StorageInterfaceImpl::recoverToStableTimestamp(ServiceContext* serviceCtx) {
974 return serviceCtx->getGlobalStorageEngine()->recoverToStableTimestamp();
975 }
976
isAdminDbValid(OperationContext * opCtx)977 Status StorageInterfaceImpl::isAdminDbValid(OperationContext* opCtx) {
978 AutoGetDb autoDB(opCtx, "admin", MODE_X);
979 auto adminDb = autoDB.getDb();
980 if (!adminDb) {
981 return Status::OK();
982 }
983
984 Collection* const usersCollection =
985 adminDb->getCollection(opCtx, AuthorizationManager::usersCollectionNamespace);
986 const bool hasUsers =
987 usersCollection && !Helpers::findOne(opCtx, usersCollection, BSONObj(), false).isNull();
988 Collection* const adminVersionCollection =
989 adminDb->getCollection(opCtx, AuthorizationManager::versionCollectionNamespace);
990 BSONObj authSchemaVersionDocument;
991 if (!adminVersionCollection ||
992 !Helpers::findOne(opCtx,
993 adminVersionCollection,
994 AuthorizationManager::versionDocumentQuery,
995 authSchemaVersionDocument)) {
996 if (!hasUsers) {
997 // It's OK to have no auth version document if there are no user documents.
998 return Status::OK();
999 }
1000 std::string msg = str::stream()
1001 << "During initial sync, found documents in "
1002 << AuthorizationManager::usersCollectionNamespace.ns()
1003 << " but could not find an auth schema version document in "
1004 << AuthorizationManager::versionCollectionNamespace.ns() << ". "
1005 << "This indicates that the primary of this replica set was not successfully "
1006 "upgraded to schema version "
1007 << AuthorizationManager::schemaVersion26Final
1008 << ", which is the minimum supported schema version in this version of MongoDB";
1009 return {ErrorCodes::AuthSchemaIncompatible, msg};
1010 }
1011 long long foundSchemaVersion;
1012 Status status = bsonExtractIntegerField(authSchemaVersionDocument,
1013 AuthorizationManager::schemaVersionFieldName,
1014 &foundSchemaVersion);
1015 if (!status.isOK()) {
1016 std::string msg = str::stream()
1017 << "During initial sync, found malformed auth schema version document: "
1018 << status.toString() << "; document: " << authSchemaVersionDocument;
1019 return {ErrorCodes::AuthSchemaIncompatible, msg};
1020 }
1021 if ((foundSchemaVersion != AuthorizationManager::schemaVersion26Final) &&
1022 (foundSchemaVersion != AuthorizationManager::schemaVersion28SCRAM)) {
1023 std::string msg = str::stream()
1024 << "During initial sync, found auth schema version " << foundSchemaVersion
1025 << ", but this version of MongoDB only supports schema versions "
1026 << AuthorizationManager::schemaVersion26Final << " and "
1027 << AuthorizationManager::schemaVersion28SCRAM;
1028 return {ErrorCodes::AuthSchemaIncompatible, msg};
1029 }
1030
1031 return Status::OK();
1032 }
1033
waitForAllEarlierOplogWritesToBeVisible(OperationContext * opCtx)1034 void StorageInterfaceImpl::waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) {
1035 AutoGetCollection oplog(opCtx, NamespaceString::kRsOplogNamespace, MODE_IS);
1036 oplog.getCollection()->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx);
1037 }
1038
supportsDocLocking(ServiceContext * serviceCtx) const1039 bool StorageInterfaceImpl::supportsDocLocking(ServiceContext* serviceCtx) const {
1040 return serviceCtx->getGlobalStorageEngine()->supportsDocLocking();
1041 }
1042
getAllCommittedTimestamp(ServiceContext * serviceCtx) const1043 Timestamp StorageInterfaceImpl::getAllCommittedTimestamp(ServiceContext* serviceCtx) const {
1044 return serviceCtx->getGlobalStorageEngine()->getAllCommittedTimestamp();
1045 }
1046
1047 } // namespace repl
1048 } // namespace mongo
1049