1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #include "mongo/platform/basic.h"
32 
33 #include "mongo/db/pipeline/document_source_lookup.h"
34 
35 #include "mongo/base/init.h"
36 #include "mongo/db/jsobj.h"
37 #include "mongo/db/matcher/expression_algo.h"
38 #include "mongo/db/pipeline/document.h"
39 #include "mongo/db/pipeline/document_path_support.h"
40 #include "mongo/db/pipeline/expression.h"
41 #include "mongo/db/pipeline/expression_context.h"
42 #include "mongo/db/pipeline/value.h"
43 #include "mongo/db/query/query_knobs.h"
44 #include "mongo/stdx/memory.h"
45 
46 namespace mongo {
47 
48 using boost::intrusive_ptr;
49 using std::vector;
50 
51 namespace {
pipelineToString(const vector<BSONObj> & pipeline)52 std::string pipelineToString(const vector<BSONObj>& pipeline) {
53     StringBuilder sb;
54     sb << "[";
55 
56     auto first = true;
57     for (auto& stageSpec : pipeline) {
58         if (!first) {
59             sb << ", ";
60         } else {
61             first = false;
62         }
63         sb << stageSpec;
64     }
65     sb << "]";
66     return sb.str();
67 }
68 }  // namespace
69 
70 constexpr size_t DocumentSourceLookUp::kMaxSubPipelineDepth;
71 
DocumentSourceLookUp(NamespaceString fromNs,std::string as,const boost::intrusive_ptr<ExpressionContext> & pExpCtx)72 DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
73                                            std::string as,
74                                            const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
75     : DocumentSourceNeedsMongoProcessInterface(pExpCtx),
76       _fromNs(std::move(fromNs)),
77       _as(std::move(as)),
78       _variables(pExpCtx->variables),
79       _variablesParseState(pExpCtx->variablesParseState.copyWith(_variables.useIdGenerator())) {
80     const auto& resolvedNamespace = pExpCtx->getResolvedNamespace(_fromNs);
81     _resolvedNs = resolvedNamespace.ns;
82     _resolvedPipeline = resolvedNamespace.pipeline;
83 
84     // We always set an explicit collator on the copied expression context, even if the collator is
85     // null (i.e. the simple collation). Otherwise, in the situation where the user did not specify
86     // a collation and the simple collation was inherited from the local collection, the execution
87     // machinery will incorrectly interpret the null collator and empty user collation as an
88     // indication that it should inherit the foreign collection's collation.
89     _fromExpCtx =
90         pExpCtx->copyWith(_resolvedNs,
91                           boost::none,
92                           pExpCtx->getCollator() ? pExpCtx->getCollator()->clone() : nullptr);
93 
94     _fromExpCtx->subPipelineDepth += 1;
95     uassert(ErrorCodes::MaxSubPipelineDepthExceeded,
96             str::stream() << "Maximum number of nested $lookup sub-pipelines exceeded. Limit is "
97                           << kMaxSubPipelineDepth,
98             _fromExpCtx->subPipelineDepth <= kMaxSubPipelineDepth);
99 }
100 
DocumentSourceLookUp(NamespaceString fromNs,std::string as,std::string localField,std::string foreignField,const boost::intrusive_ptr<ExpressionContext> & pExpCtx)101 DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
102                                            std::string as,
103                                            std::string localField,
104                                            std::string foreignField,
105                                            const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
106     : DocumentSourceLookUp(fromNs, as, pExpCtx) {
107     _localField = std::move(localField);
108     _foreignField = std::move(foreignField);
109     // We append an additional BSONObj to '_resolvedPipeline' as a placeholder for the $match stage
110     // we'll eventually construct from the input document.
111     _resolvedPipeline.reserve(_resolvedPipeline.size() + 1);
112     _resolvedPipeline.push_back(BSONObj());
113 }
114 
DocumentSourceLookUp(NamespaceString fromNs,std::string as,std::vector<BSONObj> pipeline,BSONObj letVariables,const boost::intrusive_ptr<ExpressionContext> & pExpCtx)115 DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
116                                            std::string as,
117                                            std::vector<BSONObj> pipeline,
118                                            BSONObj letVariables,
119                                            const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
120     : DocumentSourceLookUp(fromNs, as, pExpCtx) {
121     // '_resolvedPipeline' will first be initialized by the constructor delegated to within this
122     // constructor's initializer list. It will be populated with view pipeline prefix if 'fromNs'
123     // represents a view. We append the user 'pipeline' to the end of '_resolvedPipeline' to ensure
124     // any view prefix is not overwritten.
125     _resolvedPipeline.insert(_resolvedPipeline.end(), pipeline.begin(), pipeline.end());
126 
127     _userPipeline = std::move(pipeline);
128 
129     _cache.emplace(internalDocumentSourceLookupCacheSizeBytes.load());
130 
131     for (auto&& varElem : letVariables) {
132         const auto varName = varElem.fieldNameStringData();
133         Variables::uassertValidNameForUserWrite(varName);
134 
135         _letVariables.emplace_back(
136             varName.toString(),
137             Expression::parseOperand(pExpCtx, varElem, pExpCtx->variablesParseState),
138             _variablesParseState.defineVariable(varName));
139     }
140 
141     initializeIntrospectionPipeline();
142 }
143 
parse(const AggregationRequest & request,const BSONElement & spec)144 std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LiteParsed::parse(
145     const AggregationRequest& request, const BSONElement& spec) {
146     uassert(ErrorCodes::FailedToParse,
147             str::stream() << "the $lookup stage specification must be an object, but found "
148                           << typeName(spec.type()),
149             spec.type() == BSONType::Object);
150 
151     auto specObj = spec.Obj();
152     auto fromElement = specObj["from"];
153     uassert(ErrorCodes::FailedToParse,
154             str::stream() << "missing 'from' option to $lookup stage specification: " << specObj,
155             fromElement);
156     uassert(ErrorCodes::FailedToParse,
157             str::stream() << "'from' option to $lookup must be a string, but was type "
158                           << typeName(specObj["from"].type()),
159             fromElement.type() == BSONType::String);
160 
161     NamespaceString fromNss(request.getNamespaceString().db(), fromElement.valueStringData());
162     uassert(ErrorCodes::InvalidNamespace,
163             str::stream() << "invalid $lookup namespace: " << fromNss.ns(),
164             fromNss.isValid());
165 
166     stdx::unordered_set<NamespaceString> foreignNssSet;
167 
168     // Recursively lite parse the nested pipeline, if one exists.
169     auto pipelineElem = specObj["pipeline"];
170     boost::optional<LiteParsedPipeline> liteParsedPipeline;
171     if (pipelineElem) {
172         auto pipeline = uassertStatusOK(AggregationRequest::parsePipelineFromBSON(pipelineElem));
173         AggregationRequest foreignAggReq(fromNss, std::move(pipeline));
174         liteParsedPipeline = LiteParsedPipeline(foreignAggReq);
175 
176         auto pipelineInvolvedNamespaces = liteParsedPipeline->getInvolvedNamespaces();
177         foreignNssSet.insert(pipelineInvolvedNamespaces.begin(), pipelineInvolvedNamespaces.end());
178     }
179 
180     foreignNssSet.insert(fromNss);
181 
182     return stdx::make_unique<DocumentSourceLookUp::LiteParsed>(
183         std::move(fromNss), std::move(foreignNssSet), std::move(liteParsedPipeline));
184 }
185 
186 REGISTER_DOCUMENT_SOURCE(lookup,
187                          DocumentSourceLookUp::LiteParsed::parse,
188                          DocumentSourceLookUp::createFromBson);
189 
getSourceName() const190 const char* DocumentSourceLookUp::getSourceName() const {
191     return "$lookup";
192 }
193 
194 namespace {
195 
196 /**
197  * Constructs a query of the following shape:
198  *  {$or: [
199  *    {'fieldName': {$eq: 'values[0]'}},
200  *    {'fieldName': {$eq: 'values[1]'}},
201  *    ...
202  *  ]}
203  */
buildEqualityOrQuery(const std::string & fieldName,const BSONArray & values)204 BSONObj buildEqualityOrQuery(const std::string& fieldName, const BSONArray& values) {
205     BSONObjBuilder orBuilder;
206     {
207         BSONArrayBuilder orPredicatesBuilder(orBuilder.subarrayStart("$or"));
208         for (auto&& value : values) {
209             orPredicatesBuilder.append(BSON(fieldName << BSON("$eq" << value)));
210         }
211     }
212     return orBuilder.obj();
213 }
214 
215 }  // namespace
216 
getNext()217 DocumentSource::GetNextResult DocumentSourceLookUp::getNext() {
218     pExpCtx->checkForInterrupt();
219 
220     if (_unwindSrc) {
221         return unwindResult();
222     }
223 
224     auto nextInput = pSource->getNext();
225     if (!nextInput.isAdvanced()) {
226         return nextInput;
227     }
228 
229     auto inputDoc = nextInput.releaseDocument();
230 
231     // If we have not absorbed a $unwind, we cannot absorb a $match. If we have absorbed a $unwind,
232     // '_unwindSrc' would be non-null, and we would not have made it here.
233     invariant(!_matchSrc);
234 
235     if (!wasConstructedWithPipelineSyntax()) {
236         auto matchStage =
237             makeMatchStageFromInput(inputDoc, *_localField, _foreignField->fullPath(), BSONObj());
238         // We've already allocated space for the trailing $match stage in '_resolvedPipeline'.
239         _resolvedPipeline.back() = matchStage;
240     }
241 
242     auto pipeline = buildPipeline(inputDoc);
243 
244     std::vector<Value> results;
245     int objsize = 0;
246 
247     const auto maxBytes = internalLookupStageIntermediateDocumentMaxSizeBytes.load();
248     while (auto result = pipeline->getNext()) {
249         objsize += result->getApproximateSize();
250         uassert(4568,
251                 str::stream() << "Total size of documents in " << _fromNs.coll()
252                               << " matching pipeline's $lookup stage exceeds "
253                               << maxBytes
254                               << " bytes",
255 
256                 objsize <= maxBytes);
257         results.emplace_back(std::move(*result));
258     }
259 
260     MutableDocument output(std::move(inputDoc));
261     output.setNestedField(_as, Value(std::move(results)));
262     return output.freeze();
263 }
264 
buildPipeline(const Document & inputDoc)265 std::unique_ptr<Pipeline, Pipeline::Deleter> DocumentSourceLookUp::buildPipeline(
266     const Document& inputDoc) {
267     // Copy all 'let' variables into the foreign pipeline's expression context.
268     copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
269 
270     // Resolve the 'let' variables to values per the given input document.
271     resolveLetVariables(inputDoc, &_fromExpCtx->variables);
272 
273     // If we don't have a cache, build and return the pipeline immediately.
274     if (!_cache || _cache->isAbandoned()) {
275         return uassertStatusOK(
276             _mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx));
277     }
278 
279     // Tailor the pipeline construction for our needs. We want a non-optimized pipeline without a
280     // cursor source. If the cache is present and serving, then we will not be adding a cursor
281     // source later, so inject a MongoProcessInterface into all stages that need one.
282     MongoProcessInterface::MakePipelineOptions pipelineOpts;
283 
284     pipelineOpts.optimize = false;
285     pipelineOpts.attachCursorSource = false;
286     pipelineOpts.forceInjectMongoProcessInterface = _cache->isServing();
287 
288     // Construct the basic pipeline without a cache stage.
289     auto pipeline = uassertStatusOK(
290         _mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts));
291 
292     // Add the cache stage at the end and optimize. During the optimization process, the cache will
293     // either move itself to the correct position in the pipeline, or will abandon itself if no
294     // suitable cache position exists.
295     pipeline->addFinalSource(
296         DocumentSourceSequentialDocumentCache::create(_fromExpCtx, _cache.get_ptr()));
297 
298     pipeline->optimizePipeline();
299 
300     if (!_cache->isServing()) {
301         // The cache has either been abandoned or has not yet been built. Attach a cursor.
302         uassertStatusOK(
303             _mongoProcessInterface->attachCursorSourceToPipeline(_fromExpCtx, pipeline.get()));
304     }
305 
306     // If the cache has been abandoned, release it.
307     if (_cache->isAbandoned()) {
308         _cache.reset();
309     }
310 
311     return pipeline;
312 }
313 
getModifiedPaths() const314 DocumentSource::GetModPathsReturn DocumentSourceLookUp::getModifiedPaths() const {
315     std::set<std::string> modifiedPaths{_as.fullPath()};
316     if (_unwindSrc) {
317         auto pathsModifiedByUnwind = _unwindSrc->getModifiedPaths();
318         invariant(pathsModifiedByUnwind.type == GetModPathsReturn::Type::kFiniteSet);
319         modifiedPaths.insert(pathsModifiedByUnwind.paths.begin(),
320                              pathsModifiedByUnwind.paths.end());
321     }
322     return {GetModPathsReturn::Type::kFiniteSet, std::move(modifiedPaths), {}};
323 }
324 
doOptimizeAt(Pipeline::SourceContainer::iterator itr,Pipeline::SourceContainer * container)325 Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt(
326     Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
327     invariant(*itr == this);
328 
329     auto nextUnwind = dynamic_cast<DocumentSourceUnwind*>((*std::next(itr)).get());
330 
331     // If we are not already handling an $unwind stage internally, we can combine with the
332     // following $unwind stage.
333     if (nextUnwind && !_unwindSrc && nextUnwind->getUnwindPath() == _as.fullPath()) {
334         _unwindSrc = std::move(nextUnwind);
335         container->erase(std::next(itr));
336         return itr;
337     }
338 
339     // Attempt to internalize any predicates of a $match upon the "_as" field.
340     auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get());
341 
342     if (!nextMatch) {
343         return std::next(itr);
344     }
345 
346     if (!_unwindSrc || _unwindSrc->indexPath() || _unwindSrc->preserveNullAndEmptyArrays()) {
347         // We must be unwinding our result to internalize a $match. For example, consider the
348         // following pipeline:
349         //
350         // Input: {_id: 0}
351         // Foreign Collection: {a: 0, b: 0}, {a: 0, b: 5}
352         // Pipeline:
353         //   {$lookup: {localField: "_id", foreignField: "a", as: "foo"}}
354         //   {$match: {'foo.b': {$gt: 0}}}
355         // Output: {_id: 0, foo: [{a: 0, b: 0}, {a: 0, b: 5}]}
356         //
357         // If we executed {b: {$gt: 0}} as part of our $lookup, our output would instead be:
358         // {_id: 0, foo: [{a: 0, b: 5}]}
359         //
360         // However, if we are already unwinding 'foo', then we can move the $match inside, since it
361         // will have the same effect as filtering the unwound results, that is, the output will be:
362         // {_id: 0, foo: {a: 0, b: 5}}
363         //
364         // Note that we cannot absorb a $match if the absorbed $unwind has
365         // "preserveNullAndEmptyArrays" set to true, for the following reason: A document that had
366         // an empty output array from $lookup would be preserved by the $unwind, but could be
367         // removed by the $match. However, if we absorb the $match into the $lookup, our joined
368         // query inside the $lookup will output an empty array, which $unwind will then preserve.
369         // Thus, depending on the optimization, the user would see a different output.
370         //
371         // In addition, we must avoid internalizing a $match if an absorbed $unwind has an
372         // "includeArrayIndex" option, since the $match will alter the indices of the returned
373         // values.
374         return std::next(itr);
375     }
376 
377     auto outputPath = _as.fullPath();
378 
379     // Since $match splitting is handled in a generic way, we expect to have already swapped
380     // portions of the $match that do not depend on the 'as' path or on an internalized $unwind's
381     // index path before ourselves. But due to the early return above, we know there is no
382     // internalized $unwind with an index path.
383     //
384     // Therefore, 'nextMatch' should only depend on the 'as' path. We now try to absorb the match on
385     // the 'as' path in order to push down these predicates into the foreign collection.
386     bool isMatchOnlyOnAs = true;
387     auto computeWhetherMatchOnAs = [&isMatchOnlyOnAs, &outputPath](MatchExpression* expression,
388                                                                    std::string path) -> void {
389         // If 'expression' is the child of a $elemMatch, we cannot internalize the $match. For
390         // example, {b: {$elemMatch: {$gt: 1, $lt: 4}}}, where "b" is our "_as" field. This is
391         // because there's no way to modify the expression to be a match just on 'b'--we cannot
392         // change the path to an empty string, or remove the node entirely.
393         if (expression->matchType() == MatchExpression::ELEM_MATCH_VALUE ||
394             expression->matchType() == MatchExpression::ELEM_MATCH_OBJECT) {
395             isMatchOnlyOnAs = false;
396         }
397         if (expression->numChildren() == 0) {
398             // 'expression' is a leaf node; examine the path. It is important that 'outputPath'
399             // not equal 'path', because we cannot change the expression {b: {$eq: 3}}, where
400             // 'path' is 'b', to be a match on a subfield, since no subfield exists.
401             isMatchOnlyOnAs = isMatchOnlyOnAs && expression::isPathPrefixOf(outputPath, path);
402         }
403     };
404 
405     expression::mapOver(nextMatch->getMatchExpression(), computeWhetherMatchOnAs);
406 
407     if (!isMatchOnlyOnAs) {
408         // "nextMatch" does not contain any predicates that can be absorbed into this stage.
409         return std::next(itr);
410     }
411 
412     // We can internalize the $match.
413     if (!_matchSrc) {
414         _matchSrc = nextMatch;
415     } else {
416         // We have already absorbed a $match. We need to join it with 'dependent'.
417         _matchSrc->joinMatchWith(nextMatch);
418     }
419 
420     // Remove the original $match. There may be further optimization between this $lookup and the
421     // new neighbor, so we return an iterator pointing to ourself.
422     container->erase(std::next(itr));
423 
424     // We have internalized a $match, but have not yet computed the descended $match that should
425     // be applied to our queries.
426     _additionalFilter = DocumentSourceMatch::descendMatchOnPath(
427                             _matchSrc->getMatchExpression(), _as.fullPath(), pExpCtx)
428                             ->getQuery()
429                             .getOwned();
430 
431     if (wasConstructedWithPipelineSyntax()) {
432         auto matchObj = BSON("$match" << *_additionalFilter);
433         _resolvedPipeline.push_back(matchObj);
434     }
435 
436     return itr;
437 }
438 
getUserPipelineDefinition()439 std::string DocumentSourceLookUp::getUserPipelineDefinition() {
440     if (wasConstructedWithPipelineSyntax()) {
441         return pipelineToString(_userPipeline);
442     }
443 
444     return _resolvedPipeline.back().toString();
445 }
446 
doDispose()447 void DocumentSourceLookUp::doDispose() {
448     if (_pipeline) {
449         _pipeline->dispose(pExpCtx->opCtx);
450         _pipeline.reset();
451     }
452 }
453 
makeMatchStageFromInput(const Document & input,const FieldPath & localFieldPath,const std::string & foreignFieldName,const BSONObj & additionalFilter)454 BSONObj DocumentSourceLookUp::makeMatchStageFromInput(const Document& input,
455                                                       const FieldPath& localFieldPath,
456                                                       const std::string& foreignFieldName,
457                                                       const BSONObj& additionalFilter) {
458     // Add the 'localFieldPath' of 'input' into 'localFieldList'. If 'localFieldPath' references a
459     // field with an array in its path, we may need to join on multiple values, so we add each
460     // element to 'localFieldList'.
461     BSONArrayBuilder arrBuilder;
462     bool containsRegex = false;
463     document_path_support::visitAllValuesAtPath(input, localFieldPath, [&](const Value& nextValue) {
464         arrBuilder << nextValue;
465         if (!containsRegex && nextValue.getType() == BSONType::RegEx) {
466             containsRegex = true;
467         }
468     });
469 
470     if (arrBuilder.arrSize() == 0) {
471         // Missing values are treated as null.
472         arrBuilder << BSONNULL;
473     }
474 
475     const auto localFieldListSize = arrBuilder.arrSize();
476     const auto localFieldList = arrBuilder.arr();
477 
478     // We construct a query of one of the following forms, depending on the contents of
479     // 'localFieldList'.
480     //
481     //   {$and: [{<foreignFieldName>: {$eq: <localFieldList[0]>}}, <additionalFilter>]}
482     //     if 'localFieldList' contains a single element.
483     //
484     //   {$and: [{<foreignFieldName>: {$in: [<value>, <value>, ...]}}, <additionalFilter>]}
485     //     if 'localFieldList' contains more than one element but doesn't contain any that are
486     //     regular expressions.
487     //
488     //   {$and: [{$or: [{<foreignFieldName>: {$eq: <value>}},
489     //                  {<foreignFieldName>: {$eq: <value>}}, ...]},
490     //           <additionalFilter>]}
491     //     if 'localFieldList' contains more than one element and it contains at least one element
492     //     that is a regular expression.
493 
494     // We wrap the query in a $match so that it can be parsed into a DocumentSourceMatch when
495     // constructing a pipeline to execute.
496     BSONObjBuilder match;
497     BSONObjBuilder query(match.subobjStart("$match"));
498 
499     BSONArrayBuilder andObj(query.subarrayStart("$and"));
500     BSONObjBuilder joiningObj(andObj.subobjStart());
501 
502     if (localFieldListSize > 1) {
503         // A $lookup on an array value corresponds to finding documents in the foreign collection
504         // that have a value of any of the elements in the array value, rather than finding
505         // documents that have a value equal to the entire array value. These semantics are
506         // automatically provided to us by using the $in query operator.
507         if (containsRegex) {
508             // A regular expression inside the $in query operator will perform pattern matching on
509             // any string values. Since we want regular expressions to only match other RegEx types,
510             // we write the query as a $or of equality comparisons instead.
511             BSONObj orQuery = buildEqualityOrQuery(foreignFieldName, localFieldList);
512             joiningObj.appendElements(orQuery);
513         } else {
514             // { <foreignFieldName> : { "$in" : <localFieldList> } }
515             BSONObjBuilder subObj(joiningObj.subobjStart(foreignFieldName));
516             subObj << "$in" << localFieldList;
517             subObj.doneFast();
518         }
519     } else {
520         // { <foreignFieldName> : { "$eq" : <localFieldList[0]> } }
521         BSONObjBuilder subObj(joiningObj.subobjStart(foreignFieldName));
522         subObj << "$eq" << localFieldList[0];
523         subObj.doneFast();
524     }
525 
526     joiningObj.doneFast();
527 
528     BSONObjBuilder additionalFilterObj(andObj.subobjStart());
529     additionalFilterObj.appendElements(additionalFilter);
530     additionalFilterObj.doneFast();
531 
532     andObj.doneFast();
533 
534     query.doneFast();
535     return match.obj();
536 }
537 
unwindResult()538 DocumentSource::GetNextResult DocumentSourceLookUp::unwindResult() {
539     const boost::optional<FieldPath> indexPath(_unwindSrc->indexPath());
540 
541     // Loop until we get a document that has at least one match.
542     // Note we may return early from this loop if our source stage is exhausted or if the unwind
543     // source was asked to return empty arrays and we get a document without a match.
544     while (!_pipeline || !_nextValue) {
545         auto nextInput = pSource->getNext();
546         if (!nextInput.isAdvanced()) {
547             return nextInput;
548         }
549 
550         _input = nextInput.releaseDocument();
551 
552         if (!wasConstructedWithPipelineSyntax()) {
553             BSONObj filter = _additionalFilter.value_or(BSONObj());
554             auto matchStage =
555                 makeMatchStageFromInput(*_input, *_localField, _foreignField->fullPath(), filter);
556             // We've already allocated space for the trailing $match stage in '_resolvedPipeline'.
557             _resolvedPipeline.back() = matchStage;
558         }
559 
560         if (_pipeline) {
561             _pipeline->dispose(pExpCtx->opCtx);
562         }
563 
564         _pipeline = buildPipeline(*_input);
565 
566         // The $lookup stage takes responsibility for disposing of its Pipeline, since it will
567         // potentially be used by multiple OperationContexts, and the $lookup stage is part of an
568         // outer Pipeline that will propagate dispose() calls before being destroyed.
569         _pipeline.get_deleter().dismissDisposal();
570 
571         _cursorIndex = 0;
572         _nextValue = _pipeline->getNext();
573 
574         if (_unwindSrc->preserveNullAndEmptyArrays() && !_nextValue) {
575             // There were no results for this cursor, but the $unwind was asked to preserve empty
576             // arrays, so we should return a document without the array.
577             MutableDocument output(std::move(*_input));
578             // Note this will correctly create objects in the prefix of '_as', to act as if we had
579             // created an empty array and then removed it.
580             output.setNestedField(_as, Value());
581             if (indexPath) {
582                 output.setNestedField(*indexPath, Value(BSONNULL));
583             }
584             return output.freeze();
585         }
586     }
587 
588     invariant(bool(_input) && bool(_nextValue));
589     auto currentValue = *_nextValue;
590     _nextValue = _pipeline->getNext();
591 
592     // Move input document into output if this is the last or only result, otherwise perform a copy.
593     MutableDocument output(_nextValue ? *_input : std::move(*_input));
594     output.setNestedField(_as, Value(currentValue));
595 
596     if (indexPath) {
597         output.setNestedField(*indexPath, Value(_cursorIndex));
598     }
599 
600     ++_cursorIndex;
601     return output.freeze();
602 }
603 
copyVariablesToExpCtx(const Variables & vars,const VariablesParseState & vps,ExpressionContext * expCtx)604 void DocumentSourceLookUp::copyVariablesToExpCtx(const Variables& vars,
605                                                  const VariablesParseState& vps,
606                                                  ExpressionContext* expCtx) {
607     expCtx->variables = vars;
608     expCtx->variablesParseState = vps.copyWith(expCtx->variables.useIdGenerator());
609 }
610 
resolveLetVariables(const Document & localDoc,Variables * variables)611 void DocumentSourceLookUp::resolveLetVariables(const Document& localDoc, Variables* variables) {
612     invariant(variables);
613 
614     for (auto& letVar : _letVariables) {
615         auto value = letVar.expression->evaluate(localDoc, &pExpCtx->variables);
616         variables->setConstantValue(letVar.id, value);
617     }
618 }
619 
initializeIntrospectionPipeline()620 void DocumentSourceLookUp::initializeIntrospectionPipeline() {
621     copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
622     _parsedIntrospectionPipeline = uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx));
623 
624     auto& sources = _parsedIntrospectionPipeline->getSources();
625 
626     // Ensure that the pipeline does not contain a $changeStream stage. This check will be
627     // performed recursively on all sub-pipelines.
628     uassert(ErrorCodes::IllegalOperation,
629             "$changeStream is not allowed within a $lookup foreign pipeline",
630             sources.empty() || !sources.front()->constraints().isChangeStreamStage());
631 }
632 
serializeToArray(std::vector<Value> & array,boost::optional<ExplainOptions::Verbosity> explain) const633 void DocumentSourceLookUp::serializeToArray(
634     std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
635     Document doc;
636     if (wasConstructedWithPipelineSyntax()) {
637         MutableDocument exprList;
638         for (auto letVar : _letVariables) {
639             exprList.addField(letVar.name,
640                               letVar.expression->serialize(static_cast<bool>(explain)));
641         }
642 
643         auto pipeline = _userPipeline;
644         if (_additionalFilter) {
645             pipeline.push_back(BSON("$match" << *_additionalFilter));
646         }
647 
648         doc = Document{{getSourceName(),
649                         Document{{"from", _fromNs.coll()},
650                                  {"as", _as.fullPath()},
651                                  {"let", exprList.freeze()},
652                                  {"pipeline", pipeline}}}};
653     } else {
654         doc = Document{{getSourceName(),
655                         {Document{{"from", _fromNs.coll()},
656                                   {"as", _as.fullPath()},
657                                   {"localField", _localField->fullPath()},
658                                   {"foreignField", _foreignField->fullPath()}}}}};
659     }
660 
661     MutableDocument output(doc);
662     if (explain) {
663         if (_unwindSrc) {
664             const boost::optional<FieldPath> indexPath = _unwindSrc->indexPath();
665             output[getSourceName()]["unwinding"] =
666                 Value(DOC("preserveNullAndEmptyArrays"
667                           << _unwindSrc->preserveNullAndEmptyArrays()
668                           << "includeArrayIndex"
669                           << (indexPath ? Value(indexPath->fullPath()) : Value())));
670         }
671 
672         // Only add _matchSrc for explain when $lookup was constructed with localField/foreignField
673         // syntax. For pipeline sytax, _matchSrc will be included as part of the pipeline
674         // definition.
675         if (!wasConstructedWithPipelineSyntax() && _additionalFilter) {
676             // Our output does not have to be parseable, so include a "matching" field with the
677             // descended match expression.
678             output[getSourceName()]["matching"] = Value(*_additionalFilter);
679         }
680 
681         array.push_back(Value(output.freeze()));
682     } else {
683         array.push_back(Value(output.freeze()));
684 
685         if (_unwindSrc) {
686             _unwindSrc->serializeToArray(array);
687         }
688 
689         if (!wasConstructedWithPipelineSyntax() && _matchSrc) {
690             // '_matchSrc' tracks the originally specified $match. We descend upon the $match in the
691             // first call to getNext(), at which point we are confident that we no longer need to
692             // serialize the $lookup again.
693             _matchSrc->serializeToArray(array);
694         }
695     }
696 }
697 
getDependencies(DepsTracker * deps) const698 DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker* deps) const {
699     if (wasConstructedWithPipelineSyntax()) {
700         // We will use the introspection pipeline which we prebuilt during construction.
701         invariant(_parsedIntrospectionPipeline);
702 
703         DepsTracker subDeps(deps->getMetadataAvailable());
704 
705         // Get the subpipeline dependencies. Subpipeline stages may reference both 'let' variables
706         // declared by this $lookup and variables declared externally.
707         for (auto&& source : _parsedIntrospectionPipeline->getSources()) {
708             source->getDependencies(&subDeps);
709         }
710 
711         // Add the 'let' dependencies to the tracker. Because the caller is only interested in
712         // references to external variables, filter out any subpipeline references to 'let'
713         // variables declared by this $lookup.
714         for (auto&& letVar : _letVariables) {
715             letVar.expression->addDependencies(deps);
716             subDeps.vars.erase(letVar.id);
717         }
718 
719         // Add sub-pipeline variable dependencies. Do not add field dependencies, since these refer
720         // to the fields from the foreign collection rather than the local collection.
721         deps->vars.insert(subDeps.vars.begin(), subDeps.vars.end());
722     } else {
723         deps->fields.insert(_localField->fullPath());
724     }
725     return SEE_NEXT;
726 }
727 
doDetachFromOperationContext()728 void DocumentSourceLookUp::doDetachFromOperationContext() {
729     if (_pipeline) {
730         // We have a pipeline we're going to be executing across multiple calls to getNext(), so we
731         // use Pipeline::detachFromOperationContext() to take care of updating '_fromExpCtx->opCtx'.
732         _pipeline->detachFromOperationContext();
733         invariant(_fromExpCtx->opCtx == nullptr);
734     } else if (_fromExpCtx) {
735         _fromExpCtx->opCtx = nullptr;
736     }
737 }
738 
doReattachToOperationContext(OperationContext * opCtx)739 void DocumentSourceLookUp::doReattachToOperationContext(OperationContext* opCtx) {
740     if (_pipeline) {
741         // We have a pipeline we're going to be executing across multiple calls to getNext(), so we
742         // use Pipeline::reattachToOperationContext() to take care of updating '_fromExpCtx->opCtx'.
743         _pipeline->reattachToOperationContext(opCtx);
744         invariant(_fromExpCtx->opCtx == opCtx);
745     } else if (_fromExpCtx) {
746         _fromExpCtx->opCtx = opCtx;
747     }
748 }
749 
createFromBson(BSONElement elem,const boost::intrusive_ptr<ExpressionContext> & pExpCtx)750 intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
751     BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
752     uassert(ErrorCodes::FailedToParse,
753             "the $lookup specification must be an Object",
754             elem.type() == BSONType::Object);
755 
756     NamespaceString fromNs;
757     std::string as;
758 
759     std::string localField;
760     std::string foreignField;
761 
762     BSONObj letVariables;
763     std::vector<BSONObj> pipeline;
764     bool hasPipeline = false;
765     bool hasLet = false;
766 
767     for (auto&& argument : elem.Obj()) {
768         const auto argName = argument.fieldNameStringData();
769 
770         if (argName == "pipeline") {
771             auto result = AggregationRequest::parsePipelineFromBSON(argument);
772             if (!result.isOK()) {
773                 uasserted(ErrorCodes::FailedToParse,
774                           str::stream() << "invalid $lookup pipeline definition: "
775                                         << result.getStatus().toString());
776             }
777             pipeline = std::move(result.getValue());
778             hasPipeline = true;
779             continue;
780         }
781 
782         if (argName == "let") {
783             uassert(ErrorCodes::FailedToParse,
784                     str::stream() << "$lookup argument '" << argument
785                                   << "' must be an object, is type "
786                                   << argument.type(),
787                     argument.type() == BSONType::Object);
788             letVariables = argument.Obj();
789             hasLet = true;
790             continue;
791         }
792 
793         uassert(ErrorCodes::FailedToParse,
794                 str::stream() << "$lookup argument '" << argument << "' must be a string, is type "
795                               << argument.type(),
796                 argument.type() == BSONType::String);
797 
798         if (argName == "from") {
799             fromNs = NamespaceString(pExpCtx->ns.db().toString() + '.' + argument.String());
800         } else if (argName == "as") {
801             as = argument.String();
802         } else if (argName == "localField") {
803             localField = argument.String();
804         } else if (argName == "foreignField") {
805             foreignField = argument.String();
806         } else {
807             uasserted(ErrorCodes::FailedToParse,
808                       str::stream() << "unknown argument to $lookup: " << argument.fieldName());
809         }
810     }
811 
812     uassert(
813         ErrorCodes::FailedToParse, "must specify 'from' field for a $lookup", !fromNs.ns().empty());
814     uassert(ErrorCodes::FailedToParse, "must specify 'as' field for a $lookup", !as.empty());
815 
816     if (hasPipeline) {
817         uassert(ErrorCodes::FailedToParse,
818                 "$lookup with 'pipeline' may not specify 'localField' or 'foreignField'",
819                 localField.empty() && foreignField.empty());
820 
821         return new DocumentSourceLookUp(std::move(fromNs),
822                                         std::move(as),
823                                         std::move(pipeline),
824                                         std::move(letVariables),
825                                         pExpCtx);
826     } else {
827         uassert(ErrorCodes::FailedToParse,
828                 "$lookup requires either 'pipeline' or both 'localField' and 'foreignField' to be "
829                 "specified",
830                 !localField.empty() && !foreignField.empty());
831         uassert(ErrorCodes::FailedToParse,
832                 "$lookup with a 'let' argument must also specify 'pipeline'",
833                 !hasLet);
834 
835         return new DocumentSourceLookUp(std::move(fromNs),
836                                         std::move(as),
837                                         std::move(localField),
838                                         std::move(foreignField),
839                                         pExpCtx);
840     }
841 }
842 }  // namespace mongo
843