1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #pragma once
32 
33 #include <boost/optional.hpp>
34 
35 #include "mongo/db/pipeline/document_source.h"
36 #include "mongo/db/pipeline/document_source_match.h"
37 #include "mongo/db/pipeline/document_source_sequential_document_cache.h"
38 #include "mongo/db/pipeline/document_source_unwind.h"
39 #include "mongo/db/pipeline/expression.h"
40 #include "mongo/db/pipeline/lite_parsed_pipeline.h"
41 #include "mongo/db/pipeline/lookup_set_cache.h"
42 #include "mongo/db/pipeline/value_comparator.h"
43 
44 namespace mongo {
45 
46 /**
47  * Queries separate collection for equality matches with documents in the pipeline collection.
48  * Adds matching documents to a new array field in the input document.
49  */
50 class DocumentSourceLookUp final : public DocumentSourceNeedsMongoProcessInterface,
51                                    public SplittableDocumentSource {
52 public:
53     static constexpr size_t kMaxSubPipelineDepth = 20;
54 
55     class LiteParsed final : public LiteParsedDocumentSource {
56     public:
57         static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
58                                                  const BSONElement& spec);
59 
LiteParsed(NamespaceString fromNss,stdx::unordered_set<NamespaceString> foreignNssSet,boost::optional<LiteParsedPipeline> liteParsedPipeline)60         LiteParsed(NamespaceString fromNss,
61                    stdx::unordered_set<NamespaceString> foreignNssSet,
62                    boost::optional<LiteParsedPipeline> liteParsedPipeline)
63             : _fromNss{std::move(fromNss)},
64               _foreignNssSet(std::move(foreignNssSet)),
65               _liteParsedPipeline(std::move(liteParsedPipeline)) {}
66 
getInvolvedNamespaces()67         stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
68             return {_foreignNssSet};
69         }
70 
requiredPrivileges(bool isMongos)71         PrivilegeVector requiredPrivileges(bool isMongos) const final {
72             PrivilegeVector requiredPrivileges;
73             Privilege::addPrivilegeToPrivilegeVector(
74                 &requiredPrivileges,
75                 Privilege(ResourcePattern::forExactNamespace(_fromNss), ActionType::find));
76 
77             if (_liteParsedPipeline) {
78                 Privilege::addPrivilegesToPrivilegeVector(
79                     &requiredPrivileges, _liteParsedPipeline->requiredPrivileges(isMongos));
80             }
81 
82             return requiredPrivileges;
83         }
84 
85     private:
86         const NamespaceString _fromNss;
87         const stdx::unordered_set<NamespaceString> _foreignNssSet;
88         const boost::optional<LiteParsedPipeline> _liteParsedPipeline;
89     };
90 
91     GetNextResult getNext() final;
92     const char* getSourceName() const final;
93     void serializeToArray(
94         std::vector<Value>& array,
95         boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
96 
97     /**
98      * Returns the 'as' path, and possibly fields modified by an absorbed $unwind.
99      */
100     GetModPathsReturn getModifiedPaths() const final;
101 
constraints(Pipeline::SplitState pipeState)102     StageConstraints constraints(Pipeline::SplitState pipeState) const final {
103         const bool mayUseDisk = wasConstructedWithPipelineSyntax() &&
104             std::any_of(_parsedIntrospectionPipeline->getSources().begin(),
105                         _parsedIntrospectionPipeline->getSources().end(),
106                         [](const auto& source) {
107                             return source->constraints().diskRequirement ==
108                                 DiskUseRequirement::kWritesTmpData;
109                         });
110 
111         StageConstraints constraints(StreamType::kStreaming,
112                                      PositionRequirement::kNone,
113                                      HostTypeRequirement::kPrimaryShard,
114                                      mayUseDisk ? DiskUseRequirement::kWritesTmpData
115                                                 : DiskUseRequirement::kNoDiskUse,
116                                      FacetRequirement::kAllowed);
117 
118         constraints.canSwapWithMatch = true;
119         return constraints;
120     }
121 
122     GetDepsReturn getDependencies(DepsTracker* deps) const final;
123 
getOutputSorts()124     BSONObjSet getOutputSorts() final {
125         return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()});
126     }
127 
getShardSource()128     boost::intrusive_ptr<DocumentSource> getShardSource() final {
129         return nullptr;
130     }
131 
getMergeSources()132     std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final {
133         return {this};
134     }
135 
addInvolvedCollections(std::vector<NamespaceString> * collections)136     void addInvolvedCollections(std::vector<NamespaceString>* collections) const final {
137         collections->push_back(_fromNs);
138         if (_parsedIntrospectionPipeline) {
139             for (auto&& stage : _parsedIntrospectionPipeline->getSources()) {
140                 stage->addInvolvedCollections(collections);
141             }
142         }
143     }
144 
145     void doDetachFromOperationContext() final;
146 
147     void doReattachToOperationContext(OperationContext* opCtx) final;
148 
149     static boost::intrusive_ptr<DocumentSource> createFromBson(
150         BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
151 
createFromBsonWithCacheSize(BSONElement elem,const boost::intrusive_ptr<ExpressionContext> & pExpCtx,size_t maxCacheSizeBytes)152     static boost::intrusive_ptr<DocumentSource> createFromBsonWithCacheSize(
153         BSONElement elem,
154         const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
155         size_t maxCacheSizeBytes) {
156         auto dsLookup = createFromBson(elem, pExpCtx);
157         static_cast<DocumentSourceLookUp*>(dsLookup.get())->reInitializeCache(maxCacheSizeBytes);
158         return dsLookup;
159     }
160 
161     /**
162      * Builds the BSONObj used to query the foreign collection and wraps it in a $match.
163      */
164     static BSONObj makeMatchStageFromInput(const Document& input,
165                                            const FieldPath& localFieldName,
166                                            const std::string& foreignFieldName,
167                                            const BSONObj& additionalFilter);
168 
169     /**
170      * Helper to absorb an $unwind stage. Only used for testing this special behavior.
171      */
setUnwindStage(const boost::intrusive_ptr<DocumentSourceUnwind> & unwind)172     void setUnwindStage(const boost::intrusive_ptr<DocumentSourceUnwind>& unwind) {
173         invariant(!_unwindSrc);
174         _unwindSrc = unwind;
175     }
176 
177     /**
178      * Returns true if DocumentSourceLookup was constructed with pipeline syntax (as opposed to
179      * localField/foreignField syntax).
180      */
wasConstructedWithPipelineSyntax()181     bool wasConstructedWithPipelineSyntax() const {
182         return !static_cast<bool>(_localField);
183     }
184 
getVariables_forTest()185     const Variables& getVariables_forTest() {
186         return _variables;
187     }
188 
getVariablesParseState_forTest()189     const VariablesParseState& getVariablesParseState_forTest() {
190         return _variablesParseState;
191     }
192 
getSubPipeline_forTest(const Document & inputDoc)193     std::unique_ptr<Pipeline, Pipeline::Deleter> getSubPipeline_forTest(const Document& inputDoc) {
194         return buildPipeline(inputDoc);
195     }
196 
197 protected:
198     void doDispose() final;
199 
200     /**
201      * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwindSrc'
202      * field.
203      */
204     Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
205                                                      Pipeline::SourceContainer* container) final;
206 
207 private:
208     struct LetVariable {
LetVariableLetVariable209         LetVariable(std::string name, boost::intrusive_ptr<Expression> expression, Variables::Id id)
210             : name(std::move(name)), expression(std::move(expression)), id(id) {}
211 
212         std::string name;
213         boost::intrusive_ptr<Expression> expression;
214         Variables::Id id;
215     };
216 
217     /**
218      * Target constructor. Handles common-field initialization for the syntax-specific delegating
219      * constructors.
220      */
221     DocumentSourceLookUp(NamespaceString fromNs,
222                          std::string as,
223                          const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
224 
225     /**
226      * Constructor used for a $lookup stage specified using the {from: ..., localField: ...,
227      * foreignField: ..., as: ...} syntax.
228      */
229     DocumentSourceLookUp(NamespaceString fromNs,
230                          std::string as,
231                          std::string localField,
232                          std::string foreignField,
233                          const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
234 
235     /**
236      * Constructor used for a $lookup stage specified using the {from: ..., pipeline: [...], as:
237      * ...} syntax.
238      */
239     DocumentSourceLookUp(NamespaceString fromNs,
240                          std::string as,
241                          std::vector<BSONObj> pipeline,
242                          BSONObj letVariables,
243                          const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
244 
245     /**
246      * Should not be called; use serializeToArray instead.
247      */
248     Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
249         MONGO_UNREACHABLE;
250     }
251 
252     GetNextResult unwindResult();
253 
254     /**
255      * Copies 'vars' and 'vps' to the Variables and VariablesParseState objects in 'expCtx'. These
256      * copies provide access to 'let' defined variables in sub-pipeline execution.
257      */
258     static void copyVariablesToExpCtx(const Variables& vars,
259                                       const VariablesParseState& vps,
260                                       ExpressionContext* expCtx);
261 
262     /**
263      * Resolves let defined variables against 'localDoc' and stores the results in 'variables'.
264      */
265     void resolveLetVariables(const Document& localDoc, Variables* variables);
266 
267     /**
268      * Builds a parsed pipeline for introspection (e.g. constraints, dependencies). Any sub-$lookup
269      * pipelines will be built recursively.
270      */
271     void initializeIntrospectionPipeline();
272 
273     /**
274      * Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a
275      * cursor and/or cache source as appropriate.
276      */
277     std::unique_ptr<Pipeline, Pipeline::Deleter> buildPipeline(const Document& inputDoc);
278 
279     /**
280      * The pipeline supplied via the $lookup 'pipeline' argument. This may differ from pipeline that
281      * is executed in that it will not include optimizations or resolved views.
282      */
283     std::string getUserPipelineDefinition();
284 
285     /**
286      * Reinitialize the cache with a new max size. May only be called if this DSLookup was created
287      * with pipeline syntax, the cache has not been frozen or abandoned, and no data has been added
288      * to it.
289      */
reInitializeCache(size_t maxCacheSizeBytes)290     void reInitializeCache(size_t maxCacheSizeBytes) {
291         invariant(wasConstructedWithPipelineSyntax());
292         invariant(!_cache || (_cache->isBuilding() && _cache->sizeBytes() == 0));
293         _cache.emplace(maxCacheSizeBytes);
294     }
295 
296     NamespaceString _fromNs;
297     NamespaceString _resolvedNs;
298     FieldPath _as;
299     boost::optional<BSONObj> _additionalFilter;
300 
301     // For use when $lookup is specified with localField/foreignField syntax.
302     boost::optional<FieldPath> _localField;
303     boost::optional<FieldPath> _foreignField;
304 
305     // Holds 'let' defined variables defined both in this stage and in parent pipelines. These are
306     // copied to the '_fromExpCtx' ExpressionContext's 'variables' and 'variablesParseState' for use
307     // in foreign pipeline execution.
308     Variables _variables;
309     VariablesParseState _variablesParseState;
310 
311     // Caches documents returned by the non-correlated prefix of the $lookup pipeline during the
312     // first iteration, up to a specified size limit in bytes. If this limit is not exceeded by the
313     // time we hit EOF, subsequent iterations of the pipeline will draw from the cache rather than
314     // from a cursor source.
315     boost::optional<SequentialDocumentCache> _cache;
316 
317     // The ExpressionContext used when performing aggregation pipelines against the '_resolvedNs'
318     // namespace.
319     boost::intrusive_ptr<ExpressionContext> _fromExpCtx;
320 
321     // The aggregation pipeline to perform against the '_resolvedNs' namespace. Referenced view
322     // namespaces have been resolved.
323     std::vector<BSONObj> _resolvedPipeline;
324     // The aggregation pipeline defined with the user request, prior to optimization and view
325     // resolution.
326     std::vector<BSONObj> _userPipeline;
327     // A pipeline parsed from _resolvedPipeline at creation time, intended to support introspective
328     // functions. If sub-$lookup stages are present, their pipelines are constructed recursively.
329     std::unique_ptr<Pipeline, Pipeline::Deleter> _parsedIntrospectionPipeline;
330 
331     std::vector<LetVariable> _letVariables;
332 
333     boost::intrusive_ptr<DocumentSourceMatch> _matchSrc;
334     boost::intrusive_ptr<DocumentSourceUnwind> _unwindSrc;
335 
336     // The following members are used to hold onto state across getNext() calls when '_unwindSrc' is
337     // not null.
338     long long _cursorIndex = 0;
339     std::unique_ptr<Pipeline, Pipeline::Deleter> _pipeline;
340     boost::optional<Document> _input;
341     boost::optional<Document> _nextValue;
342 };
343 
344 }  // namespace mongo
345