1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #include "mongo/platform/basic.h"
32 
33 #include "mongo/base/checked_cast.h"
34 #include "mongo/db/auth/authorization_session.h"
35 #include "mongo/db/catalog/collection.h"
36 #include "mongo/db/catalog/database.h"
37 #include "mongo/db/commands.h"
38 #include "mongo/db/curop.h"
39 #include "mongo/db/db_raii.h"
40 #include "mongo/db/exec/multi_iterator.h"
41 #include "mongo/db/namespace_string.h"
42 #include "mongo/db/query/cursor_response.h"
43 #include "mongo/db/service_context.h"
44 #include "mongo/stdx/memory.h"
45 
46 namespace mongo {
47 
48 using std::string;
49 using std::unique_ptr;
50 using stdx::make_unique;
51 
52 namespace {
53 
54 class ParallelCollectionScanCmd : public BasicCommand {
55 public:
56     struct ExtentInfo {
ExtentInfomongo::__anon1ac9827b0111::ParallelCollectionScanCmd::ExtentInfo57         ExtentInfo(RecordId dl, size_t s) : diskLoc(dl), size(s) {}
58         RecordId diskLoc;
59         size_t size;
60     };
61 
ParallelCollectionScanCmd()62     ParallelCollectionScanCmd() : BasicCommand("parallelCollectionScan") {}
63 
supportsWriteConcern(const BSONObj & cmd) const64     virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
65         return false;
66     }
67 
slaveOk() const68     virtual bool slaveOk() const {
69         return true;
70     }
71 
maintenanceOk() const72     virtual bool maintenanceOk() const override {
73         return false;
74     }
75 
supportsNonLocalReadConcern(const std::string & dbName,const BSONObj & cmdObj) const76     bool supportsNonLocalReadConcern(const std::string& dbName, const BSONObj& cmdObj) const final {
77         return true;
78     }
79 
getReadWriteType() const80     ReadWriteType getReadWriteType() const {
81         return ReadWriteType::kCommand;
82     }
83 
checkAuthForOperation(OperationContext * opCtx,const std::string & dbname,const BSONObj & cmdObj)84     Status checkAuthForOperation(OperationContext* opCtx,
85                                  const std::string& dbname,
86                                  const BSONObj& cmdObj) override {
87         AuthorizationSession* authSession = AuthorizationSession::get(opCtx->getClient());
88 
89         if (!authSession->isAuthorizedToParseNamespaceElement(cmdObj.firstElement())) {
90             return Status(ErrorCodes::Unauthorized, "Unauthorized");
91         }
92 
93         const NamespaceString ns(parseNsOrUUID(opCtx, dbname, cmdObj));
94         if (!authSession->isAuthorizedForActionsOnNamespace(ns, ActionType::find)) {
95             return Status(ErrorCodes::Unauthorized, "Unauthorized");
96         }
97 
98         return Status::OK();
99     }
100 
run(OperationContext * opCtx,const string & dbname,const BSONObj & cmdObj,BSONObjBuilder & result)101     virtual bool run(OperationContext* opCtx,
102                      const string& dbname,
103                      const BSONObj& cmdObj,
104                      BSONObjBuilder& result) {
105         Lock::DBLock dbSLock(opCtx, dbname, MODE_IS);
106         const NamespaceString ns(parseNsOrUUID(opCtx, dbname, cmdObj));
107 
108         AutoGetCollectionForReadCommand ctx(opCtx, ns, std::move(dbSLock));
109 
110         Collection* collection = ctx.getCollection();
111         if (!collection)
112             return appendCommandStatus(result,
113                                        Status(ErrorCodes::NamespaceNotFound,
114                                               str::stream() << "ns does not exist: " << ns.ns()));
115 
116         size_t numCursors = static_cast<size_t>(cmdObj["numCursors"].numberInt());
117 
118         if (numCursors == 0 || numCursors > 10000)
119             return appendCommandStatus(result,
120                                        Status(ErrorCodes::BadValue,
121                                               str::stream()
122                                                   << "numCursors has to be between 1 and 10000"
123                                                   << " was: "
124                                                   << numCursors));
125 
126         std::vector<std::unique_ptr<RecordCursor>> iterators;
127         // Opening multiple cursors on a capped collection and reading them in parallel can produce
128         // behavior that is not well defined. This can be removed when support for parallel
129         // collection scan on capped collections is officially added. The 'getCursor' function
130         // ensures that the cursor returned iterates the capped collection in proper document
131         // insertion order.
132         if (collection->isCapped()) {
133             iterators.push_back(collection->getCursor(opCtx));
134             numCursors = 1;
135         } else {
136             iterators = collection->getManyCursors(opCtx);
137             if (iterators.size() < numCursors) {
138                 numCursors = iterators.size();
139             }
140         }
141 
142         std::vector<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> execs;
143         for (size_t i = 0; i < numCursors; i++) {
144             unique_ptr<WorkingSet> ws = make_unique<WorkingSet>();
145             unique_ptr<MultiIteratorStage> mis =
146                 make_unique<MultiIteratorStage>(opCtx, ws.get(), collection);
147 
148             // Takes ownership of 'ws' and 'mis'.
149             auto statusWithPlanExecutor = PlanExecutor::make(
150                 opCtx, std::move(ws), std::move(mis), collection, PlanExecutor::YIELD_AUTO);
151             invariant(statusWithPlanExecutor.isOK());
152             execs.push_back(std::move(statusWithPlanExecutor.getValue()));
153         }
154 
155         // transfer iterators to executors using a round-robin distribution.
156         // TODO consider using a common work queue once invalidation issues go away.
157         for (size_t i = 0; i < iterators.size(); i++) {
158             auto& planExec = execs[i % execs.size()];
159             MultiIteratorStage* mis = checked_cast<MultiIteratorStage*>(planExec->getRootStage());
160             mis->addIterator(std::move(iterators[i]));
161         }
162 
163         {
164             BSONArrayBuilder bucketsBuilder;
165             for (auto&& exec : execs) {
166                 // Need to save state while yielding locks between now and getMore().
167                 exec->saveState();
168                 exec->detachFromOperationContext();
169 
170                 // Create and register a new ClientCursor.
171                 auto pinnedCursor = collection->getCursorManager()->registerCursor(
172                     opCtx,
173                     {std::move(exec),
174                      ns,
175                      AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
176                      opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
177                      cmdObj});
178                 pinnedCursor.getCursor()->setLeftoverMaxTimeMicros(
179                     opCtx->getRemainingMaxTimeMicros());
180 
181                 BSONObjBuilder threadResult;
182                 appendCursorResponseObject(
183                     pinnedCursor.getCursor()->cursorid(), ns.ns(), BSONArray(), &threadResult);
184                 threadResult.appendBool("ok", 1);
185 
186                 bucketsBuilder.append(threadResult.obj());
187             }
188             result.appendArray("cursors", bucketsBuilder.obj());
189         }
190 
191         return true;
192     }
193 } parallelCollectionScanCmd;
194 
195 }  // namespace
196 }  // namespace mongo
197