1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #include "mongo/platform/basic.h"
32
33 #include "mongo/db/pipeline/aggregation_request.h"
34
35 #include <algorithm>
36
37 #include "mongo/base/error_codes.h"
38 #include "mongo/base/status_with.h"
39 #include "mongo/base/string_data.h"
40 #include "mongo/db/catalog/document_validation.h"
41 #include "mongo/db/commands.h"
42 #include "mongo/db/pipeline/document.h"
43 #include "mongo/db/pipeline/value.h"
44 #include "mongo/db/query/cursor_request.h"
45 #include "mongo/db/query/query_request.h"
46 #include "mongo/db/repl/read_concern_args.h"
47 #include "mongo/db/storage/storage_options.h"
48 #include "mongo/db/write_concern_options.h"
49
50 namespace mongo {
51
// Namespace-scope definitions of AggregationRequest's constexpr static data members. No
// initializers appear here, so the values are presumably supplied by the in-class declarations
// (the header is not visible from this file -- confirm there). Definitions of this form give the
// members storage so they can be odr-used under pre-C++17 rules.
52 constexpr StringData AggregationRequest::kCommandName;
53 constexpr StringData AggregationRequest::kCursorName;
54 constexpr StringData AggregationRequest::kBatchSizeName;
55 constexpr StringData AggregationRequest::kFromMongosName;
56 constexpr StringData AggregationRequest::kNeedsMergeName;
57 constexpr StringData AggregationRequest::kNeedsMerge34Name;
58 constexpr StringData AggregationRequest::kPipelineName;
59 constexpr StringData AggregationRequest::kCollationName;
60 constexpr StringData AggregationRequest::kExplainName;
61 constexpr StringData AggregationRequest::kAllowDiskUseName;
62 constexpr StringData AggregationRequest::kHintName;
63 constexpr StringData AggregationRequest::kCommentName;
64
// Batch size used by the constructor in this file and passed as the default to
// CursorRequest::parseCommandCursorOptions() when parsing the 'cursor' option.
65 constexpr long long AggregationRequest::kDefaultBatchSize;
66
AggregationRequest(NamespaceString nss,std::vector<BSONObj> pipeline)67 AggregationRequest::AggregationRequest(NamespaceString nss, std::vector<BSONObj> pipeline)
68 : _nss(std::move(nss)), _pipeline(std::move(pipeline)), _batchSize(kDefaultBatchSize) {}
69
parsePipelineFromBSON(BSONElement pipelineElem)70 StatusWith<std::vector<BSONObj>> AggregationRequest::parsePipelineFromBSON(
71 BSONElement pipelineElem) {
72 std::vector<BSONObj> pipeline;
73 if (pipelineElem.eoo() || pipelineElem.type() != BSONType::Array) {
74 return {ErrorCodes::TypeMismatch, "'pipeline' option must be specified as an array"};
75 }
76
77 for (auto elem : pipelineElem.Obj()) {
78 if (elem.type() != BSONType::Object) {
79 return {ErrorCodes::TypeMismatch,
80 "Each element of the 'pipeline' array must be an object"};
81 }
82 pipeline.push_back(elem.embeddedObject().getOwned());
83 }
84
85 return std::move(pipeline);
86 }
87
parseFromBSON(const std::string & dbName,const BSONObj & cmdObj,boost::optional<ExplainOptions::Verbosity> explainVerbosity)88 StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
89 const std::string& dbName,
90 const BSONObj& cmdObj,
91 boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
92 return parseFromBSON(parseNs(dbName, cmdObj), cmdObj, explainVerbosity);
93 }
94
parseFromBSON(NamespaceString nss,const BSONObj & cmdObj,boost::optional<ExplainOptions::Verbosity> explainVerbosity)95 StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
96 NamespaceString nss,
97 const BSONObj& cmdObj,
98 boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
99 // Parse required parameters.
100 auto pipelineElem = cmdObj[kPipelineName];
101 auto pipeline = AggregationRequest::parsePipelineFromBSON(pipelineElem);
102 if (!pipeline.isOK()) {
103 return pipeline.getStatus();
104 }
105
106 AggregationRequest request(std::move(nss), std::move(pipeline.getValue()));
107
108 const std::initializer_list<StringData> optionsParsedElseWhere = {kPipelineName, kCommandName};
109
110 bool hasCursorElem = false;
111 bool hasExplainElem = false;
112
113 bool hasFromMongosElem = false;
114 bool hasNeedsMergeElem = false;
115 bool hasNeedsMerge34Elem = false;
116
117 // Parse optional parameters.
118 for (auto&& elem : cmdObj) {
119 auto fieldName = elem.fieldNameStringData();
120
121 if (QueryRequest::kUnwrappedReadPrefField == fieldName) {
122 // We expect this field to be validated elsewhere.
123 request.setUnwrappedReadPref(elem.embeddedObject());
124 } else if (std::find(optionsParsedElseWhere.begin(),
125 optionsParsedElseWhere.end(),
126 fieldName) != optionsParsedElseWhere.end()) {
127 // Ignore options that are parsed elsewhere.
128 } else if (kCursorName == fieldName) {
129 long long batchSize;
130 auto status =
131 CursorRequest::parseCommandCursorOptions(cmdObj, kDefaultBatchSize, &batchSize);
132 if (!status.isOK()) {
133 return status;
134 }
135
136 hasCursorElem = true;
137 request.setBatchSize(batchSize);
138 } else if (kCollationName == fieldName) {
139 if (elem.type() != BSONType::Object) {
140 return {ErrorCodes::TypeMismatch,
141 str::stream() << kCollationName << " must be an object, not a "
142 << typeName(elem.type())};
143 }
144 request.setCollation(elem.embeddedObject().getOwned());
145 } else if (QueryRequest::cmdOptionMaxTimeMS == fieldName) {
146 auto maxTimeMs = QueryRequest::parseMaxTimeMS(elem);
147 if (!maxTimeMs.isOK()) {
148 return maxTimeMs.getStatus();
149 }
150
151 request.setMaxTimeMS(maxTimeMs.getValue());
152 } else if (repl::ReadConcernArgs::kReadConcernFieldName == fieldName) {
153 if (elem.type() != BSONType::Object) {
154 return {ErrorCodes::TypeMismatch,
155 str::stream() << repl::ReadConcernArgs::kReadConcernFieldName
156 << " must be an object, not a "
157 << typeName(elem.type())};
158 }
159 request.setReadConcern(elem.embeddedObject().getOwned());
160 } else if (kHintName == fieldName) {
161 if (BSONType::Object == elem.type()) {
162 request.setHint(elem.embeddedObject());
163 } else if (BSONType::String == elem.type()) {
164 request.setHint(BSON("$hint" << elem.valueStringData()));
165 } else {
166 return Status(ErrorCodes::FailedToParse,
167 str::stream()
168 << kHintName
169 << " must be specified as a string representing an index"
170 << " name, or an object representing an index's key pattern");
171 }
172 } else if (kCommentName == fieldName) {
173 if (elem.type() != BSONType::String) {
174 return {ErrorCodes::TypeMismatch,
175 str::stream() << kCommentName << " must be a string, not a "
176 << typeName(elem.type())};
177 }
178 request.setComment(elem.str());
179 } else if (kExplainName == fieldName) {
180 if (elem.type() != BSONType::Bool) {
181 return {ErrorCodes::TypeMismatch,
182 str::stream() << kExplainName << " must be a boolean, not a "
183 << typeName(elem.type())};
184 }
185
186 hasExplainElem = true;
187 if (elem.Bool()) {
188 request.setExplain(ExplainOptions::Verbosity::kQueryPlanner);
189 }
190 } else if (kFromMongosName == fieldName) {
191 if (elem.type() != BSONType::Bool) {
192 return {ErrorCodes::TypeMismatch,
193 str::stream() << kFromMongosName << " must be a boolean, not a "
194 << typeName(elem.type())};
195 }
196
197 hasFromMongosElem = true;
198 request.setFromMongos(elem.Bool());
199 } else if (kNeedsMergeName == fieldName) {
200 if (elem.type() != BSONType::Bool) {
201 return {ErrorCodes::TypeMismatch,
202 str::stream() << kNeedsMergeName << " must be a boolean, not a "
203 << typeName(elem.type())};
204 }
205
206 hasNeedsMergeElem = true;
207 request.setNeedsMerge(elem.Bool());
208 } else if (kNeedsMerge34Name == fieldName) {
209 if (elem.type() != BSONType::Bool) {
210 return {ErrorCodes::TypeMismatch,
211 str::stream() << kNeedsMerge34Name << " must be a boolean, not a "
212 << typeName(elem.type())};
213 }
214
215 hasNeedsMerge34Elem = true;
216 request.setNeedsMerge(elem.Bool());
217 request.setFromMongos(elem.Bool());
218 request.setFrom34Mongos(elem.Bool());
219 } else if (kAllowDiskUseName == fieldName) {
220 if (storageGlobalParams.readOnly) {
221 return {ErrorCodes::IllegalOperation,
222 str::stream() << "The '" << kAllowDiskUseName
223 << "' option is not permitted in read-only mode."};
224 } else if (elem.type() != BSONType::Bool) {
225 return {ErrorCodes::TypeMismatch,
226 str::stream() << kAllowDiskUseName << " must be a boolean, not a "
227 << typeName(elem.type())};
228 }
229 request.setAllowDiskUse(elem.Bool());
230 } else if (bypassDocumentValidationCommandOption() == fieldName) {
231 request.setBypassDocumentValidation(elem.trueValue());
232 } else if (!Command::isGenericArgument(fieldName)) {
233 return {ErrorCodes::FailedToParse,
234 str::stream() << "unrecognized field '" << elem.fieldName() << "'"};
235 }
236 }
237
238 if (explainVerbosity) {
239 if (hasExplainElem) {
240 return {
241 ErrorCodes::FailedToParse,
242 str::stream() << "The '" << kExplainName
243 << "' option is illegal when a explain verbosity is also provided"};
244 }
245
246 request.setExplain(explainVerbosity);
247 }
248
249 // 'hasExplainElem' implies an aggregate command-level explain option, which does not require
250 // a cursor argument.
251 if (!hasCursorElem && !hasExplainElem) {
252 return {ErrorCodes::FailedToParse,
253 str::stream()
254 << "The '"
255 << kCursorName
256 << "' option is required, except for aggregate with the explain argument"};
257 }
258
259 if (request.getExplain() && cmdObj[WriteConcernOptions::kWriteConcernField]) {
260 return {ErrorCodes::FailedToParse,
261 str::stream() << "Aggregation explain does not support the'"
262 << WriteConcernOptions::kWriteConcernField
263 << "' option"};
264 }
265
266 if (hasNeedsMergeElem && !hasFromMongosElem) {
267 return {ErrorCodes::FailedToParse,
268 str::stream() << "Cannot specify '" << kNeedsMergeName << "' without '"
269 << kFromMongosName
270 << "'"};
271 }
272
273 // If 'fromRouter' is specified, the request is from a 3.4 mongos, so we do not expect
274 // 'fromMongos' or 'needsMerge' to be specified.
275 if (hasNeedsMerge34Elem) {
276 if (hasNeedsMergeElem) {
277 return {ErrorCodes::FailedToParse,
278 str::stream() << "Cannot specify both '" << kNeedsMergeName << "' and '"
279 << kNeedsMerge34Name
280 << "'"};
281 }
282 if (hasFromMongosElem) {
283 return {ErrorCodes::FailedToParse,
284 str::stream() << "Cannot specify both '" << kFromMongosName << "' and '"
285 << kNeedsMerge34Name
286 << "'"};
287 }
288 }
289
290 return request;
291 }
292
parseNs(const std::string & dbname,const BSONObj & cmdObj)293 NamespaceString AggregationRequest::parseNs(const std::string& dbname, const BSONObj& cmdObj) {
294 auto firstElement = cmdObj.firstElement();
295
296 if (firstElement.isNumber()) {
297 uassert(ErrorCodes::FailedToParse,
298 str::stream() << "Invalid command format: the '"
299 << firstElement.fieldNameStringData()
300 << "' field must specify a collection name or 1",
301 firstElement.number() == 1);
302 return NamespaceString::makeCollectionlessAggregateNSS(dbname);
303 } else {
304 uassert(ErrorCodes::TypeMismatch,
305 str::stream() << "collection name has invalid type: "
306 << typeName(firstElement.type()),
307 firstElement.type() == BSONType::String);
308
309 const NamespaceString nss(dbname, firstElement.valueStringData());
310
311 uassert(ErrorCodes::InvalidNamespace,
312 str::stream() << "Invalid namespace specified '" << nss.ns() << "'",
313 nss.isValid() && !nss.isCollectionlessAggregateNS());
314
315 return nss;
316 }
317 }
318
serializeToCommandObj() const319 Document AggregationRequest::serializeToCommandObj() const {
320 MutableDocument serialized;
321 return Document{
322 {kCommandName, (_nss.isCollectionlessAggregateNS() ? Value(1) : Value(_nss.coll()))},
323 {kPipelineName, _pipeline},
324 // Only serialize booleans if different than their default.
325 {kAllowDiskUseName, _allowDiskUse ? Value(true) : Value()},
326 {kFromMongosName, _fromMongos ? Value(true) : Value()},
327 {kNeedsMergeName, _needsMerge ? Value(true) : Value()},
328 {bypassDocumentValidationCommandOption(),
329 _bypassDocumentValidation ? Value(true) : Value()},
330 // Only serialize a collation if one was specified.
331 {kCollationName, _collation.isEmpty() ? Value() : Value(_collation)},
332 // Only serialize batchSize if not an explain, otherwise serialize an empty cursor object.
333 {kCursorName,
334 _explainMode ? Value(Document()) : Value(Document{{kBatchSizeName, _batchSize}})},
335 // Only serialize a hint if one was specified.
336 {kHintName, _hint.isEmpty() ? Value() : Value(_hint)},
337 // Only serialize a comment if one was specified.
338 {kCommentName, _comment.empty() ? Value() : Value(_comment)},
339 // Only serialize readConcern if specified.
340 {repl::ReadConcernArgs::kReadConcernFieldName,
341 _readConcern.isEmpty() ? Value() : Value(_readConcern)},
342 // Only serialize the unwrapped read preference if specified.
343 {QueryRequest::kUnwrappedReadPrefField,
344 _unwrappedReadPref.isEmpty() ? Value() : Value(_unwrappedReadPref)},
345 // Only serialize maxTimeMs if specified.
346 {QueryRequest::cmdOptionMaxTimeMS,
347 _maxTimeMS == 0 ? Value() : Value(static_cast<int>(_maxTimeMS))}};
348 }
349
350 } // namespace mongo
351