1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #include "mongo/platform/basic.h"
32 
33 #include "mongo/db/pipeline/aggregation_request.h"
34 
35 #include <algorithm>
36 
37 #include "mongo/base/error_codes.h"
38 #include "mongo/base/status_with.h"
39 #include "mongo/base/string_data.h"
40 #include "mongo/db/catalog/document_validation.h"
41 #include "mongo/db/commands.h"
42 #include "mongo/db/pipeline/document.h"
43 #include "mongo/db/pipeline/value.h"
44 #include "mongo/db/query/cursor_request.h"
45 #include "mongo/db/query/query_request.h"
46 #include "mongo/db/repl/read_concern_args.h"
47 #include "mongo/db/storage/storage_options.h"
48 #include "mongo/db/write_concern_options.h"
49 
50 namespace mongo {
51 
52 constexpr StringData AggregationRequest::kCommandName;
53 constexpr StringData AggregationRequest::kCursorName;
54 constexpr StringData AggregationRequest::kBatchSizeName;
55 constexpr StringData AggregationRequest::kFromMongosName;
56 constexpr StringData AggregationRequest::kNeedsMergeName;
57 constexpr StringData AggregationRequest::kNeedsMerge34Name;
58 constexpr StringData AggregationRequest::kPipelineName;
59 constexpr StringData AggregationRequest::kCollationName;
60 constexpr StringData AggregationRequest::kExplainName;
61 constexpr StringData AggregationRequest::kAllowDiskUseName;
62 constexpr StringData AggregationRequest::kHintName;
63 constexpr StringData AggregationRequest::kCommentName;
64 
65 constexpr long long AggregationRequest::kDefaultBatchSize;
66 
AggregationRequest(NamespaceString nss,std::vector<BSONObj> pipeline)67 AggregationRequest::AggregationRequest(NamespaceString nss, std::vector<BSONObj> pipeline)
68     : _nss(std::move(nss)), _pipeline(std::move(pipeline)), _batchSize(kDefaultBatchSize) {}
69 
parsePipelineFromBSON(BSONElement pipelineElem)70 StatusWith<std::vector<BSONObj>> AggregationRequest::parsePipelineFromBSON(
71     BSONElement pipelineElem) {
72     std::vector<BSONObj> pipeline;
73     if (pipelineElem.eoo() || pipelineElem.type() != BSONType::Array) {
74         return {ErrorCodes::TypeMismatch, "'pipeline' option must be specified as an array"};
75     }
76 
77     for (auto elem : pipelineElem.Obj()) {
78         if (elem.type() != BSONType::Object) {
79             return {ErrorCodes::TypeMismatch,
80                     "Each element of the 'pipeline' array must be an object"};
81         }
82         pipeline.push_back(elem.embeddedObject().getOwned());
83     }
84 
85     return std::move(pipeline);
86 }
87 
parseFromBSON(const std::string & dbName,const BSONObj & cmdObj,boost::optional<ExplainOptions::Verbosity> explainVerbosity)88 StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
89     const std::string& dbName,
90     const BSONObj& cmdObj,
91     boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
92     return parseFromBSON(parseNs(dbName, cmdObj), cmdObj, explainVerbosity);
93 }
94 
parseFromBSON(NamespaceString nss,const BSONObj & cmdObj,boost::optional<ExplainOptions::Verbosity> explainVerbosity)95 StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
96     NamespaceString nss,
97     const BSONObj& cmdObj,
98     boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
99     // Parse required parameters.
100     auto pipelineElem = cmdObj[kPipelineName];
101     auto pipeline = AggregationRequest::parsePipelineFromBSON(pipelineElem);
102     if (!pipeline.isOK()) {
103         return pipeline.getStatus();
104     }
105 
106     AggregationRequest request(std::move(nss), std::move(pipeline.getValue()));
107 
108     const std::initializer_list<StringData> optionsParsedElseWhere = {kPipelineName, kCommandName};
109 
110     bool hasCursorElem = false;
111     bool hasExplainElem = false;
112 
113     bool hasFromMongosElem = false;
114     bool hasNeedsMergeElem = false;
115     bool hasNeedsMerge34Elem = false;
116 
117     // Parse optional parameters.
118     for (auto&& elem : cmdObj) {
119         auto fieldName = elem.fieldNameStringData();
120 
121         if (QueryRequest::kUnwrappedReadPrefField == fieldName) {
122             // We expect this field to be validated elsewhere.
123             request.setUnwrappedReadPref(elem.embeddedObject());
124         } else if (std::find(optionsParsedElseWhere.begin(),
125                              optionsParsedElseWhere.end(),
126                              fieldName) != optionsParsedElseWhere.end()) {
127             // Ignore options that are parsed elsewhere.
128         } else if (kCursorName == fieldName) {
129             long long batchSize;
130             auto status =
131                 CursorRequest::parseCommandCursorOptions(cmdObj, kDefaultBatchSize, &batchSize);
132             if (!status.isOK()) {
133                 return status;
134             }
135 
136             hasCursorElem = true;
137             request.setBatchSize(batchSize);
138         } else if (kCollationName == fieldName) {
139             if (elem.type() != BSONType::Object) {
140                 return {ErrorCodes::TypeMismatch,
141                         str::stream() << kCollationName << " must be an object, not a "
142                                       << typeName(elem.type())};
143             }
144             request.setCollation(elem.embeddedObject().getOwned());
145         } else if (QueryRequest::cmdOptionMaxTimeMS == fieldName) {
146             auto maxTimeMs = QueryRequest::parseMaxTimeMS(elem);
147             if (!maxTimeMs.isOK()) {
148                 return maxTimeMs.getStatus();
149             }
150 
151             request.setMaxTimeMS(maxTimeMs.getValue());
152         } else if (repl::ReadConcernArgs::kReadConcernFieldName == fieldName) {
153             if (elem.type() != BSONType::Object) {
154                 return {ErrorCodes::TypeMismatch,
155                         str::stream() << repl::ReadConcernArgs::kReadConcernFieldName
156                                       << " must be an object, not a "
157                                       << typeName(elem.type())};
158             }
159             request.setReadConcern(elem.embeddedObject().getOwned());
160         } else if (kHintName == fieldName) {
161             if (BSONType::Object == elem.type()) {
162                 request.setHint(elem.embeddedObject());
163             } else if (BSONType::String == elem.type()) {
164                 request.setHint(BSON("$hint" << elem.valueStringData()));
165             } else {
166                 return Status(ErrorCodes::FailedToParse,
167                               str::stream()
168                                   << kHintName
169                                   << " must be specified as a string representing an index"
170                                   << " name, or an object representing an index's key pattern");
171             }
172         } else if (kCommentName == fieldName) {
173             if (elem.type() != BSONType::String) {
174                 return {ErrorCodes::TypeMismatch,
175                         str::stream() << kCommentName << " must be a string, not a "
176                                       << typeName(elem.type())};
177             }
178             request.setComment(elem.str());
179         } else if (kExplainName == fieldName) {
180             if (elem.type() != BSONType::Bool) {
181                 return {ErrorCodes::TypeMismatch,
182                         str::stream() << kExplainName << " must be a boolean, not a "
183                                       << typeName(elem.type())};
184             }
185 
186             hasExplainElem = true;
187             if (elem.Bool()) {
188                 request.setExplain(ExplainOptions::Verbosity::kQueryPlanner);
189             }
190         } else if (kFromMongosName == fieldName) {
191             if (elem.type() != BSONType::Bool) {
192                 return {ErrorCodes::TypeMismatch,
193                         str::stream() << kFromMongosName << " must be a boolean, not a "
194                                       << typeName(elem.type())};
195             }
196 
197             hasFromMongosElem = true;
198             request.setFromMongos(elem.Bool());
199         } else if (kNeedsMergeName == fieldName) {
200             if (elem.type() != BSONType::Bool) {
201                 return {ErrorCodes::TypeMismatch,
202                         str::stream() << kNeedsMergeName << " must be a boolean, not a "
203                                       << typeName(elem.type())};
204             }
205 
206             hasNeedsMergeElem = true;
207             request.setNeedsMerge(elem.Bool());
208         } else if (kNeedsMerge34Name == fieldName) {
209             if (elem.type() != BSONType::Bool) {
210                 return {ErrorCodes::TypeMismatch,
211                         str::stream() << kNeedsMerge34Name << " must be a boolean, not a "
212                                       << typeName(elem.type())};
213             }
214 
215             hasNeedsMerge34Elem = true;
216             request.setNeedsMerge(elem.Bool());
217             request.setFromMongos(elem.Bool());
218             request.setFrom34Mongos(elem.Bool());
219         } else if (kAllowDiskUseName == fieldName) {
220             if (storageGlobalParams.readOnly) {
221                 return {ErrorCodes::IllegalOperation,
222                         str::stream() << "The '" << kAllowDiskUseName
223                                       << "' option is not permitted in read-only mode."};
224             } else if (elem.type() != BSONType::Bool) {
225                 return {ErrorCodes::TypeMismatch,
226                         str::stream() << kAllowDiskUseName << " must be a boolean, not a "
227                                       << typeName(elem.type())};
228             }
229             request.setAllowDiskUse(elem.Bool());
230         } else if (bypassDocumentValidationCommandOption() == fieldName) {
231             request.setBypassDocumentValidation(elem.trueValue());
232         } else if (!Command::isGenericArgument(fieldName)) {
233             return {ErrorCodes::FailedToParse,
234                     str::stream() << "unrecognized field '" << elem.fieldName() << "'"};
235         }
236     }
237 
238     if (explainVerbosity) {
239         if (hasExplainElem) {
240             return {
241                 ErrorCodes::FailedToParse,
242                 str::stream() << "The '" << kExplainName
243                               << "' option is illegal when a explain verbosity is also provided"};
244         }
245 
246         request.setExplain(explainVerbosity);
247     }
248 
249     // 'hasExplainElem' implies an aggregate command-level explain option, which does not require
250     // a cursor argument.
251     if (!hasCursorElem && !hasExplainElem) {
252         return {ErrorCodes::FailedToParse,
253                 str::stream()
254                     << "The '"
255                     << kCursorName
256                     << "' option is required, except for aggregate with the explain argument"};
257     }
258 
259     if (request.getExplain() && cmdObj[WriteConcernOptions::kWriteConcernField]) {
260         return {ErrorCodes::FailedToParse,
261                 str::stream() << "Aggregation explain does not support the'"
262                               << WriteConcernOptions::kWriteConcernField
263                               << "' option"};
264     }
265 
266     if (hasNeedsMergeElem && !hasFromMongosElem) {
267         return {ErrorCodes::FailedToParse,
268                 str::stream() << "Cannot specify '" << kNeedsMergeName << "' without '"
269                               << kFromMongosName
270                               << "'"};
271     }
272 
273     // If 'fromRouter' is specified, the request is from a 3.4 mongos, so we do not expect
274     // 'fromMongos' or 'needsMerge' to be specified.
275     if (hasNeedsMerge34Elem) {
276         if (hasNeedsMergeElem) {
277             return {ErrorCodes::FailedToParse,
278                     str::stream() << "Cannot specify both '" << kNeedsMergeName << "' and '"
279                                   << kNeedsMerge34Name
280                                   << "'"};
281         }
282         if (hasFromMongosElem) {
283             return {ErrorCodes::FailedToParse,
284                     str::stream() << "Cannot specify both '" << kFromMongosName << "' and '"
285                                   << kNeedsMerge34Name
286                                   << "'"};
287         }
288     }
289 
290     return request;
291 }
292 
parseNs(const std::string & dbname,const BSONObj & cmdObj)293 NamespaceString AggregationRequest::parseNs(const std::string& dbname, const BSONObj& cmdObj) {
294     auto firstElement = cmdObj.firstElement();
295 
296     if (firstElement.isNumber()) {
297         uassert(ErrorCodes::FailedToParse,
298                 str::stream() << "Invalid command format: the '"
299                               << firstElement.fieldNameStringData()
300                               << "' field must specify a collection name or 1",
301                 firstElement.number() == 1);
302         return NamespaceString::makeCollectionlessAggregateNSS(dbname);
303     } else {
304         uassert(ErrorCodes::TypeMismatch,
305                 str::stream() << "collection name has invalid type: "
306                               << typeName(firstElement.type()),
307                 firstElement.type() == BSONType::String);
308 
309         const NamespaceString nss(dbname, firstElement.valueStringData());
310 
311         uassert(ErrorCodes::InvalidNamespace,
312                 str::stream() << "Invalid namespace specified '" << nss.ns() << "'",
313                 nss.isValid() && !nss.isCollectionlessAggregateNS());
314 
315         return nss;
316     }
317 }
318 
serializeToCommandObj() const319 Document AggregationRequest::serializeToCommandObj() const {
320     MutableDocument serialized;
321     return Document{
322         {kCommandName, (_nss.isCollectionlessAggregateNS() ? Value(1) : Value(_nss.coll()))},
323         {kPipelineName, _pipeline},
324         // Only serialize booleans if different than their default.
325         {kAllowDiskUseName, _allowDiskUse ? Value(true) : Value()},
326         {kFromMongosName, _fromMongos ? Value(true) : Value()},
327         {kNeedsMergeName, _needsMerge ? Value(true) : Value()},
328         {bypassDocumentValidationCommandOption(),
329          _bypassDocumentValidation ? Value(true) : Value()},
330         // Only serialize a collation if one was specified.
331         {kCollationName, _collation.isEmpty() ? Value() : Value(_collation)},
332         // Only serialize batchSize if not an explain, otherwise serialize an empty cursor object.
333         {kCursorName,
334          _explainMode ? Value(Document()) : Value(Document{{kBatchSizeName, _batchSize}})},
335         // Only serialize a hint if one was specified.
336         {kHintName, _hint.isEmpty() ? Value() : Value(_hint)},
337         // Only serialize a comment if one was specified.
338         {kCommentName, _comment.empty() ? Value() : Value(_comment)},
339         // Only serialize readConcern if specified.
340         {repl::ReadConcernArgs::kReadConcernFieldName,
341          _readConcern.isEmpty() ? Value() : Value(_readConcern)},
342         // Only serialize the unwrapped read preference if specified.
343         {QueryRequest::kUnwrappedReadPrefField,
344          _unwrappedReadPref.isEmpty() ? Value() : Value(_unwrappedReadPref)},
345         // Only serialize maxTimeMs if specified.
346         {QueryRequest::cmdOptionMaxTimeMS,
347          _maxTimeMS == 0 ? Value() : Value(static_cast<int>(_maxTimeMS))}};
348 }
349 
350 }  // namespace mongo
351