/**
 * Copyright (C) 2018-present MongoDB, Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the Server Side Public License, version 1,
 * as published by MongoDB, Inc.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * Server Side Public License for more details.
 *
 * You should have received a copy of the Server Side Public License
 * along with this program. If not, see
 * <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the Server Side Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
29 */ 30 31 #include "mongo/platform/basic.h" 32 33 #include "mongo/base/checked_cast.h" 34 #include "mongo/db/auth/authorization_session.h" 35 #include "mongo/db/catalog/collection.h" 36 #include "mongo/db/catalog/database.h" 37 #include "mongo/db/commands.h" 38 #include "mongo/db/curop.h" 39 #include "mongo/db/db_raii.h" 40 #include "mongo/db/exec/multi_iterator.h" 41 #include "mongo/db/namespace_string.h" 42 #include "mongo/db/query/cursor_response.h" 43 #include "mongo/db/service_context.h" 44 #include "mongo/stdx/memory.h" 45 46 namespace mongo { 47 48 using std::string; 49 using std::unique_ptr; 50 using stdx::make_unique; 51 52 namespace { 53 54 class ParallelCollectionScanCmd : public BasicCommand { 55 public: 56 struct ExtentInfo { ExtentInfomongo::__anon1ac9827b0111::ParallelCollectionScanCmd::ExtentInfo57 ExtentInfo(RecordId dl, size_t s) : diskLoc(dl), size(s) {} 58 RecordId diskLoc; 59 size_t size; 60 }; 61 ParallelCollectionScanCmd()62 ParallelCollectionScanCmd() : BasicCommand("parallelCollectionScan") {} 63 supportsWriteConcern(const BSONObj & cmd) const64 virtual bool supportsWriteConcern(const BSONObj& cmd) const override { 65 return false; 66 } 67 slaveOk() const68 virtual bool slaveOk() const { 69 return true; 70 } 71 maintenanceOk() const72 virtual bool maintenanceOk() const override { 73 return false; 74 } 75 supportsNonLocalReadConcern(const std::string & dbName,const BSONObj & cmdObj) const76 bool supportsNonLocalReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final { 77 return true; 78 } 79 getReadWriteType() const80 ReadWriteType getReadWriteType() const { 81 return ReadWriteType::kCommand; 82 } 83 checkAuthForOperation(OperationContext * opCtx,const std::string & dbname,const BSONObj & cmdObj)84 Status checkAuthForOperation(OperationContext* opCtx, 85 const std::string& dbname, 86 const BSONObj& cmdObj) override { 87 AuthorizationSession* authSession = AuthorizationSession::get(opCtx->getClient()); 88 89 if 
(!authSession->isAuthorizedToParseNamespaceElement(cmdObj.firstElement())) { 90 return Status(ErrorCodes::Unauthorized, "Unauthorized"); 91 } 92 93 const NamespaceString ns(parseNsOrUUID(opCtx, dbname, cmdObj)); 94 if (!authSession->isAuthorizedForActionsOnNamespace(ns, ActionType::find)) { 95 return Status(ErrorCodes::Unauthorized, "Unauthorized"); 96 } 97 98 return Status::OK(); 99 } 100 run(OperationContext * opCtx,const string & dbname,const BSONObj & cmdObj,BSONObjBuilder & result)101 virtual bool run(OperationContext* opCtx, 102 const string& dbname, 103 const BSONObj& cmdObj, 104 BSONObjBuilder& result) { 105 Lock::DBLock dbSLock(opCtx, dbname, MODE_IS); 106 const NamespaceString ns(parseNsOrUUID(opCtx, dbname, cmdObj)); 107 108 AutoGetCollectionForReadCommand ctx(opCtx, ns, std::move(dbSLock)); 109 110 Collection* collection = ctx.getCollection(); 111 if (!collection) 112 return appendCommandStatus(result, 113 Status(ErrorCodes::NamespaceNotFound, 114 str::stream() << "ns does not exist: " << ns.ns())); 115 116 size_t numCursors = static_cast<size_t>(cmdObj["numCursors"].numberInt()); 117 118 if (numCursors == 0 || numCursors > 10000) 119 return appendCommandStatus(result, 120 Status(ErrorCodes::BadValue, 121 str::stream() 122 << "numCursors has to be between 1 and 10000" 123 << " was: " 124 << numCursors)); 125 126 std::vector<std::unique_ptr<RecordCursor>> iterators; 127 // Opening multiple cursors on a capped collection and reading them in parallel can produce 128 // behavior that is not well defined. This can be removed when support for parallel 129 // collection scan on capped collections is officially added. The 'getCursor' function 130 // ensures that the cursor returned iterates the capped collection in proper document 131 // insertion order. 
132 if (collection->isCapped()) { 133 iterators.push_back(collection->getCursor(opCtx)); 134 numCursors = 1; 135 } else { 136 iterators = collection->getManyCursors(opCtx); 137 if (iterators.size() < numCursors) { 138 numCursors = iterators.size(); 139 } 140 } 141 142 std::vector<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> execs; 143 for (size_t i = 0; i < numCursors; i++) { 144 unique_ptr<WorkingSet> ws = make_unique<WorkingSet>(); 145 unique_ptr<MultiIteratorStage> mis = 146 make_unique<MultiIteratorStage>(opCtx, ws.get(), collection); 147 148 // Takes ownership of 'ws' and 'mis'. 149 auto statusWithPlanExecutor = PlanExecutor::make( 150 opCtx, std::move(ws), std::move(mis), collection, PlanExecutor::YIELD_AUTO); 151 invariant(statusWithPlanExecutor.isOK()); 152 execs.push_back(std::move(statusWithPlanExecutor.getValue())); 153 } 154 155 // transfer iterators to executors using a round-robin distribution. 156 // TODO consider using a common work queue once invalidation issues go away. 157 for (size_t i = 0; i < iterators.size(); i++) { 158 auto& planExec = execs[i % execs.size()]; 159 MultiIteratorStage* mis = checked_cast<MultiIteratorStage*>(planExec->getRootStage()); 160 mis->addIterator(std::move(iterators[i])); 161 } 162 163 { 164 BSONArrayBuilder bucketsBuilder; 165 for (auto&& exec : execs) { 166 // Need to save state while yielding locks between now and getMore(). 167 exec->saveState(); 168 exec->detachFromOperationContext(); 169 170 // Create and register a new ClientCursor. 
171 auto pinnedCursor = collection->getCursorManager()->registerCursor( 172 opCtx, 173 {std::move(exec), 174 ns, 175 AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), 176 opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), 177 cmdObj}); 178 pinnedCursor.getCursor()->setLeftoverMaxTimeMicros( 179 opCtx->getRemainingMaxTimeMicros()); 180 181 BSONObjBuilder threadResult; 182 appendCursorResponseObject( 183 pinnedCursor.getCursor()->cursorid(), ns.ns(), BSONArray(), &threadResult); 184 threadResult.appendBool("ok", 1); 185 186 bucketsBuilder.append(threadResult.obj()); 187 } 188 result.appendArray("cursors", bucketsBuilder.obj()); 189 } 190 191 return true; 192 } 193 } parallelCollectionScanCmd; 194 195 } // namespace 196 } // namespace mongo 197