1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #pragma once
32 
33 #include <list>
34 #include <vector>
35 
36 #include <boost/intrusive_ptr.hpp>
37 
38 #include "mongo/db/matcher/expression_parser.h"
39 #include "mongo/db/namespace_string.h"
40 #include "mongo/db/pipeline/dependencies.h"
41 #include "mongo/db/pipeline/value.h"
42 #include "mongo/db/query/explain_options.h"
43 #include "mongo/db/query/query_knobs.h"
44 #include "mongo/stdx/functional.h"
45 #include "mongo/util/intrusive_counter.h"
46 #include "mongo/util/timer.h"
47 
48 namespace mongo {
49 class BSONObj;
50 class BSONObjBuilder;
51 class ExpressionContext;
52 class DocumentSource;
53 class CollatorInterface;
54 class OperationContext;
55 
56 /**
57  * A Pipeline object represents a list of DocumentSources and is responsible for optimizing the
58  * pipeline.
59  */
60 class Pipeline {
61 public:
62     typedef std::list<boost::intrusive_ptr<DocumentSource>> SourceContainer;
63 
64     /**
65      * A SplitState specifies whether the pipeline is currently unsplit, split for the shards, or
66      * split for merging.
67      */
68     enum class SplitState { kUnsplit, kSplitForShards, kSplitForMerge };
69 
70     /**
71      * This class will ensure a Pipeline is disposed before it is deleted.
72      */
73     class Deleter {
74     public:
75         /**
76          * Constructs an empty deleter. Useful for creating a
77          * unique_ptr<Pipeline, Pipeline::Deleter> without populating it.
78          */
Deleter()79         Deleter() {}
80 
Deleter(OperationContext * opCtx)81         explicit Deleter(OperationContext* opCtx) : _opCtx(opCtx) {}
82 
83         /**
84          * If an owner of a std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> wants to assume
85          * responsibility for calling PlanExecutor::dispose(), they can call dismissDisposal(). If
86          * dismissed, a Deleter will not call dispose() when deleting the PlanExecutor.
87          */
dismissDisposal()88         void dismissDisposal() {
89             _dismissed = true;
90         }
91 
92         /**
93          * Calls dispose() on 'pipeline', unless this Deleter has been dismissed.
94          */
operator()95         void operator()(Pipeline* pipeline) {
96             // It is illegal to call this method on a default-constructed Deleter.
97             invariant(_opCtx);
98             if (!_dismissed) {
99                 pipeline->dispose(_opCtx);
100             }
101             delete pipeline;
102         }
103 
104     private:
105         OperationContext* _opCtx = nullptr;
106 
107         bool _dismissed = false;
108     };
109 
110     /**
111      * List of supported match expression features in a pipeline.
112      */
113     static constexpr MatchExpressionParser::AllowedFeatureSet kAllowedMatcherFeatures =
114         MatchExpressionParser::AllowedFeatures::kText |
115         MatchExpressionParser::AllowedFeatures::kExpr |
116         MatchExpressionParser::AllowedFeatures::kJSONSchema;
117 
118     /**
119      * Parses a Pipeline from a vector of BSONObjs. Returns a non-OK status if it failed to parse.
120      * The returned pipeline is not optimized, but the caller may convert it to an optimized
121      * pipeline by calling optimizePipeline().
122      *
123      * It is illegal to create a pipeline using an ExpressionContext which contains a collation that
124      * will not be used during execution of the pipeline. Doing so may cause comparisons made during
125      * parse-time to return the wrong results.
126      */
127     static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> parse(
128         const std::vector<BSONObj>& rawPipeline,
129         const boost::intrusive_ptr<ExpressionContext>& expCtx);
130 
131     /**
132      * Parses a $facet Pipeline from a vector of BSONObjs. Validation checks which are only relevant
133      * to top-level pipelines are skipped, and additional checks applicable to $facet pipelines are
134      * performed. Returns a non-OK status if it failed to parse. The returned pipeline is not
135      * optimized, but the caller may convert it to an optimized pipeline by calling
136      * optimizePipeline().
137      */
138     static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> parseFacetPipeline(
139         const std::vector<BSONObj>& rawPipeline,
140         const boost::intrusive_ptr<ExpressionContext>& expCtx);
141 
142     /**
143      * Creates a Pipeline from an existing SourceContainer.
144      *
145      * Returns a non-OK status if any stage is in an invalid position. For example, if an $out stage
146      * is present but is not the last stage.
147      */
148     static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> create(
149         SourceContainer sources, const boost::intrusive_ptr<ExpressionContext>& expCtx);
150 
151     /**
152      * Creates a $facet Pipeline from an existing SourceContainer.
153      *
154      * Returns a non-OK status if any stage is invalid. For example, if the pipeline is empty or if
155      * any stage is an initial source.
156      */
157     static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> createFacetPipeline(
158         SourceContainer sources, const boost::intrusive_ptr<ExpressionContext>& expCtx);
159 
160     /**
161      * Returns true if the provided aggregation command has a $out stage.
162      */
163     static bool aggSupportsWriteConcern(const BSONObj& cmd);
164 
getContext()165     const boost::intrusive_ptr<ExpressionContext>& getContext() const {
166         return pCtx;
167     }
168 
169     /**
170      * Sets the OperationContext of 'pCtx' to nullptr.
171      *
172      * The PipelineProxyStage is responsible for detaching the OperationContext and releasing any
173      * storage-engine state of the DocumentSourceCursor that may be present in '_sources'.
174      */
175     void detachFromOperationContext();
176 
177     /**
178      * Sets the OperationContext of 'pCtx' to 'opCtx'.
179      *
180      * The PipelineProxyStage is responsible for reattaching the OperationContext and reacquiring
181      * any storage-engine state of the DocumentSourceCursor that may be present in '_sources'.
182      */
183     void reattachToOperationContext(OperationContext* opCtx);
184 
185     /**
186      * Releases any resources held by this pipeline such as PlanExecutors or in-memory structures.
187      * Must be called before deleting a Pipeline.
188      *
189      * There are multiple cleanup scenarios:
190      *  - This Pipeline will only ever use one OperationContext. In this case the Pipeline::Deleter
191      *    will automatically call dispose() before deleting the Pipeline, and the owner need not
192      *    call dispose().
193      *  - This Pipeline may use multiple OperationContexts over its lifetime. In this case it
194      *    is the owner's responsibility to call dispose() with a valid OperationContext before
195      *    deleting the Pipeline.
196      */
197     void dispose(OperationContext* opCtx);
198 
199     /**
200      * Split the current Pipeline into a Pipeline for each shard, and a Pipeline that combines the
201      * results within mongos. This permanently alters this pipeline for the merging operation, and
202      * returns a Pipeline object that should be executed on each targeted shard.
203     */
204     std::unique_ptr<Pipeline, Pipeline::Deleter> splitForSharded();
205 
206     /**
207      * Reassemble a split shard pipeline into its original form. Upon return, this pipeline will
208      * contain the original source list. Must be called on the shards part of a split pipeline
209      * returned by a call to splitForSharded(). It is an error to call this on the merge part of the
210      * pipeline, or on a pipeline that has not been split.
211      */
212     void unsplitFromSharded(std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard);
213 
214     /**
215      * Returns true if this pipeline has not been split.
216      */
isUnsplit()217     bool isUnsplit() const {
218         return _splitState == SplitState::kUnsplit;
219     }
220 
221     /**
222      * Returns true if this pipeline is the part of a split pipeline which should be targeted to the
223      * shards.
224      */
isSplitForShards()225     bool isSplitForShards() const {
226         return _splitState == SplitState::kSplitForShards;
227     }
228 
229     /**
230      * Returns true if this pipeline is the part of a split pipeline which is responsible for
231      * merging the results from the shards.
232      */
isSplitForMerge()233     bool isSplitForMerge() const {
234         return _splitState == SplitState::kSplitForMerge;
235     }
236 
237     /** If the pipeline starts with a $match, return its BSON predicate.
238      *  Returns empty BSON if the first stage isn't $match.
239      */
240     BSONObj getInitialQuery() const;
241 
242     /**
243      * Returns 'true' if the pipeline must merge on the primary shard.
244      */
245     bool needsPrimaryShardMerger() const;
246 
247     /**
248      * Returns 'true' if the pipeline must merge on mongoS.
249      */
250     bool needsMongosMerger() const;
251 
252     /**
253      * Returns true if the pipeline can run on mongoS, but is not obliged to; that is, it can run
254      * either on mongoS or on a shard.
255      */
256     bool canRunOnMongos() const;
257 
258     /**
259      * Returns true if this pipeline must only run on mongoS. Can be called on unsplit or merge
260      * pipelines, but not on the shards part of a split pipeline.
261      */
262     bool requiredToRunOnMongos() const;
263 
264     /**
265      * Modifies the pipeline, optimizing it by combining and swapping stages.
266      */
267     void optimizePipeline();
268 
269     /**
270      * Returns any other collections involved in the pipeline in addition to the collection the
271      * aggregation is run on.
272      */
273     std::vector<NamespaceString> getInvolvedCollections() const;
274 
275     /**
276      * Serializes the pipeline into a form that can be parsed into an equivalent pipeline.
277      */
278     std::vector<Value> serialize() const;
279 
280     // The initial source is special since it varies between mongos and mongod.
281     void addInitialSource(boost::intrusive_ptr<DocumentSource> source);
282 
283     void addFinalSource(boost::intrusive_ptr<DocumentSource> source);
284 
285     /**
286      * Returns the next result from the pipeline, or boost::none if there are no more results.
287      */
288     boost::optional<Document> getNext();
289 
290     /**
291      * Write the pipeline's operators to a std::vector<Value>, providing the level of detail
292      * specified by 'verbosity'.
293      */
294     std::vector<Value> writeExplainOps(ExplainOptions::Verbosity verbosity) const;
295 
296     /**
297      * Returns the dependencies needed by this pipeline. 'metadataAvailable' should reflect what
298      * metadata is present on documents that are input to the front of the pipeline.
299      */
300     DepsTracker getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const;
301 
getSources()302     const SourceContainer& getSources() const {
303         return _sources;
304     }
305 
306     /**
307      * Removes and returns the first stage of the pipeline if its name is 'targetStageName' and the
308      * given 'predicate' function, if present, returns 'true' when called with a pointer to the
309      * stage. Returns nullptr if there is no first stage which meets these criteria.
310      */
311     boost::intrusive_ptr<DocumentSource> popFrontWithCriteria(
312         StringData targetStageName,
313         stdx::function<bool(const DocumentSource* const)> predicate = nullptr);
314 
315     /**
316      * PipelineD is a "sister" class that has additional functionality for the Pipeline. It exists
317      * because of linkage requirements. Pipeline needs to function in mongod and mongos. PipelineD
318      * contains extra functionality required in mongod, and which can't appear in mongos because the
319      * required symbols are unavailable for linking there. Consider PipelineD to be an extension of
320      * this class for mongod only.
321      */
322     friend class PipelineD;
323 
324 private:
325     class Optimizations {
326     public:
327         // This contains static functions that optimize pipelines in various ways.
328         // This is a class rather than a namespace so that it can be a friend of Pipeline.
329         // It is defined in pipeline_optimizations.h.
330         class Sharded;
331     };
332 
333     friend class Optimizations::Sharded;
334 
335     /**
336      * Used by both Pipeline::parse() and Pipeline::parseFacetPipeline() to build and validate the
337      * pipeline.
338      */
339     static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> parseTopLevelOrFacetPipeline(
340         const std::vector<BSONObj>& rawPipeline,
341         const boost::intrusive_ptr<ExpressionContext>& expCtx,
342         const bool isFacetPipeline);
343 
344     /**
345      * Used by both Pipeline::create() and Pipeline::createFacetPipeline() to build and validate the
346      * pipeline.
347      */
348     static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> createTopLevelOrFacetPipeline(
349         SourceContainer sources,
350         const boost::intrusive_ptr<ExpressionContext>& expCtx,
351         const bool isSubPipeline);
352 
353     Pipeline(const boost::intrusive_ptr<ExpressionContext>& pCtx);
354     Pipeline(SourceContainer stages, const boost::intrusive_ptr<ExpressionContext>& pCtx);
355 
356     ~Pipeline();
357 
358     /**
359      * Stitch together the source pointers by calling setSource() for each source in '_sources'.
360      * This function must be called any time the order of stages within the pipeline changes, e.g.
361      * in optimizePipeline().
362      */
363     void stitch();
364 
365     /**
366      * Reset all stages' child pointers to nullptr. Used to prevent dangling pointers during the
367      * optimization process, where we might swap or destroy stages.
368      */
369     void unstitch();
370 
371     /**
372      * Throws if the pipeline fails any of a set of semantic checks. For example, if an $out stage
373      * is present then it must come last in the pipeline, while initial stages such as $indexStats
374      * must be at the start.
375      */
376     void validatePipeline() const;
377 
378     /**
379      * Throws if the $facet pipeline fails any of a set of semantic checks. For example, the
380      * pipeline cannot be empty and may not contain any initial stages.
381      */
382     void validateFacetPipeline() const;
383 
384     /**
385      * Helper method which validates that each stage in pipeline is in a legal position. For
386      * example, $out must be at the end, while a $match stage with a text query must be at the
387      * start. Note that this method accepts an initial source as the first stage, which is illegal
388      * for $facet pipelines.
389      */
390     void ensureAllStagesAreInLegalPositions() const;
391 
392     /**
393      * Returns Status::OK if the pipeline can run on mongoS, or an error with a message explaining
394      * why it cannot.
395      */
396     Status _pipelineCanRunOnMongoS() const;
397 
398     SourceContainer _sources;
399 
400     // When a pipeline is split via splitForSharded(), the resulting shards pipeline will set
401     // '_unsplitSources' to be the original list of DocumentSources representing the full pipeline.
402     // This is to allow the split pipelines to be subsequently reassembled into the original
403     // pipeline, if necessary.
404     boost::optional<SourceContainer> _unsplitSources;
405 
406     SplitState _splitState = SplitState::kUnsplit;
407     boost::intrusive_ptr<ExpressionContext> pCtx;
408     bool _disposed = false;
409 };
410 }  // namespace mongo
411