1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #include "mongo/platform/basic.h"
32 
33 #include "mongo/db/pipeline/document_source_facet.h"
34 
35 #include <memory>
36 #include <vector>
37 
38 #include "mongo/base/string_data.h"
39 #include "mongo/bson/bsonobj.h"
40 #include "mongo/bson/bsonobjbuilder.h"
41 #include "mongo/bson/bsontypes.h"
42 #include "mongo/db/pipeline/document.h"
43 #include "mongo/db/pipeline/document_source_tee_consumer.h"
44 #include "mongo/db/pipeline/expression_context.h"
45 #include "mongo/db/pipeline/field_path.h"
46 #include "mongo/db/pipeline/pipeline.h"
47 #include "mongo/db/pipeline/tee_buffer.h"
48 #include "mongo/db/pipeline/value.h"
49 #include "mongo/stdx/memory.h"
50 #include "mongo/util/assert_util.h"
51 #include "mongo/util/mongoutils/str.h"
52 
53 namespace mongo {
54 
55 using boost::intrusive_ptr;
56 using std::pair;
57 using std::string;
58 using std::vector;
59 
DocumentSourceFacet(std::vector<FacetPipeline> facetPipelines,const intrusive_ptr<ExpressionContext> & expCtx,size_t bufferSizeBytes,size_t maxOutputDocBytes)60 DocumentSourceFacet::DocumentSourceFacet(std::vector<FacetPipeline> facetPipelines,
61                                          const intrusive_ptr<ExpressionContext>& expCtx,
62                                          size_t bufferSizeBytes,
63                                          size_t maxOutputDocBytes)
64     : DocumentSourceNeedsMongoProcessInterface(expCtx),
65       _teeBuffer(TeeBuffer::create(facetPipelines.size(), bufferSizeBytes)),
66       _facets(std::move(facetPipelines)),
67       _maxOutputDocSizeBytes(maxOutputDocBytes) {
68     for (size_t facetId = 0; facetId < _facets.size(); ++facetId) {
69         auto& facet = _facets[facetId];
70         facet.pipeline->addInitialSource(
71             DocumentSourceTeeConsumer::create(pExpCtx, facetId, _teeBuffer));
72     }
73 }
74 
75 namespace {
76 /**
77  * Extracts the names of the facets and the vectors of raw BSONObjs representing the stages within
78  * that facet's pipeline.
79  *
80  * Throws a AssertionException if it fails to parse for any reason.
81  */
extractRawPipelines(const BSONElement & elem)82 vector<pair<string, vector<BSONObj>>> extractRawPipelines(const BSONElement& elem) {
83     uassert(40169,
84             str::stream() << "the $facet specification must be a non-empty object, but found: "
85                           << elem,
86             elem.type() == BSONType::Object && !elem.embeddedObject().isEmpty());
87 
88     vector<pair<string, vector<BSONObj>>> rawFacetPipelines;
89     for (auto&& facetElem : elem.embeddedObject()) {
90         const auto facetName = facetElem.fieldNameStringData();
91         FieldPath::uassertValidFieldName(facetName);
92         uassert(40170,
93                 str::stream() << "arguments to $facet must be arrays, " << facetName << " is type "
94                               << typeName(facetElem.type()),
95                 facetElem.type() == BSONType::Array);
96 
97         vector<BSONObj> rawPipeline;
98         for (auto&& subPipeElem : facetElem.Obj()) {
99             uassert(40171,
100                     str::stream() << "elements of arrays in $facet spec must be non-empty objects, "
101                                   << facetName
102                                   << " argument contained an element of type "
103                                   << typeName(subPipeElem.type())
104                                   << ": "
105                                   << subPipeElem,
106                     subPipeElem.type() == BSONType::Object);
107             rawPipeline.push_back(subPipeElem.embeddedObject());
108         }
109 
110         rawFacetPipelines.emplace_back(facetName.toString(), std::move(rawPipeline));
111     }
112     return rawFacetPipelines;
113 }
114 }  // namespace
115 
parse(const AggregationRequest & request,const BSONElement & spec)116 std::unique_ptr<DocumentSourceFacet::LiteParsed> DocumentSourceFacet::LiteParsed::parse(
117     const AggregationRequest& request, const BSONElement& spec) {
118     std::vector<LiteParsedPipeline> liteParsedPipelines;
119 
120     for (auto&& rawPipeline : extractRawPipelines(spec)) {
121         liteParsedPipelines.emplace_back(
122             AggregationRequest(request.getNamespaceString(), rawPipeline.second));
123     }
124 
125     PrivilegeVector requiredPrivileges;
126     for (auto&& pipeline : liteParsedPipelines) {
127 
128         // A correct isMongos flag is only required for DocumentSourceCurrentOp which is disallowed
129         // in $facet pipelines.
130         const bool unusedIsMongosFlag = false;
131         Privilege::addPrivilegesToPrivilegeVector(&requiredPrivileges,
132                                                   pipeline.requiredPrivileges(unusedIsMongosFlag));
133     }
134 
135     return stdx::make_unique<DocumentSourceFacet::LiteParsed>(std::move(liteParsedPipelines),
136                                                               std::move(requiredPrivileges));
137 }
138 
getInvolvedNamespaces() const139 stdx::unordered_set<NamespaceString> DocumentSourceFacet::LiteParsed::getInvolvedNamespaces()
140     const {
141     stdx::unordered_set<NamespaceString> involvedNamespaces;
142     for (auto&& liteParsedPipeline : _liteParsedPipelines) {
143         auto involvedInSubPipe = liteParsedPipeline.getInvolvedNamespaces();
144         involvedNamespaces.insert(involvedInSubPipe.begin(), involvedInSubPipe.end());
145     }
146     return involvedNamespaces;
147 }
148 
// Register the $facet stage with the aggregation framework: 'LiteParsed::parse' handles
// lightweight parsing (privileges/namespaces) and 'createFromBson' builds the full stage.
REGISTER_DOCUMENT_SOURCE(facet,
                         DocumentSourceFacet::LiteParsed::parse,
                         DocumentSourceFacet::createFromBson);
152 
intrusive_ptr<DocumentSourceFacet> DocumentSourceFacet::create(
    std::vector<FacetPipeline> facetPipelines,
    const intrusive_ptr<ExpressionContext>& expCtx,
    size_t bufferSizeBytes,
    size_t maxOutputDocBytes) {
    // Factory wrapper around the constructor; takes ownership of the sub-pipelines.
    intrusive_ptr<DocumentSourceFacet> facetStage(new DocumentSourceFacet(
        std::move(facetPipelines), expCtx, bufferSizeBytes, maxOutputDocBytes));
    return facetStage;
}
161 
setSource(DocumentSource * source)162 void DocumentSourceFacet::setSource(DocumentSource* source) {
163     _teeBuffer->setSource(source);
164 }
165 
doDispose()166 void DocumentSourceFacet::doDispose() {
167     for (auto&& facet : _facets) {
168         facet.pipeline.get_deleter().dismissDisposal();
169         facet.pipeline->dispose(pExpCtx->opCtx);
170     }
171 }
172 
getNext()173 DocumentSource::GetNextResult DocumentSourceFacet::getNext() {
174     pExpCtx->checkForInterrupt();
175 
176     if (_done) {
177         return GetNextResult::makeEOF();
178     }
179 
180     const size_t maxBytes = _maxOutputDocSizeBytes;
181     auto ensureUnderMemoryLimit = [ usedBytes = 0ul, &maxBytes ](long long additional) mutable {
182         usedBytes += additional;
183         uassert(4031700,
184                 str::stream() << "document constructed by $facet is " << usedBytes
185                               << " bytes, which exceeds the limit of "
186                               << maxBytes
187                               << " bytes",
188                 usedBytes <= maxBytes);
189     };
190 
191     vector<vector<Value>> results(_facets.size());
192     bool allPipelinesEOF = false;
193     while (!allPipelinesEOF) {
194         allPipelinesEOF = true;  // Set this to false if any pipeline isn't EOF.
195         for (size_t facetId = 0; facetId < _facets.size(); ++facetId) {
196             const auto& pipeline = _facets[facetId].pipeline;
197             auto next = pipeline->getSources().back()->getNext();
198             for (; next.isAdvanced(); next = pipeline->getSources().back()->getNext()) {
199                 ensureUnderMemoryLimit(next.getDocument().getApproximateSize());
200                 results[facetId].emplace_back(next.releaseDocument());
201             }
202             allPipelinesEOF = allPipelinesEOF && next.isEOF();
203         }
204     }
205 
206     MutableDocument resultDoc;
207     for (size_t facetId = 0; facetId < _facets.size(); ++facetId) {
208         resultDoc[_facets[facetId].name] = Value(std::move(results[facetId]));
209     }
210 
211     _done = true;  // We will only ever produce one result.
212     return resultDoc.freeze();
213 }
214 
serialize(boost::optional<ExplainOptions::Verbosity> explain) const215 Value DocumentSourceFacet::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
216     MutableDocument serialized;
217     for (auto&& facet : _facets) {
218         serialized[facet.name] = Value(explain ? facet.pipeline->writeExplainOps(*explain)
219                                                : facet.pipeline->serialize());
220     }
221     return Value(Document{{"$facet", serialized.freezeToValue()}});
222 }
223 
addInvolvedCollections(vector<NamespaceString> * collections) const224 void DocumentSourceFacet::addInvolvedCollections(vector<NamespaceString>* collections) const {
225     for (auto&& facet : _facets) {
226         for (auto&& source : facet.pipeline->getSources()) {
227             source->addInvolvedCollections(collections);
228         }
229     }
230 }
231 
optimize()232 intrusive_ptr<DocumentSource> DocumentSourceFacet::optimize() {
233     for (auto&& facet : _facets) {
234         facet.pipeline->optimizePipeline();
235     }
236     return this;
237 }
238 
doInjectMongoProcessInterface(std::shared_ptr<MongoProcessInterface> pipelineContext)239 void DocumentSourceFacet::doInjectMongoProcessInterface(
240     std::shared_ptr<MongoProcessInterface> pipelineContext) {
241     for (auto&& facet : _facets) {
242         for (auto&& stage : facet.pipeline->getSources()) {
243             if (auto stageNeedingMongoProcessInterface =
244                     dynamic_cast<DocumentSourceNeedsMongoProcessInterface*>(stage.get())) {
245                 stageNeedingMongoProcessInterface->injectMongoProcessInterface(pipelineContext);
246             }
247         }
248     }
249 }
250 
doDetachFromOperationContext()251 void DocumentSourceFacet::doDetachFromOperationContext() {
252     for (auto&& facet : _facets) {
253         facet.pipeline->detachFromOperationContext();
254     }
255 }
256 
doReattachToOperationContext(OperationContext * opCtx)257 void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx) {
258     for (auto&& facet : _facets) {
259         facet.pipeline->reattachToOperationContext(opCtx);
260     }
261 }
262 
constraints(Pipeline::SplitState pipeState) const263 DocumentSource::StageConstraints DocumentSourceFacet::constraints(
264     Pipeline::SplitState pipeState) const {
265     const bool mayUseDisk = std::any_of(_facets.begin(), _facets.end(), [&](const auto& facet) {
266         const auto sources = facet.pipeline->getSources();
267         return std::any_of(sources.begin(), sources.end(), [&](const auto source) {
268             return source->constraints().diskRequirement == DiskUseRequirement::kWritesTmpData;
269         });
270     });
271 
272     // Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154).
273     // This means that if any stage in any of the $facet pipelines requires the primary shard, then
274     // the entire $facet must happen on the merger, and the merger must be the primary shard.
275     const bool needsPrimaryShard =
276         std::any_of(_facets.begin(), _facets.end(), [&](const auto& facet) {
277             const auto sources = facet.pipeline->getSources();
278             return std::any_of(sources.begin(), sources.end(), [&](const auto source) {
279                 return source->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard;
280             });
281         });
282 
283     return {StreamType::kBlocking,
284             PositionRequirement::kNone,
285             needsPrimaryShard ? HostTypeRequirement::kPrimaryShard : HostTypeRequirement::kAnyShard,
286             mayUseDisk ? DiskUseRequirement::kWritesTmpData : DiskUseRequirement::kNoDiskUse,
287             FacetRequirement::kNotAllowed};
288 }
289 
DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const {
    // A $facet stage depends on the union of its sub-pipelines' dependencies.
    const bool scopeHasVariables = pExpCtx->variablesParseState.hasDefinedVariables();
    for (auto&& facet : _facets) {
        auto subDepsTracker = facet.pipeline->getDependencies(deps->getMetadataAvailable());

        // Merge the sub-pipeline's field and variable dependencies into the caller's tracker.
        deps->fields.insert(subDepsTracker.fields.begin(), subDepsTracker.fields.end());
        deps->vars.insert(subDepsTracker.vars.begin(), subDepsTracker.vars.end());

        deps->needWholeDocument = deps->needWholeDocument || subDepsTracker.needWholeDocument;
        deps->setNeedTextScore(deps->getNeedTextScore() || subDepsTracker.getNeedTextScore());

        // Once the whole document and the text score are both needed there is nothing more to
        // learn -- unless there are variables defined at this stage's scope, in which case there
        // may be dependencies upon them in subsequent pipelines. Keep enumerating.
        if (deps->needWholeDocument && deps->getNeedTextScore() && !scopeHasVariables) {
            break;
        }
    }

    // We will combine multiple documents into one, and the output document will have new fields, so
    // we will stop looking for dependencies at this point.
    return GetDepsReturn::EXHAUSTIVE_ALL;
}
312 
createFromBson(BSONElement elem,const intrusive_ptr<ExpressionContext> & expCtx)313 intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson(
314     BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
315 
316     std::vector<FacetPipeline> facetPipelines;
317     for (auto&& rawFacet : extractRawPipelines(elem)) {
318         const auto facetName = rawFacet.first;
319 
320         auto pipeline = uassertStatusOK(Pipeline::parseFacetPipeline(rawFacet.second, expCtx));
321 
322         facetPipelines.emplace_back(facetName, std::move(pipeline));
323     }
324 
325     return DocumentSourceFacet::create(std::move(facetPipelines), expCtx);
326 }
327 }  // namespace mongo
328