1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #pragma once
32 
33 #include "mongo/platform/basic.h"
34 
35 #include <boost/intrusive_ptr.hpp>
36 #include <boost/optional.hpp>
37 #include <list>
38 #include <memory>
39 #include <string>
40 #include <vector>
41 
42 #include "mongo/base/init.h"
43 #include "mongo/bson/simple_bsonobj_comparator.h"
44 #include "mongo/client/dbclientinterface.h"
45 #include "mongo/db/collection_index_usage_tracker.h"
46 #include "mongo/db/commands.h"
47 #include "mongo/db/jsobj.h"
48 #include "mongo/db/namespace_string.h"
49 #include "mongo/db/pipeline/dependencies.h"
50 #include "mongo/db/pipeline/document.h"
51 #include "mongo/db/pipeline/expression_context.h"
52 #include "mongo/db/pipeline/field_path.h"
53 #include "mongo/db/pipeline/lite_parsed_document_source.h"
54 #include "mongo/db/pipeline/pipeline.h"
55 #include "mongo/db/pipeline/value.h"
56 #include "mongo/db/query/explain_options.h"
57 #include "mongo/stdx/functional.h"
58 #include "mongo/util/intrusive_counter.h"
59 
60 namespace mongo {
61 
62 class AggregationRequest;
63 class Document;
64 
/**
 * Registers a DocumentSource to have the name 'key', but only if the trailing condition argument
 * (everything after 'fullParser') evaluates to true when the generated initializer runs. If the
 * condition is false the initializer returns Status::OK() without registering anything.
 *
 * 'liteParser' takes an AggregationRequest and a BSONElement and returns a
 * LiteParsedDocumentSource. This is used for checks that need to happen before a full parse,
 * such as checks about which namespaces are referenced by this aggregation.
 *
 * 'fullParser' takes a BSONElement and an ExpressionContext and returns a fully-executable
 * DocumentSource. This will be used for optimization and execution. The macro wraps it in a
 * lambda which places the single returned stage into a std::list, matching
 * DocumentSource::Parser.
 *
 * Stages that do not require any special pre-parse checks can use
 * LiteParsedDocumentSourceDefault::parse as their 'liteParser'.
 *
 * Most stages should not use this macro directly, and should use REGISTER_DOCUMENT_SOURCE
 * instead. As an example, if your stage DocumentSourceFoo looks like {$foo: <args>} and does
 * *not* require any special pre-parse checks, you should implement a static parser like
 * DocumentSourceFoo::createFromBson(), and register it like so:
 * REGISTER_DOCUMENT_SOURCE(foo,
 *                          LiteParsedDocumentSourceDefault::parse,
 *                          DocumentSourceFoo::createFromBson);
 *
 * If your stage is actually an alias which needs to return more than one stage (such as
 * $sortByCount), you should use the REGISTER_MULTI_STAGE_ALIAS macro instead.
 *
 * Note: the condition and both parser arguments are parenthesized when expanded, so compound
 * expressions (e.g. 'a || b') may safely be passed.
 */
#define REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(key, liteParser, fullParser, ...)             \
    MONGO_INITIALIZER(addToDocSourceParserMap_##key)(InitializerContext*) {                  \
        if (!(__VA_ARGS__)) {                                                                \
            return Status::OK();                                                             \
        }                                                                                    \
        auto fullParserWrapper = [](BSONElement stageSpec,                                   \
                                    const boost::intrusive_ptr<ExpressionContext>& expCtx) { \
            return std::list<boost::intrusive_ptr<DocumentSource>>{                          \
                (fullParser)(stageSpec, expCtx)};                                            \
        };                                                                                   \
        LiteParsedDocumentSource::registerParser("$" #key, (liteParser));                    \
        DocumentSource::registerParser("$" #key, fullParserWrapper);                         \
        return Status::OK();                                                                 \
    }
102 
// Unconditional registration of the stage '$key': forwards to
// REGISTER_DOCUMENT_SOURCE_CONDITIONALLY with a condition that is always true.
#define REGISTER_DOCUMENT_SOURCE(key, liteParser, fullParser) \
    REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(                   \
        key, liteParser, fullParser, true)
105 
// Registration of the stage '$key' that is gated on Command::testCommandsEnabled: forwards to
// REGISTER_DOCUMENT_SOURCE_CONDITIONALLY using that flag as the condition.
#define REGISTER_TEST_DOCUMENT_SOURCE(key, liteParser, fullParser)      \
    REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(key, liteParser, fullParser, \
                                           Command::testCommandsEnabled)
109 
/**
 * Registers a multi-stage alias (such as $sortByCount) under the single name 'key'. Whenever a
 * stage named '$key' is encountered, 'liteParser' is used to produce a LiteParsedDocumentSource
 * and 'fullParser' is called to construct the list of DocumentSources the alias expands to.
 * Unlike REGISTER_DOCUMENT_SOURCE, 'fullParser' is registered as-is (it is not wrapped), so it
 * must itself be usable as a DocumentSource::Parser. See the comments on
 * REGISTER_DOCUMENT_SOURCE for more information about the two parsers.
 *
 * As an example, if your stage alias looks like {$foo: <args>} and does *not* require any special
 * pre-parse checks, you should implement a static parser like DocumentSourceFoo::createFromBson(),
 * and register it like so:
 * REGISTER_MULTI_STAGE_ALIAS(foo,
 *                            LiteParsedDocumentSourceDefault::parse,
 *                            DocumentSourceFoo::createFromBson);
 */
#define REGISTER_MULTI_STAGE_ALIAS(key, liteParser, fullParser)                  \
    MONGO_INITIALIZER(addAliasToDocSourceParserMap_##key)(InitializerContext*) { \
        const char* const stageName = "$" #key;                                  \
        LiteParsedDocumentSource::registerParser(stageName, (liteParser));       \
        DocumentSource::registerParser(stageName, (fullParser));                 \
        return Status::OK();                                                     \
    }
129 
130 class DocumentSource : public IntrusiveCounterUnsigned {
131 public:
132     using Parser = stdx::function<std::list<boost::intrusive_ptr<DocumentSource>>(
133         BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>;
134 
135     /**
136      * A struct describing various constraints about where this stage can run, where it must be in
137      * the pipeline, what resources it may require, etc.
138      */
139     struct StageConstraints {
140         /**
141          * A StreamType defines whether this stage is streaming (can produce output based solely on
142          * the current input document) or blocking (must examine subsequent documents before
143          * producing an output document).
144          */
145         enum class StreamType { kStreaming, kBlocking };
146 
147         /**
148          * A PositionRequirement stipulates what specific position the stage must occupy within the
149          * pipeline, if any.
150          */
151         enum class PositionRequirement { kNone, kFirst, kLast };
152 
153         /**
154          * A HostTypeRequirement defines where this stage is permitted to be executed when the
155          * pipeline is run on a sharded cluster.
156          */
157         enum class HostTypeRequirement {
158             // Indicates that the stage can run either on mongoD or mongoS.
159             kNone,
160             // Indicates that the stage must run on the host to which it was originally sent and
161             // cannot be forwarded from mongoS to the shards.
162             kLocalOnly,
163             // Indicates that the stage must run on the primary shard.
164             kPrimaryShard,
165             // Indicates that the stage must run on any participating shard.
166             kAnyShard,
167             // Indicates that the stage can only run on mongoS.
168             kMongoS,
169         };
170 
171         /**
172          * A DiskUseRequirement indicates whether this stage writes permanent data to disk, or
173          * whether it may spill temporary data to disk if its memory usage exceeds a given
174          * threshold. Note that this only indicates that the stage has the ability to spill; if
175          * 'allowDiskUse' is set to false, it will be prevented from doing so.
176          */
177         enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData, kWritesPersistentData };
178 
179         /**
180          * A ChangeStreamRequirement determines whether a particular stage is itself a ChangeStream
181          * stage, whether it is allowed to exist in a $changeStream pipeline, or whether it is
182          * blacklisted from $changeStream.
183          */
184         enum class ChangeStreamRequirement { kChangeStreamStage, kWhitelist, kBlacklist };
185 
186         /**
187          * A FacetRequirement indicates whether this stage may be used within a $facet pipeline.
188          */
189         enum class FacetRequirement { kAllowed, kNotAllowed };
190 
191         StageConstraints(
192             StreamType streamType,
193             PositionRequirement requiredPosition,
194             HostTypeRequirement hostRequirement,
195             DiskUseRequirement diskRequirement,
196             FacetRequirement facetRequirement,
197             ChangeStreamRequirement changeStreamRequirement = ChangeStreamRequirement::kBlacklist)
requiredPositionStageConstraints198             : requiredPosition(requiredPosition),
199               hostRequirement(hostRequirement),
200               diskRequirement(diskRequirement),
201               changeStreamRequirement(changeStreamRequirement),
202               facetRequirement(facetRequirement),
203               streamType(streamType) {
204             // Stages which are allowed to run in $facet must not have any position requirements.
205             invariant(
206                 !(isAllowedInsideFacetStage() && requiredPosition != PositionRequirement::kNone));
207 
208             // No change stream stages are permitted to run in a $facet pipeline.
209             invariant(!(isChangeStreamStage() && isAllowedInsideFacetStage()));
210 
211             // Only streaming stages are permitted in $changeStream pipelines.
212             invariant(!(isAllowedInChangeStream() && streamType == StreamType::kBlocking));
213 
214             // A stage which is whitelisted for $changeStream cannot have a requirement to run on a
215             // shard, since it needs to be able to run on mongoS in a cluster.
216             invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist &&
217                         (hostRequirement == HostTypeRequirement::kAnyShard ||
218                          hostRequirement == HostTypeRequirement::kPrimaryShard)));
219 
220             // A stage which is whitelisted for $changeStream cannot have a position requirement.
221             invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist &&
222                         requiredPosition != PositionRequirement::kNone));
223         }
224 
225         /**
226          * Returns the literal HostTypeRequirement used to initialize the StageConstraints, or the
227          * effective HostTypeRequirement (kAnyShard or kMongoS) if kLocalOnly was specified.
228          */
resolvedHostTypeRequirementStageConstraints229         HostTypeRequirement resolvedHostTypeRequirement(
230             const boost::intrusive_ptr<ExpressionContext>& expCtx) const {
231             return (hostRequirement != HostTypeRequirement::kLocalOnly
232                         ? hostRequirement
233                         : (expCtx->inMongos ? HostTypeRequirement::kMongoS
234                                             : HostTypeRequirement::kAnyShard));
235         }
236 
237         /**
238          * True if this stage must run on the same host to which it was originally sent.
239          */
mustRunLocallyStageConstraints240         bool mustRunLocally() const {
241             return hostRequirement == HostTypeRequirement::kLocalOnly;
242         }
243 
244         /**
245          * True if this stage is permitted to run in a $facet pipeline.
246          */
isAllowedInsideFacetStageStageConstraints247         bool isAllowedInsideFacetStage() const {
248             return facetRequirement == FacetRequirement::kAllowed;
249         }
250 
251         /**
252          * True if this stage is permitted to run in a pipeline which starts with $changeStream.
253          */
isAllowedInChangeStreamStageConstraints254         bool isAllowedInChangeStream() const {
255             return changeStreamRequirement != ChangeStreamRequirement::kBlacklist;
256         }
257 
258         /**
259          * True if this stage is itself a $changeStream stage, and is therefore implicitly allowed
260          * to run in a pipeline which begins with $changeStream.
261          */
isChangeStreamStageStageConstraints262         bool isChangeStreamStage() const {
263             return changeStreamRequirement == ChangeStreamRequirement::kChangeStreamStage;
264         }
265 
266         // Indicates whether this stage needs to be at a particular position in the pipeline.
267         const PositionRequirement requiredPosition;
268 
269         // Indicates whether this stage can only be executed on specific components of a sharded
270         // cluster.
271         const HostTypeRequirement hostRequirement;
272 
273         // Indicates whether this stage may write persistent data to disk, or may spill to temporary
274         // files if its memory usage becomes excessive.
275         const DiskUseRequirement diskRequirement;
276 
277         // Indicates whether this stage is itself a $changeStream stage, or if not whether it may
278         // exist in a pipeline which begins with $changeStream.
279         const ChangeStreamRequirement changeStreamRequirement;
280 
281         // Indicates whether this stage may run inside a $facet stage.
282         const FacetRequirement facetRequirement;
283 
284         // Indicates whether this is a streaming or blocking stage.
285         const StreamType streamType;
286 
287         // True if this stage does not generate results itself, and instead pulls inputs from an
288         // input DocumentSource (via 'pSource').
289         bool requiresInputDocSource = true;
290 
291         // True if this stage operates on a global or database level, like $currentOp.
292         bool isIndependentOfAnyCollection = false;
293 
294         // True if this stage can ever be safely swapped with a subsequent $match stage, provided
295         // that the match does not depend on the paths returned by getModifiedPaths().
296         //
297         // Stages that want to participate in match swapping should set this to true. Such a stage
298         // must also override getModifiedPaths() to provide information about which particular
299         // $match predicates be swapped before itself.
300         bool canSwapWithMatch = false;
301     };
302 
303     using ChangeStreamRequirement = StageConstraints::ChangeStreamRequirement;
304     using HostTypeRequirement = StageConstraints::HostTypeRequirement;
305     using PositionRequirement = StageConstraints::PositionRequirement;
306     using DiskUseRequirement = StageConstraints::DiskUseRequirement;
307     using FacetRequirement = StageConstraints::FacetRequirement;
308     using StreamType = StageConstraints::StreamType;
309 
310     /**
311      * This is what is returned from the main DocumentSource API: getNext(). It is essentially a
312      * (ReturnStatus, Document) pair, with the first entry being used to communicate information
313      * about the execution of the DocumentSource, such as whether or not it has been exhausted.
314      */
315     class GetNextResult {
316     public:
317         enum class ReturnStatus {
318             // There is a result to be processed.
319             kAdvanced,
320             // There will be no further results.
321             kEOF,
322             // There is not a result to be processed yet, but there may be more results in the
323             // future. If a DocumentSource retrieves this status from its child, it must propagate
324             // it without doing any further work.
325             kPauseExecution,
326         };
327 
makeEOF()328         static GetNextResult makeEOF() {
329             return GetNextResult(ReturnStatus::kEOF);
330         }
331 
makePauseExecution()332         static GetNextResult makePauseExecution() {
333             return GetNextResult(ReturnStatus::kPauseExecution);
334         }
335 
336         /**
337          * Shortcut constructor for the common case of creating an 'advanced' GetNextResult from the
338          * given 'result'. Accepts only an rvalue reference as an argument, since DocumentSources
339          * will want to move 'result' into this GetNextResult, and should have to opt in to making a
340          * copy.
341          */
GetNextResult(Document && result)342         /* implicit */ GetNextResult(Document&& result)
343             : _status(ReturnStatus::kAdvanced), _result(std::move(result)) {}
344 
345         /**
346          * Gets the result document. It is an error to call this if isAdvanced() returns false.
347          */
getDocument()348         const Document& getDocument() const {
349             dassert(isAdvanced());
350             return _result;
351         }
352 
353         /**
354          * Releases the result document, transferring ownership to the caller. It is an error to
355          * call this if isAdvanced() returns false.
356          */
releaseDocument()357         Document releaseDocument() {
358             dassert(isAdvanced());
359             return std::move(_result);
360         }
361 
getStatus()362         ReturnStatus getStatus() const {
363             return _status;
364         }
365 
isAdvanced()366         bool isAdvanced() const {
367             return _status == ReturnStatus::kAdvanced;
368         }
369 
isEOF()370         bool isEOF() const {
371             return _status == ReturnStatus::kEOF;
372         }
373 
isPaused()374         bool isPaused() const {
375             return _status == ReturnStatus::kPauseExecution;
376         }
377 
378     private:
GetNextResult(ReturnStatus status)379         GetNextResult(ReturnStatus status) : _status(status) {}
380 
381         ReturnStatus _status;
382         Document _result;
383     };
384 
~DocumentSource()385     virtual ~DocumentSource() {}
386 
387     /**
388      * The main execution API of a DocumentSource. Returns an intermediate query result generated by
389      * this DocumentSource.
390      *
391      * All implementers must call pExpCtx->checkForInterrupt().
392      *
393      * For performance reasons, a streaming stage must not keep references to documents across calls
394      * to getNext(). Such stages must retrieve a result from their child and then release it (or
395      * return it) before asking for another result. Failing to do so can result in extra work, since
396      * the Document/Value library must copy data on write when that data has a refcount above one.
397      */
398     virtual GetNextResult getNext() = 0;
399 
400     /**
401      * Returns a struct containing information about any special constraints imposed on using this
402      * stage. Input parameter Pipeline::SplitState is used by stages whose requirements change
403      * depending on whether they are in a split or unsplit pipeline.
404      */
405     virtual StageConstraints constraints(
406         Pipeline::SplitState = Pipeline::SplitState::kUnsplit) const = 0;
407 
408     /**
409      * Informs the stage that it is no longer needed and can release its resources. After dispose()
410      * is called the stage must still be able to handle calls to getNext(), but can return kEOF.
411      *
412      * This is a non-virtual public interface to ensure dispose() is threaded through the entire
413      * pipeline. Subclasses should override doDispose() to implement their disposal.
414      */
dispose()415     void dispose() {
416         doDispose();
417         if (pSource) {
418             pSource->dispose();
419         }
420     }
421 
422     /**
423      * Get the stage's name.
424      */
425     virtual const char* getSourceName() const;
426 
427     /**
428      * Set the underlying source this source should use to get Documents from. Must not throw
429      * exceptions.
430      */
setSource(DocumentSource * source)431     virtual void setSource(DocumentSource* source) {
432         pSource = source;
433     }
434 
435     /**
436      * In the default case, serializes the DocumentSource and adds it to the std::vector<Value>.
437      *
438      * A subclass may choose to overwrite this, rather than serialize, if it should output multiple
439      * stages (eg, $sort sometimes also outputs a $limit).
440      *
441      * The 'explain' parameter indicates the explain verbosity mode, or is equal boost::none if no
442      * explain is requested.
443      */
444     virtual void serializeToArray(
445         std::vector<Value>& array,
446         boost::optional<ExplainOptions::Verbosity> explain = boost::none) const;
447 
448     /**
449      * If DocumentSource uses additional collections, it adds the namespaces to the input vector.
450      */
addInvolvedCollections(std::vector<NamespaceString> * collections)451     virtual void addInvolvedCollections(std::vector<NamespaceString>* collections) const {}
452 
detachFromOperationContext()453     virtual void detachFromOperationContext() {}
454 
reattachToOperationContext(OperationContext * opCtx)455     virtual void reattachToOperationContext(OperationContext* opCtx) {}
456 
457     /**
458      * Create a DocumentSource pipeline stage from 'stageObj'.
459      */
460     static std::list<boost::intrusive_ptr<DocumentSource>> parse(
461         const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONObj stageObj);
462 
463     /**
464      * Registers a DocumentSource with a parsing function, so that when a stage with the given name
465      * is encountered, it will call 'parser' to construct that stage.
466      *
467      * DO NOT call this method directly. Instead, use the REGISTER_DOCUMENT_SOURCE macro defined in
468      * this file.
469      */
470     static void registerParser(std::string name, Parser parser);
471 
472     /**
473      * Given a BSONObj, construct a BSONObjSet consisting of all prefixes of that object. For
474      * example, given {a: 1, b: 1, c: 1}, this will return a set: {{a: 1}, {a: 1, b: 1}, {a: 1, b:
475      * 1, c: 1}}.
476      */
477     static BSONObjSet allPrefixes(BSONObj obj);
478 
479     /**
480      * Given a BSONObjSet, where each BSONObj represents a sort key, return the BSONObjSet that
481      * results from truncating each sort key before the first path that is a member of 'fields', or
482      * is a child of a member of 'fields'.
483      */
484     static BSONObjSet truncateSortSet(const BSONObjSet& sorts, const std::set<std::string>& fields);
485 
486     //
487     // Optimization API - These methods give each DocumentSource an opportunity to apply any local
488     // optimizations, and to provide any rule-based optimizations to swap with or absorb subsequent
489     // stages.
490     //
491 
492     /**
493      * The non-virtual public interface for optimization. Attempts to do some generic optimizations
494      * such as pushing $matches as early in the pipeline as possible, then calls out to
495      * doOptimizeAt() for stage-specific optimizations.
496      *
497      * Subclasses should override doOptimizeAt() if they can apply some optimization(s) based on
498      * subsequent stages in the pipeline.
499      */
500     Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr,
501                                                    Pipeline::SourceContainer* container);
502 
503     /**
504      * Returns an optimized DocumentSource that is semantically equivalent to this one, or
505      * nullptr if this stage is a no-op. Implementations are allowed to modify themselves
506      * in-place and return a pointer to themselves. For best results, first optimize the pipeline
507      * with the optimizePipeline() method defined in pipeline.cpp.
508      *
509      * This is intended for any operations that include expressions, and provides a hook for
510      * those to optimize those operations.
511      *
512      * The default implementation is to do nothing and return yourself.
513      */
514     virtual boost::intrusive_ptr<DocumentSource> optimize();
515 
516     //
517     // Property Analysis - These methods allow a DocumentSource to expose information about
518     // properties of themselves, such as which fields they need to apply their transformations, and
519     // whether or not they produce or preserve a sort order.
520     //
521     // Property analysis can be useful during optimization (e.g. analysis of sort orders determines
522     // whether or not a blocking group can be upgraded to a streaming group).
523     //
524 
525     /**
526      * Gets a BSONObjSet representing the sort order(s) of the output of the stage.
527      */
getOutputSorts()528     virtual BSONObjSet getOutputSorts() {
529         return SimpleBSONObjComparator::kInstance.makeBSONObjSet();
530     }
531 
532     struct GetModPathsReturn {
533         enum class Type {
534             // No information is available about which paths are modified.
535             kNotSupported,
536 
537             // All fields will be modified. This should be used by stages like $replaceRoot which
538             // modify the entire document.
539             kAllPaths,
540 
541             // A finite set of paths will be modified by this stage. This is true for something like
542             // {$project: {a: 0, b: 0}}, which will only modify 'a' and 'b', and leave all other
543             // paths unmodified.
544             kFiniteSet,
545 
546             // This stage will modify an infinite set of paths, but we know which paths it will not
547             // modify. For example, the stage {$project: {_id: 1, a: 1}} will leave only the fields
548             // '_id' and 'a' unmodified, but all other fields will be projected out.
549             kAllExcept,
550         };
551 
GetModPathsReturnGetModPathsReturn552         GetModPathsReturn(Type type,
553                           std::set<std::string>&& paths,
554                           StringMap<std::string>&& renames)
555             : type(type), paths(std::move(paths)), renames(std::move(renames)) {}
556 
557         Type type;
558         std::set<std::string> paths;
559 
560         // Stages may fill out 'renames' to contain information about path renames. Each entry in
561         // 'renames' maps from the new name of the path (valid in documents flowing *out* of this
562         // stage) to the old name of the path (valid in documents flowing *into* this stage).
563         //
564         // For example, consider the stage
565         //
566         //   {$project: {_id: 0, a: 1, b: "$c"}}
567         //
568         // This stage should return kAllExcept, since it modifies all paths other than "a". It can
569         // also fill out 'renames' with the mapping "b" => "c".
570         StringMap<std::string> renames;
571     };
572 
573     /**
574      * Returns information about which paths are added, removed, or updated by this stage. The
575      * default implementation uses kNotSupported to indicate that the set of modified paths for this
576      * stage is not known.
577      *
578      * See GetModPathsReturn above for the possible return values and what they mean.
579      */
getModifiedPaths()580     virtual GetModPathsReturn getModifiedPaths() const {
581         return {GetModPathsReturn::Type::kNotSupported, std::set<std::string>{}, {}};
582     }
583 
584     enum GetDepsReturn {
585         // The full object and all metadata may be required.
586         NOT_SUPPORTED = 0x0,
587 
588         // Later stages could need either fields or metadata. For example, a $limit stage will pass
589         // through all fields, and they may or may not be needed by future stages.
590         SEE_NEXT = 0x1,
591 
592         // Later stages won't need more fields from input. For example, an inclusion projection like
593         // {_id: 1, a: 1} will only output two fields, so future stages cannot possibly depend on
594         // any other fields.
595         EXHAUSTIVE_FIELDS = 0x2,
596 
597         // Later stages won't need more metadata from input. For example, a $group stage will group
598         // documents together, discarding their text score and sort keys.
599         EXHAUSTIVE_META = 0x4,
600 
601         // Later stages won't need either fields or metadata.
602         EXHAUSTIVE_ALL = EXHAUSTIVE_FIELDS | EXHAUSTIVE_META,
603     };
604 
605     /**
606      * Get the dependencies this operation needs to do its job. If overridden, subclasses must add
607      * all paths needed to apply their transformation to 'deps->fields', and call
608      * 'deps->setNeedTextScore()' if the text score is required.
609      *
610      * See GetDepsReturn above for the possible return values and what they mean.
611      */
getDependencies(DepsTracker * deps)612     virtual GetDepsReturn getDependencies(DepsTracker* deps) const {
613         return NOT_SUPPORTED;
614     }
615 
616 protected:
617     explicit DocumentSource(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
618 
619     /**
620      * Attempt to perform an optimization with the following source in the pipeline. 'container'
621      * refers to the entire pipeline, and 'itr' points to this stage within the pipeline. The caller
622      * must guarantee that std::next(itr) != container->end().
623      *
624      * The return value is an iterator over the same container which points to the first location
625      * in the container at which an optimization may be possible.
626      *
627      * For example, if a swap takes place, the returned iterator should just be the position
628      * directly preceding 'itr', if such a position exists, since the stage at that position may be
629      * able to perform further optimizations with its new neighbor.
630      */
doOptimizeAt(Pipeline::SourceContainer::iterator itr,Pipeline::SourceContainer * container)631     virtual Pipeline::SourceContainer::iterator doOptimizeAt(
632         Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
633         return std::next(itr);
634     };
635 
636     /**
637      * Release any resources held by this stage. After doDispose() is called the stage must still be
638      * able to handle calls to getNext(), but can return kEOF.
639      */
doDispose()640     virtual void doDispose() {}
641 
642     /*
643       Most DocumentSources have an underlying source they get their data
644       from.  This is a convenience for them.
645 
646       The default implementation of setSource() sets this; if you don't
647       need a source, override that to verify().  The default is to
648       verify() if this has already been set.
649     */
    DocumentSource* pSource;

    // Expression context for this stage. NOTE(review): presumably the one handed to the
    // constructor - confirm against the constructor definition.
    boost::intrusive_ptr<ExpressionContext> pExpCtx;
653 
654 private:
655     /**
656      * Create a Value that represents the document source.
657      *
658      * This is used by the default implementation of serializeToArray() to add this object
659      * to a pipeline being serialized. Returning a missing() Value results in no entry
660      * being added to the array for this stage (DocumentSource).
661      *
     * The 'explain' parameter indicates the explain verbosity mode, or is equal to boost::none if
     * no explain is requested.
664      */
    // Pure virtual: each concrete stage supplies its own serialization. NOTE: default arguments
    // are bound using the static type of the call, so overrides should declare the same default
    // for 'explain'.
    virtual Value serialize(
        boost::optional<ExplainOptions::Verbosity> explain = boost::none) const = 0;
667 };
668 
669 /**
670  * This class marks DocumentSources that should be split between the merger and the shards. See
671  * Pipeline::Optimizations::Sharded::findSplitPoint() for details.
672  */
673 class SplittableDocumentSource {
674 public:
675     /**
676      * Returns a source to be run on the shards, or NULL if no work should be done on the shards for
677      * this stage. Must not mutate the existing source object; if different behaviour is required in
678      * the split-pipeline case, a new source should be created and configured appropriately. It is
679      * an error for getShardSource() to return a pointer to the same object as getMergeSource(),
680      * since this can result in the source being stitched into both the shard and merge pipelines
681      * when the latter is executed on mongoS.
682      */
683     virtual boost::intrusive_ptr<DocumentSource> getShardSource() = 0;
684 
685     /**
686      * Returns a list of stages that combine results from the shards, or an empty list if no work
687      * should be done in the merge pipeline for this stage. Must not mutate the existing source
688      * object; if different behaviour is required, a new source should be created and configured
689      * appropriately. It is an error for getMergeSources() to return a pointer to the same object as
690      * getShardSource().
691      */
692     virtual std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() = 0;
693 
694 protected:
695     // It is invalid to delete through a SplittableDocumentSource-typed pointer.
~SplittableDocumentSource()696     virtual ~SplittableDocumentSource() {}
697 };
698 
699 
700 /**
701  * This class marks DocumentSources which need functionality specific to a mongos or a mongod. It
702  * causes a MongodProcessInterface to be injected when in a mongod and a MongosProcessInterface when
703  * in a mongos.
704  */
705 class DocumentSourceNeedsMongoProcessInterface : public DocumentSource {
706 public:
707     /**
708      * Any functionality needed by an aggregation stage that is either context specific to a mongod
709      * or mongos process, or is only compiled in to one of those two binaries must be accessed via
710      * this interface. This allows all DocumentSources to be parsed on either mongos or mongod, but
711      * only executable where it makes sense.
712      */
713     class MongoProcessInterface {
714     public:
715         enum class CurrentOpConnectionsMode { kIncludeIdle, kExcludeIdle };
716         enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers };
717         enum class CurrentOpTruncateMode { kNoTruncation, kTruncateOps };
718 
719         struct MakePipelineOptions {
MakePipelineOptionsMakePipelineOptions720             MakePipelineOptions(){};
721 
722             bool optimize = true;
723             bool attachCursorSource = true;
724 
725             // Ordinarily, a MongoProcessInterface is injected into the pipeline at the point
726             // when the cursor source is added. If true, 'forceInjectMongoProcessInterface' will
727             // inject MongoProcessInterfaces into the pipeline even if 'attachCursorSource' is
728             // false. If 'attachCursorSource' is true, then the value of
729             // 'forceInjectMongoProcessInterface' is irrelevant.
730             bool forceInjectMongoProcessInterface = false;
731         };
732 
~MongoProcessInterface()733         virtual ~MongoProcessInterface(){};
734 
735         /**
736          * Sets the OperationContext of the DBDirectClient returned by directClient(). This method
737          * must be called after updating the 'opCtx' member of the ExpressionContext associated with
738          * the document source.
739          */
740         virtual void setOperationContext(OperationContext* opCtx) = 0;
741 
742         /**
743          * Always returns a DBDirectClient. The return type in the function signature is a
744          * DBClientBase* because DBDirectClient isn't linked into mongos.
745          */
746         virtual DBClientBase* directClient() = 0;
747 
748         // Note that in some rare cases this could return a false negative but will never return
749         // a false positive. This method will be fixed in the future once it becomes possible to
750         // avoid false negatives.
751         virtual bool isSharded(const NamespaceString& ns) = 0;
752 
753         /**
754          * Inserts 'objs' into 'ns' and returns the "detailed" last error object.
755          */
756         virtual BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) = 0;
757 
758         virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
759                                                       const NamespaceString& ns) = 0;
760 
761         /**
762          * Appends operation latency statistics for collection "nss" to "builder"
763          */
764         virtual void appendLatencyStats(const NamespaceString& nss,
765                                         bool includeHistograms,
766                                         BSONObjBuilder* builder) const = 0;
767 
768         /**
769          * Appends storage statistics for collection "nss" to "builder"
770          */
771         virtual Status appendStorageStats(const NamespaceString& nss,
772                                           const BSONObj& param,
773                                           BSONObjBuilder* builder) const = 0;
774 
775         /**
776          * Appends the record count for collection "nss" to "builder".
777          */
778         virtual Status appendRecordCount(const NamespaceString& nss,
779                                          BSONObjBuilder* builder) const = 0;
780 
781         /**
782          * Gets the collection options for the collection given by 'nss'.
783          */
784         virtual BSONObj getCollectionOptions(const NamespaceString& nss) = 0;
785 
786         /**
787          * Performs the given rename command if the collection given by 'targetNs' has the same
788          * options as specified in 'originalCollectionOptions', and has the same indexes as
789          * 'originalIndexes'.
790          */
791         virtual Status renameIfOptionsAndIndexesHaveNotChanged(
792             const BSONObj& renameCommandObj,
793             const NamespaceString& targetNs,
794             const BSONObj& originalCollectionOptions,
795             const std::list<BSONObj>& originalIndexes) = 0;
796 
797         /**
798          * Parses a Pipeline from a vector of BSONObjs representing DocumentSources. The state of
799          * the returned pipeline will depend upon the supplied MakePipelineOptions:
800          * - The boolean opts.optimize determines whether the pipeline will be optimized.
801          * - If opts.attachCursorSource is false, the pipeline will be returned without attempting
802          *   to add an initial cursor source.
803          * - If opts.forceInjectMongoProcessInterface is true, then a MongoProcessInterface will be
804          *   provided to each stage which requires one, regardless of whether a cursor source is
805          *   attached to the pipeline.
806          *
807          * This function returns a non-OK status if parsing the pipeline failed.
808          */
809         virtual StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
810             const std::vector<BSONObj>& rawPipeline,
811             const boost::intrusive_ptr<ExpressionContext>& expCtx,
812             const MakePipelineOptions opts = MakePipelineOptions{}) = 0;
813 
814         /**
815          * Attaches a cursor source to the start of a pipeline. Performs no further optimization.
816          * This function asserts if the collection to be aggregated is sharded. NamespaceNotFound
817          * will be returned if ExpressionContext has a UUID and that UUID doesn't exist anymore.
818          * That should be the only case where NamespaceNotFound is returned.
819          */
820         virtual Status attachCursorSourceToPipeline(
821             const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0;
822 
823         /**
824          * Returns a vector of owned BSONObjs, each of which contains details of an in-progress
825          * operation or, optionally, an idle connection. If userMode is kIncludeAllUsers, report
826          * operations for all authenticated users; otherwise, report only the current user's
827          * operations.
828          */
829         virtual std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
830                                                    CurrentOpUserMode userMode,
831                                                    CurrentOpTruncateMode) const = 0;
832 
833         /**
834          * Returns the name of the local shard if sharding is enabled, or an empty string.
835          */
836         virtual std::string getShardName(OperationContext* opCtx) const = 0;
837 
838         /**
839          * Returns the fields of the document key (in order) for the current collection, including
840          * the shard key and _id.  If _id is not in the shard key, it is added last.
841          */
842         virtual std::vector<FieldPath> collectDocumentKeyFields(UUID) const = 0;
843 
844         /**
845          * Returns zero or one documents with the document key 'documentKey'. 'documentKey' is
846          * treated as a unique identifier of a document, and may include an _id or all fields from
847          * the shard key and an _id. Throws if more than one match was found. Returns boost::none if
848          * no matching documents were found, including cases where the given namespace does not
849          * exist.
850          */
851         virtual boost::optional<Document> lookupSingleDocument(
852             const NamespaceString& nss,
853             UUID collectionUUID,
854             const Document& documentKey,
855             boost::optional<BSONObj> readConcern) = 0;
856 
857         // Add new methods as needed.
858     };
859 
DocumentSourceNeedsMongoProcessInterface(const boost::intrusive_ptr<ExpressionContext> & expCtx)860     DocumentSourceNeedsMongoProcessInterface(const boost::intrusive_ptr<ExpressionContext>& expCtx)
861         : DocumentSource(expCtx) {}
862 
injectMongoProcessInterface(std::shared_ptr<MongoProcessInterface> mongoProcessInterface)863     void injectMongoProcessInterface(std::shared_ptr<MongoProcessInterface> mongoProcessInterface) {
864         _mongoProcessInterface = mongoProcessInterface;
865         doInjectMongoProcessInterface(mongoProcessInterface);
866     }
867 
868     /**
869      * Derived classes may override this method to register custom inject functionality.
870      */
doInjectMongoProcessInterface(std::shared_ptr<MongoProcessInterface> mongoProcessInterface)871     virtual void doInjectMongoProcessInterface(
872         std::shared_ptr<MongoProcessInterface> mongoProcessInterface) {}
873 
detachFromOperationContext()874     void detachFromOperationContext() override {
875         invariant(_mongoProcessInterface);
876         _mongoProcessInterface->setOperationContext(nullptr);
877         doDetachFromOperationContext();
878     }
879 
880     /**
881      * Derived classes may override this method to register custom detach functionality.
882      */
doDetachFromOperationContext()883     virtual void doDetachFromOperationContext() {}
884 
reattachToOperationContext(OperationContext * opCtx)885     void reattachToOperationContext(OperationContext* opCtx) final {
886         invariant(_mongoProcessInterface);
887         _mongoProcessInterface->setOperationContext(opCtx);
888         doReattachToOperationContext(opCtx);
889     }
890 
891     /**
892      * Derived classes may override this method to register custom reattach functionality.
893      */
doReattachToOperationContext(OperationContext * opCtx)894     virtual void doReattachToOperationContext(OperationContext* opCtx) {}
895 
896 protected:
897     // It is invalid to delete through a DocumentSourceNeedsMongoProcessInterface-typed pointer.
~DocumentSourceNeedsMongoProcessInterface()898     virtual ~DocumentSourceNeedsMongoProcessInterface() {}
899 
900     // Gives subclasses access to a MongoProcessInterface implementation
901     std::shared_ptr<MongoProcessInterface> _mongoProcessInterface;
902 };
903 
904 
905 }  // namespace mongo
906