/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
29 */ 30 31 #pragma once 32 33 #include <boost/optional.hpp> 34 35 #include "mongo/db/pipeline/document_source.h" 36 #include "mongo/db/pipeline/document_source_match.h" 37 #include "mongo/db/pipeline/document_source_sequential_document_cache.h" 38 #include "mongo/db/pipeline/document_source_unwind.h" 39 #include "mongo/db/pipeline/expression.h" 40 #include "mongo/db/pipeline/lite_parsed_pipeline.h" 41 #include "mongo/db/pipeline/lookup_set_cache.h" 42 #include "mongo/db/pipeline/value_comparator.h" 43 44 namespace mongo { 45 46 /** 47 * Queries separate collection for equality matches with documents in the pipeline collection. 48 * Adds matching documents to a new array field in the input document. 49 */ 50 class DocumentSourceLookUp final : public DocumentSourceNeedsMongoProcessInterface, 51 public SplittableDocumentSource { 52 public: 53 static constexpr size_t kMaxSubPipelineDepth = 20; 54 55 class LiteParsed final : public LiteParsedDocumentSource { 56 public: 57 static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, 58 const BSONElement& spec); 59 LiteParsed(NamespaceString fromNss,stdx::unordered_set<NamespaceString> foreignNssSet,boost::optional<LiteParsedPipeline> liteParsedPipeline)60 LiteParsed(NamespaceString fromNss, 61 stdx::unordered_set<NamespaceString> foreignNssSet, 62 boost::optional<LiteParsedPipeline> liteParsedPipeline) 63 : _fromNss{std::move(fromNss)}, 64 _foreignNssSet(std::move(foreignNssSet)), 65 _liteParsedPipeline(std::move(liteParsedPipeline)) {} 66 getInvolvedNamespaces()67 stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { 68 return {_foreignNssSet}; 69 } 70 requiredPrivileges(bool isMongos)71 PrivilegeVector requiredPrivileges(bool isMongos) const final { 72 PrivilegeVector requiredPrivileges; 73 Privilege::addPrivilegeToPrivilegeVector( 74 &requiredPrivileges, 75 Privilege(ResourcePattern::forExactNamespace(_fromNss), ActionType::find)); 76 77 if (_liteParsedPipeline) { 78 
Privilege::addPrivilegesToPrivilegeVector( 79 &requiredPrivileges, _liteParsedPipeline->requiredPrivileges(isMongos)); 80 } 81 82 return requiredPrivileges; 83 } 84 85 private: 86 const NamespaceString _fromNss; 87 const stdx::unordered_set<NamespaceString> _foreignNssSet; 88 const boost::optional<LiteParsedPipeline> _liteParsedPipeline; 89 }; 90 91 GetNextResult getNext() final; 92 const char* getSourceName() const final; 93 void serializeToArray( 94 std::vector<Value>& array, 95 boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; 96 97 /** 98 * Returns the 'as' path, and possibly fields modified by an absorbed $unwind. 99 */ 100 GetModPathsReturn getModifiedPaths() const final; 101 constraints(Pipeline::SplitState pipeState)102 StageConstraints constraints(Pipeline::SplitState pipeState) const final { 103 const bool mayUseDisk = wasConstructedWithPipelineSyntax() && 104 std::any_of(_parsedIntrospectionPipeline->getSources().begin(), 105 _parsedIntrospectionPipeline->getSources().end(), 106 [](const auto& source) { 107 return source->constraints().diskRequirement == 108 DiskUseRequirement::kWritesTmpData; 109 }); 110 111 StageConstraints constraints(StreamType::kStreaming, 112 PositionRequirement::kNone, 113 HostTypeRequirement::kPrimaryShard, 114 mayUseDisk ? 
DiskUseRequirement::kWritesTmpData 115 : DiskUseRequirement::kNoDiskUse, 116 FacetRequirement::kAllowed); 117 118 constraints.canSwapWithMatch = true; 119 return constraints; 120 } 121 122 GetDepsReturn getDependencies(DepsTracker* deps) const final; 123 getOutputSorts()124 BSONObjSet getOutputSorts() final { 125 return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()}); 126 } 127 getShardSource()128 boost::intrusive_ptr<DocumentSource> getShardSource() final { 129 return nullptr; 130 } 131 getMergeSources()132 std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() final { 133 return {this}; 134 } 135 addInvolvedCollections(std::vector<NamespaceString> * collections)136 void addInvolvedCollections(std::vector<NamespaceString>* collections) const final { 137 collections->push_back(_fromNs); 138 if (_parsedIntrospectionPipeline) { 139 for (auto&& stage : _parsedIntrospectionPipeline->getSources()) { 140 stage->addInvolvedCollections(collections); 141 } 142 } 143 } 144 145 void doDetachFromOperationContext() final; 146 147 void doReattachToOperationContext(OperationContext* opCtx) final; 148 149 static boost::intrusive_ptr<DocumentSource> createFromBson( 150 BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); 151 createFromBsonWithCacheSize(BSONElement elem,const boost::intrusive_ptr<ExpressionContext> & pExpCtx,size_t maxCacheSizeBytes)152 static boost::intrusive_ptr<DocumentSource> createFromBsonWithCacheSize( 153 BSONElement elem, 154 const boost::intrusive_ptr<ExpressionContext>& pExpCtx, 155 size_t maxCacheSizeBytes) { 156 auto dsLookup = createFromBson(elem, pExpCtx); 157 static_cast<DocumentSourceLookUp*>(dsLookup.get())->reInitializeCache(maxCacheSizeBytes); 158 return dsLookup; 159 } 160 161 /** 162 * Builds the BSONObj used to query the foreign collection and wraps it in a $match. 
163 */ 164 static BSONObj makeMatchStageFromInput(const Document& input, 165 const FieldPath& localFieldName, 166 const std::string& foreignFieldName, 167 const BSONObj& additionalFilter); 168 169 /** 170 * Helper to absorb an $unwind stage. Only used for testing this special behavior. 171 */ setUnwindStage(const boost::intrusive_ptr<DocumentSourceUnwind> & unwind)172 void setUnwindStage(const boost::intrusive_ptr<DocumentSourceUnwind>& unwind) { 173 invariant(!_unwindSrc); 174 _unwindSrc = unwind; 175 } 176 177 /** 178 * Returns true if DocumentSourceLookup was constructed with pipeline syntax (as opposed to 179 * localField/foreignField syntax). 180 */ wasConstructedWithPipelineSyntax()181 bool wasConstructedWithPipelineSyntax() const { 182 return !static_cast<bool>(_localField); 183 } 184 getVariables_forTest()185 const Variables& getVariables_forTest() { 186 return _variables; 187 } 188 getVariablesParseState_forTest()189 const VariablesParseState& getVariablesParseState_forTest() { 190 return _variablesParseState; 191 } 192 getSubPipeline_forTest(const Document & inputDoc)193 std::unique_ptr<Pipeline, Pipeline::Deleter> getSubPipeline_forTest(const Document& inputDoc) { 194 return buildPipeline(inputDoc); 195 } 196 197 protected: 198 void doDispose() final; 199 200 /** 201 * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwindSrc' 202 * field. 203 */ 204 Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, 205 Pipeline::SourceContainer* container) final; 206 207 private: 208 struct LetVariable { LetVariableLetVariable209 LetVariable(std::string name, boost::intrusive_ptr<Expression> expression, Variables::Id id) 210 : name(std::move(name)), expression(std::move(expression)), id(id) {} 211 212 std::string name; 213 boost::intrusive_ptr<Expression> expression; 214 Variables::Id id; 215 }; 216 217 /** 218 * Target constructor. 
Handles common-field initialization for the syntax-specific delegating 219 * constructors. 220 */ 221 DocumentSourceLookUp(NamespaceString fromNs, 222 std::string as, 223 const boost::intrusive_ptr<ExpressionContext>& pExpCtx); 224 225 /** 226 * Constructor used for a $lookup stage specified using the {from: ..., localField: ..., 227 * foreignField: ..., as: ...} syntax. 228 */ 229 DocumentSourceLookUp(NamespaceString fromNs, 230 std::string as, 231 std::string localField, 232 std::string foreignField, 233 const boost::intrusive_ptr<ExpressionContext>& pExpCtx); 234 235 /** 236 * Constructor used for a $lookup stage specified using the {from: ..., pipeline: [...], as: 237 * ...} syntax. 238 */ 239 DocumentSourceLookUp(NamespaceString fromNs, 240 std::string as, 241 std::vector<BSONObj> pipeline, 242 BSONObj letVariables, 243 const boost::intrusive_ptr<ExpressionContext>& pExpCtx); 244 245 /** 246 * Should not be called; use serializeToArray instead. 247 */ 248 Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { 249 MONGO_UNREACHABLE; 250 } 251 252 GetNextResult unwindResult(); 253 254 /** 255 * Copies 'vars' and 'vps' to the Variables and VariablesParseState objects in 'expCtx'. These 256 * copies provide access to 'let' defined variables in sub-pipeline execution. 257 */ 258 static void copyVariablesToExpCtx(const Variables& vars, 259 const VariablesParseState& vps, 260 ExpressionContext* expCtx); 261 262 /** 263 * Resolves let defined variables against 'localDoc' and stores the results in 'variables'. 264 */ 265 void resolveLetVariables(const Document& localDoc, Variables* variables); 266 267 /** 268 * Builds a parsed pipeline for introspection (e.g. constraints, dependencies). Any sub-$lookup 269 * pipelines will be built recursively. 
270 */ 271 void initializeIntrospectionPipeline(); 272 273 /** 274 * Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a 275 * cursor and/or cache source as appropriate. 276 */ 277 std::unique_ptr<Pipeline, Pipeline::Deleter> buildPipeline(const Document& inputDoc); 278 279 /** 280 * The pipeline supplied via the $lookup 'pipeline' argument. This may differ from pipeline that 281 * is executed in that it will not include optimizations or resolved views. 282 */ 283 std::string getUserPipelineDefinition(); 284 285 /** 286 * Reinitialize the cache with a new max size. May only be called if this DSLookup was created 287 * with pipeline syntax, the cache has not been frozen or abandoned, and no data has been added 288 * to it. 289 */ reInitializeCache(size_t maxCacheSizeBytes)290 void reInitializeCache(size_t maxCacheSizeBytes) { 291 invariant(wasConstructedWithPipelineSyntax()); 292 invariant(!_cache || (_cache->isBuilding() && _cache->sizeBytes() == 0)); 293 _cache.emplace(maxCacheSizeBytes); 294 } 295 296 NamespaceString _fromNs; 297 NamespaceString _resolvedNs; 298 FieldPath _as; 299 boost::optional<BSONObj> _additionalFilter; 300 301 // For use when $lookup is specified with localField/foreignField syntax. 302 boost::optional<FieldPath> _localField; 303 boost::optional<FieldPath> _foreignField; 304 305 // Holds 'let' defined variables defined both in this stage and in parent pipelines. These are 306 // copied to the '_fromExpCtx' ExpressionContext's 'variables' and 'variablesParseState' for use 307 // in foreign pipeline execution. 308 Variables _variables; 309 VariablesParseState _variablesParseState; 310 311 // Caches documents returned by the non-correlated prefix of the $lookup pipeline during the 312 // first iteration, up to a specified size limit in bytes. If this limit is not exceeded by the 313 // time we hit EOF, subsequent iterations of the pipeline will draw from the cache rather than 314 // from a cursor source. 
315 boost::optional<SequentialDocumentCache> _cache; 316 317 // The ExpressionContext used when performing aggregation pipelines against the '_resolvedNs' 318 // namespace. 319 boost::intrusive_ptr<ExpressionContext> _fromExpCtx; 320 321 // The aggregation pipeline to perform against the '_resolvedNs' namespace. Referenced view 322 // namespaces have been resolved. 323 std::vector<BSONObj> _resolvedPipeline; 324 // The aggregation pipeline defined with the user request, prior to optimization and view 325 // resolution. 326 std::vector<BSONObj> _userPipeline; 327 // A pipeline parsed from _resolvedPipeline at creation time, intended to support introspective 328 // functions. If sub-$lookup stages are present, their pipelines are constructed recursively. 329 std::unique_ptr<Pipeline, Pipeline::Deleter> _parsedIntrospectionPipeline; 330 331 std::vector<LetVariable> _letVariables; 332 333 boost::intrusive_ptr<DocumentSourceMatch> _matchSrc; 334 boost::intrusive_ptr<DocumentSourceUnwind> _unwindSrc; 335 336 // The following members are used to hold onto state across getNext() calls when '_unwindSrc' is 337 // not null. 338 long long _cursorIndex = 0; 339 std::unique_ptr<Pipeline, Pipeline::Deleter> _pipeline; 340 boost::optional<Document> _input; 341 boost::optional<Document> _nextValue; 342 }; 343 344 } // namespace mongo 345