1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #include "mongo/platform/basic.h"
32
33 #include "mongo/db/pipeline/document_source_lookup.h"
34
35 #include "mongo/base/init.h"
36 #include "mongo/db/jsobj.h"
37 #include "mongo/db/matcher/expression_algo.h"
38 #include "mongo/db/pipeline/document.h"
39 #include "mongo/db/pipeline/document_path_support.h"
40 #include "mongo/db/pipeline/expression.h"
41 #include "mongo/db/pipeline/expression_context.h"
42 #include "mongo/db/pipeline/value.h"
43 #include "mongo/db/query/query_knobs.h"
44 #include "mongo/stdx/memory.h"
45
46 namespace mongo {
47
48 using boost::intrusive_ptr;
49 using std::vector;
50
51 namespace {
pipelineToString(const vector<BSONObj> & pipeline)52 std::string pipelineToString(const vector<BSONObj>& pipeline) {
53 StringBuilder sb;
54 sb << "[";
55
56 auto first = true;
57 for (auto& stageSpec : pipeline) {
58 if (!first) {
59 sb << ", ";
60 } else {
61 first = false;
62 }
63 sb << stageSpec;
64 }
65 sb << "]";
66 return sb.str();
67 }
68 } // namespace
69
70 constexpr size_t DocumentSourceLookUp::kMaxSubPipelineDepth;
71
DocumentSourceLookUp(NamespaceString fromNs,std::string as,const boost::intrusive_ptr<ExpressionContext> & pExpCtx)72 DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
73 std::string as,
74 const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
75 : DocumentSourceNeedsMongoProcessInterface(pExpCtx),
76 _fromNs(std::move(fromNs)),
77 _as(std::move(as)),
78 _variables(pExpCtx->variables),
79 _variablesParseState(pExpCtx->variablesParseState.copyWith(_variables.useIdGenerator())) {
80 const auto& resolvedNamespace = pExpCtx->getResolvedNamespace(_fromNs);
81 _resolvedNs = resolvedNamespace.ns;
82 _resolvedPipeline = resolvedNamespace.pipeline;
83
84 // We always set an explicit collator on the copied expression context, even if the collator is
85 // null (i.e. the simple collation). Otherwise, in the situation where the user did not specify
86 // a collation and the simple collation was inherited from the local collection, the execution
87 // machinery will incorrectly interpret the null collator and empty user collation as an
88 // indication that it should inherit the foreign collection's collation.
89 _fromExpCtx =
90 pExpCtx->copyWith(_resolvedNs,
91 boost::none,
92 pExpCtx->getCollator() ? pExpCtx->getCollator()->clone() : nullptr);
93
94 _fromExpCtx->subPipelineDepth += 1;
95 uassert(ErrorCodes::MaxSubPipelineDepthExceeded,
96 str::stream() << "Maximum number of nested $lookup sub-pipelines exceeded. Limit is "
97 << kMaxSubPipelineDepth,
98 _fromExpCtx->subPipelineDepth <= kMaxSubPipelineDepth);
99 }
100
DocumentSourceLookUp(NamespaceString fromNs,std::string as,std::string localField,std::string foreignField,const boost::intrusive_ptr<ExpressionContext> & pExpCtx)101 DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
102 std::string as,
103 std::string localField,
104 std::string foreignField,
105 const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
106 : DocumentSourceLookUp(fromNs, as, pExpCtx) {
107 _localField = std::move(localField);
108 _foreignField = std::move(foreignField);
109 // We append an additional BSONObj to '_resolvedPipeline' as a placeholder for the $match stage
110 // we'll eventually construct from the input document.
111 _resolvedPipeline.reserve(_resolvedPipeline.size() + 1);
112 _resolvedPipeline.push_back(BSONObj());
113 }
114
DocumentSourceLookUp(NamespaceString fromNs,std::string as,std::vector<BSONObj> pipeline,BSONObj letVariables,const boost::intrusive_ptr<ExpressionContext> & pExpCtx)115 DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
116 std::string as,
117 std::vector<BSONObj> pipeline,
118 BSONObj letVariables,
119 const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
120 : DocumentSourceLookUp(fromNs, as, pExpCtx) {
121 // '_resolvedPipeline' will first be initialized by the constructor delegated to within this
122 // constructor's initializer list. It will be populated with view pipeline prefix if 'fromNs'
123 // represents a view. We append the user 'pipeline' to the end of '_resolvedPipeline' to ensure
124 // any view prefix is not overwritten.
125 _resolvedPipeline.insert(_resolvedPipeline.end(), pipeline.begin(), pipeline.end());
126
127 _userPipeline = std::move(pipeline);
128
129 _cache.emplace(internalDocumentSourceLookupCacheSizeBytes.load());
130
131 for (auto&& varElem : letVariables) {
132 const auto varName = varElem.fieldNameStringData();
133 Variables::uassertValidNameForUserWrite(varName);
134
135 _letVariables.emplace_back(
136 varName.toString(),
137 Expression::parseOperand(pExpCtx, varElem, pExpCtx->variablesParseState),
138 _variablesParseState.defineVariable(varName));
139 }
140
141 initializeIntrospectionPipeline();
142 }
143
parse(const AggregationRequest & request,const BSONElement & spec)144 std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LiteParsed::parse(
145 const AggregationRequest& request, const BSONElement& spec) {
146 uassert(ErrorCodes::FailedToParse,
147 str::stream() << "the $lookup stage specification must be an object, but found "
148 << typeName(spec.type()),
149 spec.type() == BSONType::Object);
150
151 auto specObj = spec.Obj();
152 auto fromElement = specObj["from"];
153 uassert(ErrorCodes::FailedToParse,
154 str::stream() << "missing 'from' option to $lookup stage specification: " << specObj,
155 fromElement);
156 uassert(ErrorCodes::FailedToParse,
157 str::stream() << "'from' option to $lookup must be a string, but was type "
158 << typeName(specObj["from"].type()),
159 fromElement.type() == BSONType::String);
160
161 NamespaceString fromNss(request.getNamespaceString().db(), fromElement.valueStringData());
162 uassert(ErrorCodes::InvalidNamespace,
163 str::stream() << "invalid $lookup namespace: " << fromNss.ns(),
164 fromNss.isValid());
165
166 stdx::unordered_set<NamespaceString> foreignNssSet;
167
168 // Recursively lite parse the nested pipeline, if one exists.
169 auto pipelineElem = specObj["pipeline"];
170 boost::optional<LiteParsedPipeline> liteParsedPipeline;
171 if (pipelineElem) {
172 auto pipeline = uassertStatusOK(AggregationRequest::parsePipelineFromBSON(pipelineElem));
173 AggregationRequest foreignAggReq(fromNss, std::move(pipeline));
174 liteParsedPipeline = LiteParsedPipeline(foreignAggReq);
175
176 auto pipelineInvolvedNamespaces = liteParsedPipeline->getInvolvedNamespaces();
177 foreignNssSet.insert(pipelineInvolvedNamespaces.begin(), pipelineInvolvedNamespaces.end());
178 }
179
180 foreignNssSet.insert(fromNss);
181
182 return stdx::make_unique<DocumentSourceLookUp::LiteParsed>(
183 std::move(fromNss), std::move(foreignNssSet), std::move(liteParsedPipeline));
184 }
185
186 REGISTER_DOCUMENT_SOURCE(lookup,
187 DocumentSourceLookUp::LiteParsed::parse,
188 DocumentSourceLookUp::createFromBson);
189
getSourceName() const190 const char* DocumentSourceLookUp::getSourceName() const {
191 return "$lookup";
192 }
193
194 namespace {
195
196 /**
197 * Constructs a query of the following shape:
198 * {$or: [
199 * {'fieldName': {$eq: 'values[0]'}},
200 * {'fieldName': {$eq: 'values[1]'}},
201 * ...
202 * ]}
203 */
buildEqualityOrQuery(const std::string & fieldName,const BSONArray & values)204 BSONObj buildEqualityOrQuery(const std::string& fieldName, const BSONArray& values) {
205 BSONObjBuilder orBuilder;
206 {
207 BSONArrayBuilder orPredicatesBuilder(orBuilder.subarrayStart("$or"));
208 for (auto&& value : values) {
209 orPredicatesBuilder.append(BSON(fieldName << BSON("$eq" << value)));
210 }
211 }
212 return orBuilder.obj();
213 }
214
215 } // namespace
216
getNext()217 DocumentSource::GetNextResult DocumentSourceLookUp::getNext() {
218 pExpCtx->checkForInterrupt();
219
220 if (_unwindSrc) {
221 return unwindResult();
222 }
223
224 auto nextInput = pSource->getNext();
225 if (!nextInput.isAdvanced()) {
226 return nextInput;
227 }
228
229 auto inputDoc = nextInput.releaseDocument();
230
231 // If we have not absorbed a $unwind, we cannot absorb a $match. If we have absorbed a $unwind,
232 // '_unwindSrc' would be non-null, and we would not have made it here.
233 invariant(!_matchSrc);
234
235 if (!wasConstructedWithPipelineSyntax()) {
236 auto matchStage =
237 makeMatchStageFromInput(inputDoc, *_localField, _foreignField->fullPath(), BSONObj());
238 // We've already allocated space for the trailing $match stage in '_resolvedPipeline'.
239 _resolvedPipeline.back() = matchStage;
240 }
241
242 auto pipeline = buildPipeline(inputDoc);
243
244 std::vector<Value> results;
245 int objsize = 0;
246
247 const auto maxBytes = internalLookupStageIntermediateDocumentMaxSizeBytes.load();
248 while (auto result = pipeline->getNext()) {
249 objsize += result->getApproximateSize();
250 uassert(4568,
251 str::stream() << "Total size of documents in " << _fromNs.coll()
252 << " matching pipeline's $lookup stage exceeds "
253 << maxBytes
254 << " bytes",
255
256 objsize <= maxBytes);
257 results.emplace_back(std::move(*result));
258 }
259
260 MutableDocument output(std::move(inputDoc));
261 output.setNestedField(_as, Value(std::move(results)));
262 return output.freeze();
263 }
264
buildPipeline(const Document & inputDoc)265 std::unique_ptr<Pipeline, Pipeline::Deleter> DocumentSourceLookUp::buildPipeline(
266 const Document& inputDoc) {
267 // Copy all 'let' variables into the foreign pipeline's expression context.
268 copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
269
270 // Resolve the 'let' variables to values per the given input document.
271 resolveLetVariables(inputDoc, &_fromExpCtx->variables);
272
273 // If we don't have a cache, build and return the pipeline immediately.
274 if (!_cache || _cache->isAbandoned()) {
275 return uassertStatusOK(
276 _mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx));
277 }
278
279 // Tailor the pipeline construction for our needs. We want a non-optimized pipeline without a
280 // cursor source. If the cache is present and serving, then we will not be adding a cursor
281 // source later, so inject a MongoProcessInterface into all stages that need one.
282 MongoProcessInterface::MakePipelineOptions pipelineOpts;
283
284 pipelineOpts.optimize = false;
285 pipelineOpts.attachCursorSource = false;
286 pipelineOpts.forceInjectMongoProcessInterface = _cache->isServing();
287
288 // Construct the basic pipeline without a cache stage.
289 auto pipeline = uassertStatusOK(
290 _mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts));
291
292 // Add the cache stage at the end and optimize. During the optimization process, the cache will
293 // either move itself to the correct position in the pipeline, or will abandon itself if no
294 // suitable cache position exists.
295 pipeline->addFinalSource(
296 DocumentSourceSequentialDocumentCache::create(_fromExpCtx, _cache.get_ptr()));
297
298 pipeline->optimizePipeline();
299
300 if (!_cache->isServing()) {
301 // The cache has either been abandoned or has not yet been built. Attach a cursor.
302 uassertStatusOK(
303 _mongoProcessInterface->attachCursorSourceToPipeline(_fromExpCtx, pipeline.get()));
304 }
305
306 // If the cache has been abandoned, release it.
307 if (_cache->isAbandoned()) {
308 _cache.reset();
309 }
310
311 return pipeline;
312 }
313
getModifiedPaths() const314 DocumentSource::GetModPathsReturn DocumentSourceLookUp::getModifiedPaths() const {
315 std::set<std::string> modifiedPaths{_as.fullPath()};
316 if (_unwindSrc) {
317 auto pathsModifiedByUnwind = _unwindSrc->getModifiedPaths();
318 invariant(pathsModifiedByUnwind.type == GetModPathsReturn::Type::kFiniteSet);
319 modifiedPaths.insert(pathsModifiedByUnwind.paths.begin(),
320 pathsModifiedByUnwind.paths.end());
321 }
322 return {GetModPathsReturn::Type::kFiniteSet, std::move(modifiedPaths), {}};
323 }
324
doOptimizeAt(Pipeline::SourceContainer::iterator itr,Pipeline::SourceContainer * container)325 Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt(
326 Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
327 invariant(*itr == this);
328
329 auto nextUnwind = dynamic_cast<DocumentSourceUnwind*>((*std::next(itr)).get());
330
331 // If we are not already handling an $unwind stage internally, we can combine with the
332 // following $unwind stage.
333 if (nextUnwind && !_unwindSrc && nextUnwind->getUnwindPath() == _as.fullPath()) {
334 _unwindSrc = std::move(nextUnwind);
335 container->erase(std::next(itr));
336 return itr;
337 }
338
339 // Attempt to internalize any predicates of a $match upon the "_as" field.
340 auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get());
341
342 if (!nextMatch) {
343 return std::next(itr);
344 }
345
346 if (!_unwindSrc || _unwindSrc->indexPath() || _unwindSrc->preserveNullAndEmptyArrays()) {
347 // We must be unwinding our result to internalize a $match. For example, consider the
348 // following pipeline:
349 //
350 // Input: {_id: 0}
351 // Foreign Collection: {a: 0, b: 0}, {a: 0, b: 5}
352 // Pipeline:
353 // {$lookup: {localField: "_id", foreignField: "a", as: "foo"}}
354 // {$match: {'foo.b': {$gt: 0}}}
355 // Output: {_id: 0, foo: [{a: 0, b: 0}, {a: 0, b: 5}]}
356 //
357 // If we executed {b: {$gt: 0}} as part of our $lookup, our output would instead be:
358 // {_id: 0, foo: [{a: 0, b: 5}]}
359 //
360 // However, if we are already unwinding 'foo', then we can move the $match inside, since it
361 // will have the same effect as filtering the unwound results, that is, the output will be:
362 // {_id: 0, foo: {a: 0, b: 5}}
363 //
364 // Note that we cannot absorb a $match if the absorbed $unwind has
365 // "preserveNullAndEmptyArrays" set to true, for the following reason: A document that had
366 // an empty output array from $lookup would be preserved by the $unwind, but could be
367 // removed by the $match. However, if we absorb the $match into the $lookup, our joined
368 // query inside the $lookup will output an empty array, which $unwind will then preserve.
369 // Thus, depending on the optimization, the user would see a different output.
370 //
371 // In addition, we must avoid internalizing a $match if an absorbed $unwind has an
372 // "includeArrayIndex" option, since the $match will alter the indices of the returned
373 // values.
374 return std::next(itr);
375 }
376
377 auto outputPath = _as.fullPath();
378
379 // Since $match splitting is handled in a generic way, we expect to have already swapped
380 // portions of the $match that do not depend on the 'as' path or on an internalized $unwind's
381 // index path before ourselves. But due to the early return above, we know there is no
382 // internalized $unwind with an index path.
383 //
384 // Therefore, 'nextMatch' should only depend on the 'as' path. We now try to absorb the match on
385 // the 'as' path in order to push down these predicates into the foreign collection.
386 bool isMatchOnlyOnAs = true;
387 auto computeWhetherMatchOnAs = [&isMatchOnlyOnAs, &outputPath](MatchExpression* expression,
388 std::string path) -> void {
389 // If 'expression' is the child of a $elemMatch, we cannot internalize the $match. For
390 // example, {b: {$elemMatch: {$gt: 1, $lt: 4}}}, where "b" is our "_as" field. This is
391 // because there's no way to modify the expression to be a match just on 'b'--we cannot
392 // change the path to an empty string, or remove the node entirely.
393 if (expression->matchType() == MatchExpression::ELEM_MATCH_VALUE ||
394 expression->matchType() == MatchExpression::ELEM_MATCH_OBJECT) {
395 isMatchOnlyOnAs = false;
396 }
397 if (expression->numChildren() == 0) {
398 // 'expression' is a leaf node; examine the path. It is important that 'outputPath'
399 // not equal 'path', because we cannot change the expression {b: {$eq: 3}}, where
400 // 'path' is 'b', to be a match on a subfield, since no subfield exists.
401 isMatchOnlyOnAs = isMatchOnlyOnAs && expression::isPathPrefixOf(outputPath, path);
402 }
403 };
404
405 expression::mapOver(nextMatch->getMatchExpression(), computeWhetherMatchOnAs);
406
407 if (!isMatchOnlyOnAs) {
408 // "nextMatch" does not contain any predicates that can be absorbed into this stage.
409 return std::next(itr);
410 }
411
412 // We can internalize the $match.
413 if (!_matchSrc) {
414 _matchSrc = nextMatch;
415 } else {
416 // We have already absorbed a $match. We need to join it with 'dependent'.
417 _matchSrc->joinMatchWith(nextMatch);
418 }
419
420 // Remove the original $match. There may be further optimization between this $lookup and the
421 // new neighbor, so we return an iterator pointing to ourself.
422 container->erase(std::next(itr));
423
424 // We have internalized a $match, but have not yet computed the descended $match that should
425 // be applied to our queries.
426 _additionalFilter = DocumentSourceMatch::descendMatchOnPath(
427 _matchSrc->getMatchExpression(), _as.fullPath(), pExpCtx)
428 ->getQuery()
429 .getOwned();
430
431 if (wasConstructedWithPipelineSyntax()) {
432 auto matchObj = BSON("$match" << *_additionalFilter);
433 _resolvedPipeline.push_back(matchObj);
434 }
435
436 return itr;
437 }
438
getUserPipelineDefinition()439 std::string DocumentSourceLookUp::getUserPipelineDefinition() {
440 if (wasConstructedWithPipelineSyntax()) {
441 return pipelineToString(_userPipeline);
442 }
443
444 return _resolvedPipeline.back().toString();
445 }
446
doDispose()447 void DocumentSourceLookUp::doDispose() {
448 if (_pipeline) {
449 _pipeline->dispose(pExpCtx->opCtx);
450 _pipeline.reset();
451 }
452 }
453
/**
 * Builds the {$match: ...} stage that joins 'input' against the foreign collection.
 *
 * Every value reachable at 'localFieldPath' becomes a join value: arrays along the path fan out
 * into several values, and a missing field joins as null. The result is one of the following:
 *
 *   single value:
 *     {$match: {$and: [{<foreign>: {$eq: <v>}}, <additionalFilter>]}}
 *   several values, none a regex:
 *     {$match: {$and: [{<foreign>: {$in: [<v>, ...]}}, <additionalFilter>]}}
 *   several values, at least one a regex:
 *     {$match: {$and: [{$or: [{<foreign>: {$eq: <v>}}, ...]}, <additionalFilter>]}}
 *
 * A regex inside $in would pattern-match string values. Using $eq makes a regex match only RegEx
 * values.
 */
BSONObj DocumentSourceLookUp::makeMatchStageFromInput(const Document& input,
                                                      const FieldPath& localFieldPath,
                                                      const std::string& foreignFieldName,
                                                      const BSONObj& additionalFilter) {
    BSONArrayBuilder joinValuesBuilder;
    bool anyRegex = false;
    document_path_support::visitAllValuesAtPath(
        input, localFieldPath, [&](const Value& joinValue) {
            joinValuesBuilder << joinValue;
            anyRegex = anyRegex || joinValue.getType() == BSONType::RegEx;
        });

    if (joinValuesBuilder.arrSize() == 0) {
        // Missing values are treated as null.
        joinValuesBuilder << BSONNULL;
    }

    const auto numJoinValues = joinValuesBuilder.arrSize();
    const auto joinValues = joinValuesBuilder.arr();
    const bool multipleValues = numJoinValues > 1;

    // Wrap the query in $match so it can be parsed into a DocumentSourceMatch when the foreign
    // pipeline is built.
    BSONObjBuilder match;
    BSONObjBuilder query(match.subobjStart("$match"));
    BSONArrayBuilder conjuncts(query.subarrayStart("$and"));

    BSONObjBuilder joinPredicate(conjuncts.subobjStart());
    if (multipleValues && anyRegex) {
        joinPredicate.appendElements(buildEqualityOrQuery(foreignFieldName, joinValues));
    } else if (multipleValues) {
        // Joining on an array value matches foreign documents equal to ANY element, which is
        // exactly what $in provides.
        BSONObjBuilder inClause(joinPredicate.subobjStart(foreignFieldName));
        inClause << "$in" << joinValues;
        inClause.doneFast();
    } else {
        BSONObjBuilder eqClause(joinPredicate.subobjStart(foreignFieldName));
        eqClause << "$eq" << joinValues[0];
        eqClause.doneFast();
    }
    joinPredicate.doneFast();

    BSONObjBuilder filterPredicate(conjuncts.subobjStart());
    filterPredicate.appendElements(additionalFilter);
    filterPredicate.doneFast();

    conjuncts.doneFast();
    query.doneFast();
    return match.obj();
}
537
unwindResult()538 DocumentSource::GetNextResult DocumentSourceLookUp::unwindResult() {
539 const boost::optional<FieldPath> indexPath(_unwindSrc->indexPath());
540
541 // Loop until we get a document that has at least one match.
542 // Note we may return early from this loop if our source stage is exhausted or if the unwind
543 // source was asked to return empty arrays and we get a document without a match.
544 while (!_pipeline || !_nextValue) {
545 auto nextInput = pSource->getNext();
546 if (!nextInput.isAdvanced()) {
547 return nextInput;
548 }
549
550 _input = nextInput.releaseDocument();
551
552 if (!wasConstructedWithPipelineSyntax()) {
553 BSONObj filter = _additionalFilter.value_or(BSONObj());
554 auto matchStage =
555 makeMatchStageFromInput(*_input, *_localField, _foreignField->fullPath(), filter);
556 // We've already allocated space for the trailing $match stage in '_resolvedPipeline'.
557 _resolvedPipeline.back() = matchStage;
558 }
559
560 if (_pipeline) {
561 _pipeline->dispose(pExpCtx->opCtx);
562 }
563
564 _pipeline = buildPipeline(*_input);
565
566 // The $lookup stage takes responsibility for disposing of its Pipeline, since it will
567 // potentially be used by multiple OperationContexts, and the $lookup stage is part of an
568 // outer Pipeline that will propagate dispose() calls before being destroyed.
569 _pipeline.get_deleter().dismissDisposal();
570
571 _cursorIndex = 0;
572 _nextValue = _pipeline->getNext();
573
574 if (_unwindSrc->preserveNullAndEmptyArrays() && !_nextValue) {
575 // There were no results for this cursor, but the $unwind was asked to preserve empty
576 // arrays, so we should return a document without the array.
577 MutableDocument output(std::move(*_input));
578 // Note this will correctly create objects in the prefix of '_as', to act as if we had
579 // created an empty array and then removed it.
580 output.setNestedField(_as, Value());
581 if (indexPath) {
582 output.setNestedField(*indexPath, Value(BSONNULL));
583 }
584 return output.freeze();
585 }
586 }
587
588 invariant(bool(_input) && bool(_nextValue));
589 auto currentValue = *_nextValue;
590 _nextValue = _pipeline->getNext();
591
592 // Move input document into output if this is the last or only result, otherwise perform a copy.
593 MutableDocument output(_nextValue ? *_input : std::move(*_input));
594 output.setNestedField(_as, Value(currentValue));
595
596 if (indexPath) {
597 output.setNestedField(*indexPath, Value(_cursorIndex));
598 }
599
600 ++_cursorIndex;
601 return output.freeze();
602 }
603
copyVariablesToExpCtx(const Variables & vars,const VariablesParseState & vps,ExpressionContext * expCtx)604 void DocumentSourceLookUp::copyVariablesToExpCtx(const Variables& vars,
605 const VariablesParseState& vps,
606 ExpressionContext* expCtx) {
607 expCtx->variables = vars;
608 expCtx->variablesParseState = vps.copyWith(expCtx->variables.useIdGenerator());
609 }
610
resolveLetVariables(const Document & localDoc,Variables * variables)611 void DocumentSourceLookUp::resolveLetVariables(const Document& localDoc, Variables* variables) {
612 invariant(variables);
613
614 for (auto& letVar : _letVariables) {
615 auto value = letVar.expression->evaluate(localDoc, &pExpCtx->variables);
616 variables->setConstantValue(letVar.id, value);
617 }
618 }
619
initializeIntrospectionPipeline()620 void DocumentSourceLookUp::initializeIntrospectionPipeline() {
621 copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
622 _parsedIntrospectionPipeline = uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx));
623
624 auto& sources = _parsedIntrospectionPipeline->getSources();
625
626 // Ensure that the pipeline does not contain a $changeStream stage. This check will be
627 // performed recursively on all sub-pipelines.
628 uassert(ErrorCodes::IllegalOperation,
629 "$changeStream is not allowed within a $lookup foreign pipeline",
630 sources.empty() || !sources.front()->constraints().isChangeStreamStage());
631 }
632
serializeToArray(std::vector<Value> & array,boost::optional<ExplainOptions::Verbosity> explain) const633 void DocumentSourceLookUp::serializeToArray(
634 std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
635 Document doc;
636 if (wasConstructedWithPipelineSyntax()) {
637 MutableDocument exprList;
638 for (auto letVar : _letVariables) {
639 exprList.addField(letVar.name,
640 letVar.expression->serialize(static_cast<bool>(explain)));
641 }
642
643 auto pipeline = _userPipeline;
644 if (_additionalFilter) {
645 pipeline.push_back(BSON("$match" << *_additionalFilter));
646 }
647
648 doc = Document{{getSourceName(),
649 Document{{"from", _fromNs.coll()},
650 {"as", _as.fullPath()},
651 {"let", exprList.freeze()},
652 {"pipeline", pipeline}}}};
653 } else {
654 doc = Document{{getSourceName(),
655 {Document{{"from", _fromNs.coll()},
656 {"as", _as.fullPath()},
657 {"localField", _localField->fullPath()},
658 {"foreignField", _foreignField->fullPath()}}}}};
659 }
660
661 MutableDocument output(doc);
662 if (explain) {
663 if (_unwindSrc) {
664 const boost::optional<FieldPath> indexPath = _unwindSrc->indexPath();
665 output[getSourceName()]["unwinding"] =
666 Value(DOC("preserveNullAndEmptyArrays"
667 << _unwindSrc->preserveNullAndEmptyArrays()
668 << "includeArrayIndex"
669 << (indexPath ? Value(indexPath->fullPath()) : Value())));
670 }
671
672 // Only add _matchSrc for explain when $lookup was constructed with localField/foreignField
673 // syntax. For pipeline sytax, _matchSrc will be included as part of the pipeline
674 // definition.
675 if (!wasConstructedWithPipelineSyntax() && _additionalFilter) {
676 // Our output does not have to be parseable, so include a "matching" field with the
677 // descended match expression.
678 output[getSourceName()]["matching"] = Value(*_additionalFilter);
679 }
680
681 array.push_back(Value(output.freeze()));
682 } else {
683 array.push_back(Value(output.freeze()));
684
685 if (_unwindSrc) {
686 _unwindSrc->serializeToArray(array);
687 }
688
689 if (!wasConstructedWithPipelineSyntax() && _matchSrc) {
690 // '_matchSrc' tracks the originally specified $match. We descend upon the $match in the
691 // first call to getNext(), at which point we are confident that we no longer need to
692 // serialize the $lookup again.
693 _matchSrc->serializeToArray(array);
694 }
695 }
696 }
697
getDependencies(DepsTracker * deps) const698 DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker* deps) const {
699 if (wasConstructedWithPipelineSyntax()) {
700 // We will use the introspection pipeline which we prebuilt during construction.
701 invariant(_parsedIntrospectionPipeline);
702
703 DepsTracker subDeps(deps->getMetadataAvailable());
704
705 // Get the subpipeline dependencies. Subpipeline stages may reference both 'let' variables
706 // declared by this $lookup and variables declared externally.
707 for (auto&& source : _parsedIntrospectionPipeline->getSources()) {
708 source->getDependencies(&subDeps);
709 }
710
711 // Add the 'let' dependencies to the tracker. Because the caller is only interested in
712 // references to external variables, filter out any subpipeline references to 'let'
713 // variables declared by this $lookup.
714 for (auto&& letVar : _letVariables) {
715 letVar.expression->addDependencies(deps);
716 subDeps.vars.erase(letVar.id);
717 }
718
719 // Add sub-pipeline variable dependencies. Do not add field dependencies, since these refer
720 // to the fields from the foreign collection rather than the local collection.
721 deps->vars.insert(subDeps.vars.begin(), subDeps.vars.end());
722 } else {
723 deps->fields.insert(_localField->fullPath());
724 }
725 return SEE_NEXT;
726 }
727
doDetachFromOperationContext()728 void DocumentSourceLookUp::doDetachFromOperationContext() {
729 if (_pipeline) {
730 // We have a pipeline we're going to be executing across multiple calls to getNext(), so we
731 // use Pipeline::detachFromOperationContext() to take care of updating '_fromExpCtx->opCtx'.
732 _pipeline->detachFromOperationContext();
733 invariant(_fromExpCtx->opCtx == nullptr);
734 } else if (_fromExpCtx) {
735 _fromExpCtx->opCtx = nullptr;
736 }
737 }
738
doReattachToOperationContext(OperationContext * opCtx)739 void DocumentSourceLookUp::doReattachToOperationContext(OperationContext* opCtx) {
740 if (_pipeline) {
741 // We have a pipeline we're going to be executing across multiple calls to getNext(), so we
742 // use Pipeline::reattachToOperationContext() to take care of updating '_fromExpCtx->opCtx'.
743 _pipeline->reattachToOperationContext(opCtx);
744 invariant(_fromExpCtx->opCtx == opCtx);
745 } else if (_fromExpCtx) {
746 _fromExpCtx->opCtx = opCtx;
747 }
748 }
749
createFromBson(BSONElement elem,const boost::intrusive_ptr<ExpressionContext> & pExpCtx)750 intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
751 BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
752 uassert(ErrorCodes::FailedToParse,
753 "the $lookup specification must be an Object",
754 elem.type() == BSONType::Object);
755
756 NamespaceString fromNs;
757 std::string as;
758
759 std::string localField;
760 std::string foreignField;
761
762 BSONObj letVariables;
763 std::vector<BSONObj> pipeline;
764 bool hasPipeline = false;
765 bool hasLet = false;
766
767 for (auto&& argument : elem.Obj()) {
768 const auto argName = argument.fieldNameStringData();
769
770 if (argName == "pipeline") {
771 auto result = AggregationRequest::parsePipelineFromBSON(argument);
772 if (!result.isOK()) {
773 uasserted(ErrorCodes::FailedToParse,
774 str::stream() << "invalid $lookup pipeline definition: "
775 << result.getStatus().toString());
776 }
777 pipeline = std::move(result.getValue());
778 hasPipeline = true;
779 continue;
780 }
781
782 if (argName == "let") {
783 uassert(ErrorCodes::FailedToParse,
784 str::stream() << "$lookup argument '" << argument
785 << "' must be an object, is type "
786 << argument.type(),
787 argument.type() == BSONType::Object);
788 letVariables = argument.Obj();
789 hasLet = true;
790 continue;
791 }
792
793 uassert(ErrorCodes::FailedToParse,
794 str::stream() << "$lookup argument '" << argument << "' must be a string, is type "
795 << argument.type(),
796 argument.type() == BSONType::String);
797
798 if (argName == "from") {
799 fromNs = NamespaceString(pExpCtx->ns.db().toString() + '.' + argument.String());
800 } else if (argName == "as") {
801 as = argument.String();
802 } else if (argName == "localField") {
803 localField = argument.String();
804 } else if (argName == "foreignField") {
805 foreignField = argument.String();
806 } else {
807 uasserted(ErrorCodes::FailedToParse,
808 str::stream() << "unknown argument to $lookup: " << argument.fieldName());
809 }
810 }
811
812 uassert(
813 ErrorCodes::FailedToParse, "must specify 'from' field for a $lookup", !fromNs.ns().empty());
814 uassert(ErrorCodes::FailedToParse, "must specify 'as' field for a $lookup", !as.empty());
815
816 if (hasPipeline) {
817 uassert(ErrorCodes::FailedToParse,
818 "$lookup with 'pipeline' may not specify 'localField' or 'foreignField'",
819 localField.empty() && foreignField.empty());
820
821 return new DocumentSourceLookUp(std::move(fromNs),
822 std::move(as),
823 std::move(pipeline),
824 std::move(letVariables),
825 pExpCtx);
826 } else {
827 uassert(ErrorCodes::FailedToParse,
828 "$lookup requires either 'pipeline' or both 'localField' and 'foreignField' to be "
829 "specified",
830 !localField.empty() && !foreignField.empty());
831 uassert(ErrorCodes::FailedToParse,
832 "$lookup with a 'let' argument must also specify 'pipeline'",
833 !hasLet);
834
835 return new DocumentSourceLookUp(std::move(fromNs),
836 std::move(as),
837 std::move(localField),
838 std::move(foreignField),
839 pExpCtx);
840 }
841 }
842 } // namespace mongo
843