/**
 * Copyright (C) 2018-present MongoDB, Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the Server Side Public License, version 1,
 * as published by MongoDB, Inc.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * Server Side Public License for more details.
 *
 * You should have received a copy of the Server Side Public License
 * along with this program. If not, see
 * <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the Server Side Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
29 */ 30 31 #pragma once 32 33 #include <list> 34 #include <vector> 35 36 #include <boost/intrusive_ptr.hpp> 37 38 #include "mongo/db/matcher/expression_parser.h" 39 #include "mongo/db/namespace_string.h" 40 #include "mongo/db/pipeline/dependencies.h" 41 #include "mongo/db/pipeline/value.h" 42 #include "mongo/db/query/explain_options.h" 43 #include "mongo/db/query/query_knobs.h" 44 #include "mongo/stdx/functional.h" 45 #include "mongo/util/intrusive_counter.h" 46 #include "mongo/util/timer.h" 47 48 namespace mongo { 49 class BSONObj; 50 class BSONObjBuilder; 51 class ExpressionContext; 52 class DocumentSource; 53 class CollatorInterface; 54 class OperationContext; 55 56 /** 57 * A Pipeline object represents a list of DocumentSources and is responsible for optimizing the 58 * pipeline. 59 */ 60 class Pipeline { 61 public: 62 typedef std::list<boost::intrusive_ptr<DocumentSource>> SourceContainer; 63 64 /** 65 * A SplitState specifies whether the pipeline is currently unsplit, split for the shards, or 66 * split for merging. 67 */ 68 enum class SplitState { kUnsplit, kSplitForShards, kSplitForMerge }; 69 70 /** 71 * This class will ensure a Pipeline is disposed before it is deleted. 72 */ 73 class Deleter { 74 public: 75 /** 76 * Constructs an empty deleter. Useful for creating a 77 * unique_ptr<Pipeline, Pipeline::Deleter> without populating it. 78 */ Deleter()79 Deleter() {} 80 Deleter(OperationContext * opCtx)81 explicit Deleter(OperationContext* opCtx) : _opCtx(opCtx) {} 82 83 /** 84 * If an owner of a std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> wants to assume 85 * responsibility for calling PlanExecutor::dispose(), they can call dismissDisposal(). If 86 * dismissed, a Deleter will not call dispose() when deleting the PlanExecutor. 87 */ dismissDisposal()88 void dismissDisposal() { 89 _dismissed = true; 90 } 91 92 /** 93 * Calls dispose() on 'pipeline', unless this Deleter has been dismissed. 
94 */ operator()95 void operator()(Pipeline* pipeline) { 96 // It is illegal to call this method on a default-constructed Deleter. 97 invariant(_opCtx); 98 if (!_dismissed) { 99 pipeline->dispose(_opCtx); 100 } 101 delete pipeline; 102 } 103 104 private: 105 OperationContext* _opCtx = nullptr; 106 107 bool _dismissed = false; 108 }; 109 110 /** 111 * List of supported match expression features in a pipeline. 112 */ 113 static constexpr MatchExpressionParser::AllowedFeatureSet kAllowedMatcherFeatures = 114 MatchExpressionParser::AllowedFeatures::kText | 115 MatchExpressionParser::AllowedFeatures::kExpr | 116 MatchExpressionParser::AllowedFeatures::kJSONSchema; 117 118 /** 119 * Parses a Pipeline from a vector of BSONObjs. Returns a non-OK status if it failed to parse. 120 * The returned pipeline is not optimized, but the caller may convert it to an optimized 121 * pipeline by calling optimizePipeline(). 122 * 123 * It is illegal to create a pipeline using an ExpressionContext which contains a collation that 124 * will not be used during execution of the pipeline. Doing so may cause comparisons made during 125 * parse-time to return the wrong results. 126 */ 127 static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> parse( 128 const std::vector<BSONObj>& rawPipeline, 129 const boost::intrusive_ptr<ExpressionContext>& expCtx); 130 131 /** 132 * Parses a $facet Pipeline from a vector of BSONObjs. Validation checks which are only relevant 133 * to top-level pipelines are skipped, and additional checks applicable to $facet pipelines are 134 * performed. Returns a non-OK status if it failed to parse. The returned pipeline is not 135 * optimized, but the caller may convert it to an optimized pipeline by calling 136 * optimizePipeline(). 
137 */ 138 static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> parseFacetPipeline( 139 const std::vector<BSONObj>& rawPipeline, 140 const boost::intrusive_ptr<ExpressionContext>& expCtx); 141 142 /** 143 * Creates a Pipeline from an existing SourceContainer. 144 * 145 * Returns a non-OK status if any stage is in an invalid position. For example, if an $out stage 146 * is present but is not the last stage. 147 */ 148 static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> create( 149 SourceContainer sources, const boost::intrusive_ptr<ExpressionContext>& expCtx); 150 151 /** 152 * Creates a $facet Pipeline from an existing SourceContainer. 153 * 154 * Returns a non-OK status if any stage is invalid. For example, if the pipeline is empty or if 155 * any stage is an initial source. 156 */ 157 static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> createFacetPipeline( 158 SourceContainer sources, const boost::intrusive_ptr<ExpressionContext>& expCtx); 159 160 /** 161 * Returns true if the provided aggregation command has a $out stage. 162 */ 163 static bool aggSupportsWriteConcern(const BSONObj& cmd); 164 getContext()165 const boost::intrusive_ptr<ExpressionContext>& getContext() const { 166 return pCtx; 167 } 168 169 /** 170 * Sets the OperationContext of 'pCtx' to nullptr. 171 * 172 * The PipelineProxyStage is responsible for detaching the OperationContext and releasing any 173 * storage-engine state of the DocumentSourceCursor that may be present in '_sources'. 174 */ 175 void detachFromOperationContext(); 176 177 /** 178 * Sets the OperationContext of 'pCtx' to 'opCtx'. 179 * 180 * The PipelineProxyStage is responsible for reattaching the OperationContext and reacquiring 181 * any storage-engine state of the DocumentSourceCursor that may be present in '_sources'. 
182 */ 183 void reattachToOperationContext(OperationContext* opCtx); 184 185 /** 186 * Releases any resources held by this pipeline such as PlanExecutors or in-memory structures. 187 * Must be called before deleting a Pipeline. 188 * 189 * There are multiple cleanup scenarios: 190 * - This Pipeline will only ever use one OperationContext. In this case the Pipeline::Deleter 191 * will automatically call dispose() before deleting the Pipeline, and the owner need not 192 * call dispose(). 193 * - This Pipeline may use multiple OperationContexts over its lifetime. In this case it 194 * is the owner's responsibility to call dispose() with a valid OperationContext before 195 * deleting the Pipeline. 196 */ 197 void dispose(OperationContext* opCtx); 198 199 /** 200 * Split the current Pipeline into a Pipeline for each shard, and a Pipeline that combines the 201 * results within mongos. This permanently alters this pipeline for the merging operation, and 202 * returns a Pipeline object that should be executed on each targeted shard. 203 */ 204 std::unique_ptr<Pipeline, Pipeline::Deleter> splitForSharded(); 205 206 /** 207 * Reassemble a split shard pipeline into its original form. Upon return, this pipeline will 208 * contain the original source list. Must be called on the shards part of a split pipeline 209 * returned by a call to splitForSharded(). It is an error to call this on the merge part of the 210 * pipeline, or on a pipeline that has not been split. 211 */ 212 void unsplitFromSharded(std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard); 213 214 /** 215 * Returns true if this pipeline has not been split. 216 */ isUnsplit()217 bool isUnsplit() const { 218 return _splitState == SplitState::kUnsplit; 219 } 220 221 /** 222 * Returns true if this pipeline is the part of a split pipeline which should be targeted to the 223 * shards. 
224 */ isSplitForShards()225 bool isSplitForShards() const { 226 return _splitState == SplitState::kSplitForShards; 227 } 228 229 /** 230 * Returns true if this pipeline is the part of a split pipeline which is responsible for 231 * merging the results from the shards. 232 */ isSplitForMerge()233 bool isSplitForMerge() const { 234 return _splitState == SplitState::kSplitForMerge; 235 } 236 237 /** If the pipeline starts with a $match, return its BSON predicate. 238 * Returns empty BSON if the first stage isn't $match. 239 */ 240 BSONObj getInitialQuery() const; 241 242 /** 243 * Returns 'true' if the pipeline must merge on the primary shard. 244 */ 245 bool needsPrimaryShardMerger() const; 246 247 /** 248 * Returns 'true' if the pipeline must merge on mongoS. 249 */ 250 bool needsMongosMerger() const; 251 252 /** 253 * Returns true if the pipeline can run on mongoS, but is not obliged to; that is, it can run 254 * either on mongoS or on a shard. 255 */ 256 bool canRunOnMongos() const; 257 258 /** 259 * Returns true if this pipeline must only run on mongoS. Can be called on unsplit or merge 260 * pipelines, but not on the shards part of a split pipeline. 261 */ 262 bool requiredToRunOnMongos() const; 263 264 /** 265 * Modifies the pipeline, optimizing it by combining and swapping stages. 266 */ 267 void optimizePipeline(); 268 269 /** 270 * Returns any other collections involved in the pipeline in addition to the collection the 271 * aggregation is run on. 272 */ 273 std::vector<NamespaceString> getInvolvedCollections() const; 274 275 /** 276 * Serializes the pipeline into a form that can be parsed into an equivalent pipeline. 277 */ 278 std::vector<Value> serialize() const; 279 280 // The initial source is special since it varies between mongos and mongod. 
281 void addInitialSource(boost::intrusive_ptr<DocumentSource> source); 282 283 void addFinalSource(boost::intrusive_ptr<DocumentSource> source); 284 285 /** 286 * Returns the next result from the pipeline, or boost::none if there are no more results. 287 */ 288 boost::optional<Document> getNext(); 289 290 /** 291 * Write the pipeline's operators to a std::vector<Value>, providing the level of detail 292 * specified by 'verbosity'. 293 */ 294 std::vector<Value> writeExplainOps(ExplainOptions::Verbosity verbosity) const; 295 296 /** 297 * Returns the dependencies needed by this pipeline. 'metadataAvailable' should reflect what 298 * metadata is present on documents that are input to the front of the pipeline. 299 */ 300 DepsTracker getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const; 301 getSources()302 const SourceContainer& getSources() const { 303 return _sources; 304 } 305 306 /** 307 * Removes and returns the first stage of the pipeline if its name is 'targetStageName' and the 308 * given 'predicate' function, if present, returns 'true' when called with a pointer to the 309 * stage. Returns nullptr if there is no first stage which meets these criteria. 310 */ 311 boost::intrusive_ptr<DocumentSource> popFrontWithCriteria( 312 StringData targetStageName, 313 stdx::function<bool(const DocumentSource* const)> predicate = nullptr); 314 315 /** 316 * PipelineD is a "sister" class that has additional functionality for the Pipeline. It exists 317 * because of linkage requirements. Pipeline needs to function in mongod and mongos. PipelineD 318 * contains extra functionality required in mongod, and which can't appear in mongos because the 319 * required symbols are unavailable for linking there. Consider PipelineD to be an extension of 320 * this class for mongod only. 321 */ 322 friend class PipelineD; 323 324 private: 325 class Optimizations { 326 public: 327 // This contains static functions that optimize pipelines in various ways. 
328 // This is a class rather than a namespace so that it can be a friend of Pipeline. 329 // It is defined in pipeline_optimizations.h. 330 class Sharded; 331 }; 332 333 friend class Optimizations::Sharded; 334 335 /** 336 * Used by both Pipeline::parse() and Pipeline::parseFacetPipeline() to build and validate the 337 * pipeline. 338 */ 339 static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> parseTopLevelOrFacetPipeline( 340 const std::vector<BSONObj>& rawPipeline, 341 const boost::intrusive_ptr<ExpressionContext>& expCtx, 342 const bool isFacetPipeline); 343 344 /** 345 * Used by both Pipeline::create() and Pipeline::createFacetPipeline() to build and validate the 346 * pipeline. 347 */ 348 static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> createTopLevelOrFacetPipeline( 349 SourceContainer sources, 350 const boost::intrusive_ptr<ExpressionContext>& expCtx, 351 const bool isSubPipeline); 352 353 Pipeline(const boost::intrusive_ptr<ExpressionContext>& pCtx); 354 Pipeline(SourceContainer stages, const boost::intrusive_ptr<ExpressionContext>& pCtx); 355 356 ~Pipeline(); 357 358 /** 359 * Stitch together the source pointers by calling setSource() for each source in '_sources'. 360 * This function must be called any time the order of stages within the pipeline changes, e.g. 361 * in optimizePipeline(). 362 */ 363 void stitch(); 364 365 /** 366 * Reset all stages' child pointers to nullptr. Used to prevent dangling pointers during the 367 * optimization process, where we might swap or destroy stages. 368 */ 369 void unstitch(); 370 371 /** 372 * Throws if the pipeline fails any of a set of semantic checks. For example, if an $out stage 373 * is present then it must come last in the pipeline, while initial stages such as $indexStats 374 * must be at the start. 375 */ 376 void validatePipeline() const; 377 378 /** 379 * Throws if the $facet pipeline fails any of a set of semantic checks. 
For example, the 380 * pipeline cannot be empty and may not contain any initial stages. 381 */ 382 void validateFacetPipeline() const; 383 384 /** 385 * Helper method which validates that each stage in pipeline is in a legal position. For 386 * example, $out must be at the end, while a $match stage with a text query must be at the 387 * start. Note that this method accepts an initial source as the first stage, which is illegal 388 * for $facet pipelines. 389 */ 390 void ensureAllStagesAreInLegalPositions() const; 391 392 /** 393 * Returns Status::OK if the pipeline can run on mongoS, or an error with a message explaining 394 * why it cannot. 395 */ 396 Status _pipelineCanRunOnMongoS() const; 397 398 SourceContainer _sources; 399 400 // When a pipeline is split via splitForSharded(), the resulting shards pipeline will set 401 // '_unsplitSources' to be the original list of DocumentSources representing the full pipeline. 402 // This is to allow the split pipelines to be subsequently reassembled into the original 403 // pipeline, if necessary. 404 boost::optional<SourceContainer> _unsplitSources; 405 406 SplitState _splitState = SplitState::kUnsplit; 407 boost::intrusive_ptr<ExpressionContext> pCtx; 408 bool _disposed = false; 409 }; 410 } // namespace mongo 411