/**
 * Copyright (C) 2018-present MongoDB, Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the Server Side Public License, version 1,
 * as published by MongoDB, Inc.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * Server Side Public License for more details.
 *
 * You should have received a copy of the Server Side Public License
 * along with this program. If not, see
 * <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the Server Side Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */
30
31 #include "mongo/platform/basic.h"
32
33 #include "mongo/db/pipeline/document_source_facet.h"
34
35 #include <memory>
36 #include <vector>
37
38 #include "mongo/base/string_data.h"
39 #include "mongo/bson/bsonobj.h"
40 #include "mongo/bson/bsonobjbuilder.h"
41 #include "mongo/bson/bsontypes.h"
42 #include "mongo/db/pipeline/document.h"
43 #include "mongo/db/pipeline/document_source_tee_consumer.h"
44 #include "mongo/db/pipeline/expression_context.h"
45 #include "mongo/db/pipeline/field_path.h"
46 #include "mongo/db/pipeline/pipeline.h"
47 #include "mongo/db/pipeline/tee_buffer.h"
48 #include "mongo/db/pipeline/value.h"
49 #include "mongo/stdx/memory.h"
50 #include "mongo/util/assert_util.h"
51 #include "mongo/util/mongoutils/str.h"
52
53 namespace mongo {
54
55 using boost::intrusive_ptr;
56 using std::pair;
57 using std::string;
58 using std::vector;
59
DocumentSourceFacet(std::vector<FacetPipeline> facetPipelines,const intrusive_ptr<ExpressionContext> & expCtx,size_t bufferSizeBytes,size_t maxOutputDocBytes)60 DocumentSourceFacet::DocumentSourceFacet(std::vector<FacetPipeline> facetPipelines,
61 const intrusive_ptr<ExpressionContext>& expCtx,
62 size_t bufferSizeBytes,
63 size_t maxOutputDocBytes)
64 : DocumentSourceNeedsMongoProcessInterface(expCtx),
65 _teeBuffer(TeeBuffer::create(facetPipelines.size(), bufferSizeBytes)),
66 _facets(std::move(facetPipelines)),
67 _maxOutputDocSizeBytes(maxOutputDocBytes) {
68 for (size_t facetId = 0; facetId < _facets.size(); ++facetId) {
69 auto& facet = _facets[facetId];
70 facet.pipeline->addInitialSource(
71 DocumentSourceTeeConsumer::create(pExpCtx, facetId, _teeBuffer));
72 }
73 }
74
75 namespace {
76 /**
77 * Extracts the names of the facets and the vectors of raw BSONObjs representing the stages within
78 * that facet's pipeline.
79 *
80 * Throws a AssertionException if it fails to parse for any reason.
81 */
extractRawPipelines(const BSONElement & elem)82 vector<pair<string, vector<BSONObj>>> extractRawPipelines(const BSONElement& elem) {
83 uassert(40169,
84 str::stream() << "the $facet specification must be a non-empty object, but found: "
85 << elem,
86 elem.type() == BSONType::Object && !elem.embeddedObject().isEmpty());
87
88 vector<pair<string, vector<BSONObj>>> rawFacetPipelines;
89 for (auto&& facetElem : elem.embeddedObject()) {
90 const auto facetName = facetElem.fieldNameStringData();
91 FieldPath::uassertValidFieldName(facetName);
92 uassert(40170,
93 str::stream() << "arguments to $facet must be arrays, " << facetName << " is type "
94 << typeName(facetElem.type()),
95 facetElem.type() == BSONType::Array);
96
97 vector<BSONObj> rawPipeline;
98 for (auto&& subPipeElem : facetElem.Obj()) {
99 uassert(40171,
100 str::stream() << "elements of arrays in $facet spec must be non-empty objects, "
101 << facetName
102 << " argument contained an element of type "
103 << typeName(subPipeElem.type())
104 << ": "
105 << subPipeElem,
106 subPipeElem.type() == BSONType::Object);
107 rawPipeline.push_back(subPipeElem.embeddedObject());
108 }
109
110 rawFacetPipelines.emplace_back(facetName.toString(), std::move(rawPipeline));
111 }
112 return rawFacetPipelines;
113 }
114 } // namespace
115
parse(const AggregationRequest & request,const BSONElement & spec)116 std::unique_ptr<DocumentSourceFacet::LiteParsed> DocumentSourceFacet::LiteParsed::parse(
117 const AggregationRequest& request, const BSONElement& spec) {
118 std::vector<LiteParsedPipeline> liteParsedPipelines;
119
120 for (auto&& rawPipeline : extractRawPipelines(spec)) {
121 liteParsedPipelines.emplace_back(
122 AggregationRequest(request.getNamespaceString(), rawPipeline.second));
123 }
124
125 PrivilegeVector requiredPrivileges;
126 for (auto&& pipeline : liteParsedPipelines) {
127
128 // A correct isMongos flag is only required for DocumentSourceCurrentOp which is disallowed
129 // in $facet pipelines.
130 const bool unusedIsMongosFlag = false;
131 Privilege::addPrivilegesToPrivilegeVector(&requiredPrivileges,
132 pipeline.requiredPrivileges(unusedIsMongosFlag));
133 }
134
135 return stdx::make_unique<DocumentSourceFacet::LiteParsed>(std::move(liteParsedPipelines),
136 std::move(requiredPrivileges));
137 }
138
getInvolvedNamespaces() const139 stdx::unordered_set<NamespaceString> DocumentSourceFacet::LiteParsed::getInvolvedNamespaces()
140 const {
141 stdx::unordered_set<NamespaceString> involvedNamespaces;
142 for (auto&& liteParsedPipeline : _liteParsedPipelines) {
143 auto involvedInSubPipe = liteParsedPipeline.getInvolvedNamespaces();
144 involvedNamespaces.insert(involvedInSubPipe.begin(), involvedInSubPipe.end());
145 }
146 return involvedNamespaces;
147 }
148
149 REGISTER_DOCUMENT_SOURCE(facet,
150 DocumentSourceFacet::LiteParsed::parse,
151 DocumentSourceFacet::createFromBson);
152
intrusive_ptr<DocumentSourceFacet> DocumentSourceFacet::create(
    std::vector<FacetPipeline> facetPipelines,
    const intrusive_ptr<ExpressionContext>& expCtx,
    size_t bufferSizeBytes,
    size_t maxOutputDocBytes) {
    // Factory wrapper: the constructor is not public, so this is the supported way to build a
    // $facet stage.
    intrusive_ptr<DocumentSourceFacet> facetStage(new DocumentSourceFacet(
        std::move(facetPipelines), expCtx, bufferSizeBytes, maxOutputDocBytes));
    return facetStage;
}
161
setSource(DocumentSource * source)162 void DocumentSourceFacet::setSource(DocumentSource* source) {
163 _teeBuffer->setSource(source);
164 }
165
doDispose()166 void DocumentSourceFacet::doDispose() {
167 for (auto&& facet : _facets) {
168 facet.pipeline.get_deleter().dismissDisposal();
169 facet.pipeline->dispose(pExpCtx->opCtx);
170 }
171 }
172
getNext()173 DocumentSource::GetNextResult DocumentSourceFacet::getNext() {
174 pExpCtx->checkForInterrupt();
175
176 if (_done) {
177 return GetNextResult::makeEOF();
178 }
179
180 const size_t maxBytes = _maxOutputDocSizeBytes;
181 auto ensureUnderMemoryLimit = [ usedBytes = 0ul, &maxBytes ](long long additional) mutable {
182 usedBytes += additional;
183 uassert(4031700,
184 str::stream() << "document constructed by $facet is " << usedBytes
185 << " bytes, which exceeds the limit of "
186 << maxBytes
187 << " bytes",
188 usedBytes <= maxBytes);
189 };
190
191 vector<vector<Value>> results(_facets.size());
192 bool allPipelinesEOF = false;
193 while (!allPipelinesEOF) {
194 allPipelinesEOF = true; // Set this to false if any pipeline isn't EOF.
195 for (size_t facetId = 0; facetId < _facets.size(); ++facetId) {
196 const auto& pipeline = _facets[facetId].pipeline;
197 auto next = pipeline->getSources().back()->getNext();
198 for (; next.isAdvanced(); next = pipeline->getSources().back()->getNext()) {
199 ensureUnderMemoryLimit(next.getDocument().getApproximateSize());
200 results[facetId].emplace_back(next.releaseDocument());
201 }
202 allPipelinesEOF = allPipelinesEOF && next.isEOF();
203 }
204 }
205
206 MutableDocument resultDoc;
207 for (size_t facetId = 0; facetId < _facets.size(); ++facetId) {
208 resultDoc[_facets[facetId].name] = Value(std::move(results[facetId]));
209 }
210
211 _done = true; // We will only ever produce one result.
212 return resultDoc.freeze();
213 }
214
serialize(boost::optional<ExplainOptions::Verbosity> explain) const215 Value DocumentSourceFacet::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
216 MutableDocument serialized;
217 for (auto&& facet : _facets) {
218 serialized[facet.name] = Value(explain ? facet.pipeline->writeExplainOps(*explain)
219 : facet.pipeline->serialize());
220 }
221 return Value(Document{{"$facet", serialized.freezeToValue()}});
222 }
223
addInvolvedCollections(vector<NamespaceString> * collections) const224 void DocumentSourceFacet::addInvolvedCollections(vector<NamespaceString>* collections) const {
225 for (auto&& facet : _facets) {
226 for (auto&& source : facet.pipeline->getSources()) {
227 source->addInvolvedCollections(collections);
228 }
229 }
230 }
231
optimize()232 intrusive_ptr<DocumentSource> DocumentSourceFacet::optimize() {
233 for (auto&& facet : _facets) {
234 facet.pipeline->optimizePipeline();
235 }
236 return this;
237 }
238
doInjectMongoProcessInterface(std::shared_ptr<MongoProcessInterface> pipelineContext)239 void DocumentSourceFacet::doInjectMongoProcessInterface(
240 std::shared_ptr<MongoProcessInterface> pipelineContext) {
241 for (auto&& facet : _facets) {
242 for (auto&& stage : facet.pipeline->getSources()) {
243 if (auto stageNeedingMongoProcessInterface =
244 dynamic_cast<DocumentSourceNeedsMongoProcessInterface*>(stage.get())) {
245 stageNeedingMongoProcessInterface->injectMongoProcessInterface(pipelineContext);
246 }
247 }
248 }
249 }
250
doDetachFromOperationContext()251 void DocumentSourceFacet::doDetachFromOperationContext() {
252 for (auto&& facet : _facets) {
253 facet.pipeline->detachFromOperationContext();
254 }
255 }
256
doReattachToOperationContext(OperationContext * opCtx)257 void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx) {
258 for (auto&& facet : _facets) {
259 facet.pipeline->reattachToOperationContext(opCtx);
260 }
261 }
262
constraints(Pipeline::SplitState pipeState) const263 DocumentSource::StageConstraints DocumentSourceFacet::constraints(
264 Pipeline::SplitState pipeState) const {
265 const bool mayUseDisk = std::any_of(_facets.begin(), _facets.end(), [&](const auto& facet) {
266 const auto sources = facet.pipeline->getSources();
267 return std::any_of(sources.begin(), sources.end(), [&](const auto source) {
268 return source->constraints().diskRequirement == DiskUseRequirement::kWritesTmpData;
269 });
270 });
271
272 // Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154).
273 // This means that if any stage in any of the $facet pipelines requires the primary shard, then
274 // the entire $facet must happen on the merger, and the merger must be the primary shard.
275 const bool needsPrimaryShard =
276 std::any_of(_facets.begin(), _facets.end(), [&](const auto& facet) {
277 const auto sources = facet.pipeline->getSources();
278 return std::any_of(sources.begin(), sources.end(), [&](const auto source) {
279 return source->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard;
280 });
281 });
282
283 return {StreamType::kBlocking,
284 PositionRequirement::kNone,
285 needsPrimaryShard ? HostTypeRequirement::kPrimaryShard : HostTypeRequirement::kAnyShard,
286 mayUseDisk ? DiskUseRequirement::kWritesTmpData : DiskUseRequirement::kNoDiskUse,
287 FacetRequirement::kNotAllowed};
288 }
289
getDependencies(DepsTracker * deps) const290 DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const {
291 const bool scopeHasVariables = pExpCtx->variablesParseState.hasDefinedVariables();
292 for (auto&& facet : _facets) {
293 auto subDepsTracker = facet.pipeline->getDependencies(deps->getMetadataAvailable());
294
295 deps->fields.insert(subDepsTracker.fields.begin(), subDepsTracker.fields.end());
296 deps->vars.insert(subDepsTracker.vars.begin(), subDepsTracker.vars.end());
297
298 deps->needWholeDocument = deps->needWholeDocument || subDepsTracker.needWholeDocument;
299 deps->setNeedTextScore(deps->getNeedTextScore() || subDepsTracker.getNeedTextScore());
300
301 // If there are variables defined at this stage's scope, there may be dependencies upon
302 // them in subsequent pipelines. Keep enumerating.
303 if (deps->needWholeDocument && deps->getNeedTextScore() && !scopeHasVariables) {
304 break;
305 }
306 }
307
308 // We will combine multiple documents into one, and the output document will have new fields, so
309 // we will stop looking for dependencies at this point.
310 return GetDepsReturn::EXHAUSTIVE_ALL;
311 }
312
createFromBson(BSONElement elem,const intrusive_ptr<ExpressionContext> & expCtx)313 intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson(
314 BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
315
316 std::vector<FacetPipeline> facetPipelines;
317 for (auto&& rawFacet : extractRawPipelines(elem)) {
318 const auto facetName = rawFacet.first;
319
320 auto pipeline = uassertStatusOK(Pipeline::parseFacetPipeline(rawFacet.second, expCtx));
321
322 facetPipelines.emplace_back(facetName, std::move(pipeline));
323 }
324
325 return DocumentSourceFacet::create(std::move(facetPipelines), expCtx);
326 }
327 } // namespace mongo
328