
/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#include "mongo/platform/basic.h"

// This file defines functions from both of these headers
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/pipeline_optimizations.h"

#include <algorithm>

#include "mongo/base/error_codes.h"
#include "mongo/db/bson/dotted_path_support.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_geo_near.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/util/mongoutils/str.h"

namespace mongo {

using boost::intrusive_ptr;
using std::endl;
using std::ostringstream;
using std::string;
using std::vector;

namespace dps = ::mongo::dotted_path_support;

using ChangeStreamRequirement = DocumentSource::StageConstraints::ChangeStreamRequirement;
using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement;
using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement;
using DiskUseRequirement = DocumentSource::StageConstraints::DiskUseRequirement;
using FacetRequirement = DocumentSource::StageConstraints::FacetRequirement;
using StreamType = DocumentSource::StageConstraints::StreamType;

Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheCtx) {}

Pipeline::Pipeline(SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx)
    : _sources(std::move(stages)), pCtx(expCtx) {}

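// A Pipeline must be disposed of (via dispose() or its Deleter) before it is destroyed.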
Pipeline::~Pipeline() {
    invariant(_disposed);
}

StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parse(
    const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) {
    return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, false);
}

StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parseFacetPipeline(
    const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) {
    return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, true);
}

StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parseTopLevelOrFacetPipeline(
    const std::vector<BSONObj>& rawPipeline,
    const intrusive_ptr<ExpressionContext>& expCtx,
    const bool isFacetPipeline) {

    SourceContainer stages;

    for (auto&& stageObj : rawPipeline) {
        auto parsedSources = DocumentSource::parse(expCtx, stageObj);
        stages.insert(stages.end(), parsedSources.begin(), parsedSources.end());
    }

    return createTopLevelOrFacetPipeline(std::move(stages), expCtx, isFacetPipeline);
}

StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::create(
    SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
    return createTopLevelOrFacetPipeline(std::move(stages), expCtx, false);
}

StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::createFacetPipeline(
    SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
    return createTopLevelOrFacetPipeline(std::move(stages), expCtx, true);
}

StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::createTopLevelOrFacetPipeline(
    SourceContainer stages,
    const intrusive_ptr<ExpressionContext>& expCtx,
    const bool isFacetPipeline) {
    std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline(new Pipeline(std::move(stages), expCtx),
                                                          Pipeline::Deleter(expCtx->opCtx));
    try {
        if (isFacetPipeline) {
            pipeline->validateFacetPipeline();
        } else {
            pipeline->validatePipeline();
        }
    } catch (const DBException& ex) {
        return ex.toStatus();
    }

    pipeline->stitch();
    return std::move(pipeline);
}

void Pipeline::validatePipeline() const {
    // Verify that the specified namespace is valid for the initial stage of this pipeline.
    const NamespaceString& nss = pCtx->ns;

    if (_sources.empty()) {
        if (nss.isCollectionlessAggregateNS()) {
            uasserted(ErrorCodes::InvalidNamespace,
                      "{aggregate: 1} is not valid for an empty pipeline.");
        }
    } else if (!dynamic_cast<DocumentSourceMergeCursors*>(_sources.front().get())) {
        // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this,
        // {aggregate: 1} is only valid for collectionless sources, and vice-versa.
        const auto firstStageConstraints = _sources.front()->constraints(_splitState);

        if (nss.isCollectionlessAggregateNS() &&
            !firstStageConstraints.isIndependentOfAnyCollection) {
            uasserted(ErrorCodes::InvalidNamespace,
                      str::stream() << "{aggregate: 1} is not valid for '"
                                    << _sources.front()->getSourceName()
                                    << "'; a collection is required.");
        }

        if (!nss.isCollectionlessAggregateNS() &&
            firstStageConstraints.isIndependentOfAnyCollection) {
            uasserted(ErrorCodes::InvalidNamespace,
                      str::stream() << "'" << _sources.front()->getSourceName()
                                    << "' can only be run with {aggregate: 1}");
        }

        // If the first stage is a $changeStream stage, then all stages in the pipeline must be
        // either $changeStream stages or whitelisted as being able to run in a change stream.
        if (firstStageConstraints.isChangeStreamStage()) {
            for (auto&& source : _sources) {
                uassert(ErrorCodes::IllegalOperation,
                        str::stream() << source->getSourceName()
                                      << " is not permitted in a $changeStream pipeline",
                        source->constraints(_splitState).isAllowedInChangeStream());
            }
        }
    }

    // Verify that each stage is in a legal position within the pipeline.
    ensureAllStagesAreInLegalPositions();
}

void Pipeline::validateFacetPipeline() const {
    if (_sources.empty()) {
        uasserted(ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty");
    }

    for (auto&& stage : _sources) {
        auto stageConstraints = stage->constraints(_splitState);
        if (!stageConstraints.isAllowedInsideFacetStage()) {
            uasserted(40600,
                      str::stream() << stage->getSourceName()
                                    << " is not allowed to be used within a $facet stage");
        }
        // We expect a stage within a $facet stage to have these properties.
        invariant(stageConstraints.requiredPosition == PositionRequirement::kNone);
        invariant(!stageConstraints.isIndependentOfAnyCollection);
    }

    // Facet pipelines cannot have any stages which are initial sources. The loop above has already
    // asserted that no stage requires a particular position, and the call below verifies the
    // remaining positional and host-type requirements, completing the validation.
    ensureAllStagesAreInLegalPositions();
}

void Pipeline::ensureAllStagesAreInLegalPositions() const {
    size_t i = 0;
    for (auto&& stage : _sources) {
        auto constraints = stage->constraints(_splitState);

        // Verify that all stages adhere to their PositionRequirement constraints.
        if (constraints.requiredPosition == PositionRequirement::kFirst && i != 0) {
            uasserted(40602,
                      str::stream() << stage->getSourceName()
                                    << " is only valid as the first stage in a pipeline.");
        }
        auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get());
        if (i != 0 && matchStage && matchStage->isTextQuery()) {
            uasserted(17313, "$match with $text is only allowed as the first pipeline stage");
        }

        if (constraints.requiredPosition == PositionRequirement::kLast &&
            i != _sources.size() - 1) {
            uasserted(40601,
                      str::stream() << stage->getSourceName()
                                    << " can only be the final stage in the pipeline");
        }
        ++i;

        // Verify that we are not attempting to run a mongoS-only stage on mongoD.
        uassert(40644,
                str::stream() << stage->getSourceName() << " can only be run on mongoS",
                !(constraints.hostRequirement == HostTypeRequirement::kMongoS && !pCtx->inMongos));
    }
}

void Pipeline::optimizePipeline() {
    SourceContainer optimizedSources;

    SourceContainer::iterator itr = _sources.begin();

    // We could be swapping around stages during this process, so disconnect the pipeline to prevent
    // us from entering a state with dangling pointers.
    unstitch();
    while (itr != _sources.end()) {
        invariant((*itr).get());
        itr = (*itr).get()->optimizeAt(itr, &_sources);
    }

    // Once we have reached our final number of stages, optimize each individually.
    for (auto&& source : _sources) {
        if (auto out = source->optimize()) {
            optimizedSources.push_back(out);
        }
    }
    _sources.swap(optimizedSources);
    stitch();
}

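// Returns true only if the command's pipeline array contains a $out stage, since $out is the
// stage which writes results out to a collection and thus the one to which a writeConcern applies.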
bool Pipeline::aggSupportsWriteConcern(const BSONObj& cmd) {
    auto pipelineElement = cmd["pipeline"];
    if (pipelineElement.type() != BSONType::Array) {
        return false;
    }

    for (auto stage : pipelineElement.Obj()) {
        if (stage.type() != BSONType::Object) {
            return false;
        }

        if (stage.Obj().hasField("$out")) {
            return true;
        }
    }

    return false;
}

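// Disconnects this pipeline and each of its stages from the current OperationContext so that the
// pipeline can safely outlive the operation, e.g. while a cursor is cached between commands.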
void Pipeline::detachFromOperationContext() {
    pCtx->opCtx = nullptr;

    for (auto&& source : _sources) {
        source->detachFromOperationContext();
    }
}

void Pipeline::reattachToOperationContext(OperationContext* opCtx) {
    pCtx->opCtx = opCtx;

    for (auto&& source : _sources) {
        source->reattachToOperationContext(opCtx);
    }
}

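// Finalizes the pipeline: reattaches the given OperationContext, then disposes of the stages via
// the final stage (disposal propagates up the stitched chain) and marks the pipeline as disposed.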
void Pipeline::dispose(OperationContext* opCtx) {
    try {
        pCtx->opCtx = opCtx;

        // Make sure all stages are connected, in case we are being disposed via an error path and
        // were not stitched at the time of the error.
        stitch();

        if (!_sources.empty()) {
            _sources.back()->dispose();
        }
        _disposed = true;
    } catch (...) {
        std::terminate();
    }
}

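// Splits this pipeline in two for sharded execution. The returned pipeline holds the stages to run
// on each shard; 'this' is left holding the stages to run on the merging host.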
std::unique_ptr<Pipeline, Pipeline::Deleter> Pipeline::splitForSharded() {
    invariant(!isSplitForShards());
    invariant(!isSplitForMerge());
    invariant(!_unsplitSources);

    // Create and initialize the shard spec we'll return. We start with an empty pipeline on the
    // shards and all work being done in the merger. Optimizations can move operations between
    // the pipelines to be more efficient.
    std::unique_ptr<Pipeline, Pipeline::Deleter> shardPipeline(new Pipeline(pCtx),
                                                               Pipeline::Deleter(pCtx->opCtx));

    // Keep a copy of the original source list in case we need to reset the pipeline from split to
    // unsplit later.
    shardPipeline->_unsplitSources.emplace(_sources);

    // The order in which optimizations are applied can have significant impact on the
    // efficiency of the final pipeline. Be Careful!
    Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this);
    Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this);
    Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this);

    shardPipeline->_splitState = SplitState::kSplitForShards;
    _splitState = SplitState::kSplitForMerge;

    stitch();

    return shardPipeline;
}

void Pipeline::unsplitFromSharded(
    std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard) {
    invariant(isSplitForShards());
    invariant(!isSplitForMerge());
    invariant(pipelineForMergingShard);
    invariant(_unsplitSources);

    // Clear the merge source list so that destroying the pipeline object won't dispose of the
    // stages. We still have a reference to each of the stages which will be moved back to the shard
    // pipeline via '_unsplitSources'.
    pipelineForMergingShard->_sources.clear();
    pipelineForMergingShard.reset();

    // Set '_sources' to its original state, re-stitch, and clear the '_unsplitSources' optional.
    _sources = *_unsplitSources;
    _unsplitSources.reset();

    _splitState = SplitState::kUnsplit;

    stitch();
}

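// Moves stages from the front of the merge pipeline to the shard pipeline until the first
// splittable stage is found; that stage is then divided into its shard-side and merge-side parts.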
void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) {
    while (!mergePipe->_sources.empty()) {
        intrusive_ptr<DocumentSource> current = mergePipe->_sources.front();
        mergePipe->_sources.pop_front();

        // Check if this source is splittable.
        SplittableDocumentSource* splittable =
            dynamic_cast<SplittableDocumentSource*>(current.get());

        if (!splittable) {
            // Move the source from the merger _sources to the shard _sources.
            shardPipe->_sources.push_back(current);
        } else {
            // Split this source into 'merge' and 'shard' _sources.
            intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
            auto mergeSources = splittable->getMergeSources();

            // A source may not simultaneously be present on both sides of the split.
            invariant(std::find(mergeSources.begin(), mergeSources.end(), shardSource) ==
                      mergeSources.end());

            if (shardSource)
                shardPipe->_sources.push_back(shardSource);

            // Add the stages in reverse order, so that they appear in the pipeline in the same
            // order as they were returned by the stage.
            for (auto it = mergeSources.rbegin(); it != mergeSources.rend(); ++it) {
                mergePipe->_sources.push_front(*it);
            }

            break;
        }
    }
}

void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe,
                                                                         Pipeline* mergePipe) {
    while (!shardPipe->_sources.empty() &&
           dynamic_cast<DocumentSourceUnwind*>(shardPipe->_sources.back().get())) {
        mergePipe->_sources.push_front(shardPipe->_sources.back());
        shardPipe->_sources.pop_back();
    }
}

void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe,
                                                                         Pipeline* mergePipe) {
    auto depsMetadata = DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery())
        ? DepsTracker::MetadataAvailable::kTextScore
        : DepsTracker::MetadataAvailable::kNoMetadata;
    DepsTracker mergeDeps(mergePipe->getDependencies(depsMetadata));
    if (mergeDeps.needWholeDocument)
        return;  // the merge needs all fields, so nothing we can do.

    // Empty project is "special" so if no fields are needed, we just ask for _id instead.
    if (mergeDeps.fields.empty())
        mergeDeps.fields.insert("_id");

    // Remove metadata from dependencies since it automatically flows through projection and we
    // don't want to project it into the document.
    mergeDeps.setNeedTextScore(false);

    // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of
    // field dependencies. While this may not be 100% ideal in all cases, it is simple and
    // avoids the worst cases by ensuring that:
    // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of
    //    dependencies. This situation can happen when a $sort is before the first $project or
    //    $group. Without the optimization, the shards would have to reify and transmit full
    //    objects even though only a subset of fields are needed.
    // 2) Optimization IS NOT applied immediately following a $project or $group since it would
    //    add an unnecessary project (and therefore a deep-copy).
    for (auto&& source : shardPipe->_sources) {
        DepsTracker dt(depsMetadata);
        if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS)
            return;
    }
    // if we get here, add the project.
    boost::intrusive_ptr<DocumentSource> project = DocumentSourceProject::createFromBson(
        BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx);
    shardPipe->_sources.push_back(project);
}

BSONObj Pipeline::getInitialQuery() const {
    if (_sources.empty())
        return BSONObj();

    /* look for an initial $match */
    DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(_sources.front().get());
    if (match) {
        return match->getQuery();
    }

    DocumentSourceGeoNear* geoNear = dynamic_cast<DocumentSourceGeoNear*>(_sources.front().get());
    if (geoNear) {
        return geoNear->getQuery();
    }

    return BSONObj();
}

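// Returns true if any stage, once the pipeline is split for merging, must run on the primary shard.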
bool Pipeline::needsPrimaryShardMerger() const {
    return std::any_of(_sources.begin(), _sources.end(), [&](const auto& stage) {
        return stage->constraints(SplitState::kSplitForMerge).hostRequirement ==
            HostTypeRequirement::kPrimaryShard;
    });
}

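// Returns true if any stage, once the pipeline is split for merging, must run on mongoS.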
bool Pipeline::needsMongosMerger() const {
    return std::any_of(_sources.begin(), _sources.end(), [&](const auto& stage) {
        return stage->constraints(SplitState::kSplitForMerge).resolvedHostTypeRequirement(pCtx) ==
            HostTypeRequirement::kMongoS;
    });
}

bool Pipeline::canRunOnMongos() const {
    return _pipelineCanRunOnMongoS().isOK();
}

bool Pipeline::requiredToRunOnMongos() const {
    invariant(!isSplitForShards());

    for (auto&& stage : _sources) {
        // If this pipeline is capable of splitting before the mongoS-only stage, then the pipeline
        // as a whole is not required to run on mongoS.
        if (isUnsplit() && dynamic_cast<SplittableDocumentSource*>(stage.get())) {
            return false;
        }

        auto hostRequirement = stage->constraints(_splitState).resolvedHostTypeRequirement(pCtx);

        // If a mongoS-only stage occurs before a splittable stage, or if the pipeline is already
        // split, this entire pipeline must run on mongoS.
        if (hostRequirement == HostTypeRequirement::kMongoS) {
            // Verify that the remainder of this pipeline can run on mongoS.
            auto mongosRunStatus = _pipelineCanRunOnMongoS();

            uassert(mongosRunStatus.code(),
                    str::stream() << stage->getSourceName() << " must run on mongoS, but "
                                  << mongosRunStatus.reason(),
                    mongosRunStatus.isOK());

            return true;
        }
    }

    return false;
}

std::vector<NamespaceString> Pipeline::getInvolvedCollections() const {
    std::vector<NamespaceString> collections;
    for (auto&& source : _sources) {
        source->addInvolvedCollections(&collections);
    }
    return collections;
}

vector<Value> Pipeline::serialize() const {
    vector<Value> serializedSources;
    for (auto&& source : _sources) {
        source->serializeToArray(serializedSources);
    }
    return serializedSources;
}

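// Disconnects every stage from its source so that stages can be added, removed, or reordered
// without leaving dangling pointers.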
void Pipeline::unstitch() {
    for (auto&& stage : _sources) {
        stage->setSource(nullptr);
    }
}

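// Links each stage to the stage before it so that documents flow from the front of the pipeline
// to the back.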
void Pipeline::stitch() {
    if (_sources.empty()) {
        return;
    }
    // Chain together all the stages.
    DocumentSource* prevSource = _sources.front().get();
    prevSource->setSource(nullptr);
    for (SourceContainer::iterator iter(++_sources.begin()), listEnd(_sources.end());
         iter != listEnd;
         ++iter) {
        intrusive_ptr<DocumentSource> pTemp(*iter);
        pTemp->setSource(prevSource);
        prevSource = pTemp.get();
    }
}

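// Pulls the next result from the final stage, looping past pause notifications, and returns
// boost::none once the pipeline is exhausted.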
boost::optional<Document> Pipeline::getNext() {
    invariant(!_sources.empty());
    auto nextResult = _sources.back()->getNext();
    while (nextResult.isPaused()) {
        nextResult = _sources.back()->getNext();
    }
    return nextResult.isEOF() ? boost::none
                              : boost::optional<Document>{nextResult.releaseDocument()};
}

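// Serializes each stage into the explain output at the requested verbosity.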
vector<Value> Pipeline::writeExplainOps(ExplainOptions::Verbosity verbosity) const {
    vector<Value> array;
    for (SourceContainer::const_iterator it = _sources.begin(); it != _sources.end(); ++it) {
        (*it)->serializeToArray(array, verbosity);
    }
    return array;
}

void Pipeline::addInitialSource(intrusive_ptr<DocumentSource> source) {
    if (!_sources.empty()) {
        _sources.front()->setSource(source.get());
    }
    _sources.push_front(source);
}

void Pipeline::addFinalSource(intrusive_ptr<DocumentSource> source) {
    if (!_sources.empty()) {
        source->setSource(_sources.back().get());
    }
    _sources.push_back(source);
}

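// Walks the stages in order, accumulating the field, metadata, and variable dependencies reported
// by each stage, and stops early once the field and metadata dependencies are fully known (unless
// variables defined in this scope require examining the remaining stages).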
DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const {
    DepsTracker deps(metadataAvailable);
    const bool scopeHasVariables = pCtx->variablesParseState.hasDefinedVariables();
    bool skipFieldsAndMetadataDeps = false;
    bool knowAllFields = false;
    bool knowAllMeta = false;
    for (auto&& source : _sources) {
        DepsTracker localDeps(deps.getMetadataAvailable());
        DocumentSource::GetDepsReturn status = source->getDependencies(&localDeps);

        deps.vars.insert(localDeps.vars.begin(), localDeps.vars.end());

        if ((skipFieldsAndMetadataDeps |= (status == DocumentSource::NOT_SUPPORTED))) {
            // Assume this stage needs everything. We may still know something about our
            // dependencies if an earlier stage returned EXHAUSTIVE_FIELDS or EXHAUSTIVE_META. If
            // this scope has variables, we need to keep enumerating the remaining stages but will
            // skip adding any further field or metadata dependencies.
            if (scopeHasVariables) {
                continue;
            } else {
                break;
            }
        }

        if (!knowAllFields) {
            deps.fields.insert(localDeps.fields.begin(), localDeps.fields.end());
            if (localDeps.needWholeDocument)
                deps.needWholeDocument = true;
            knowAllFields = status & DocumentSource::EXHAUSTIVE_FIELDS;
        }

        if (!knowAllMeta) {
            if (localDeps.getNeedTextScore())
                deps.setNeedTextScore(true);

            if (localDeps.getNeedSortKey())
                deps.setNeedSortKey(true);

            knowAllMeta = status & DocumentSource::EXHAUSTIVE_META;
        }

        // If there are variables defined at this pipeline's scope, there may be dependencies upon
        // them in subsequent stages. Keep enumerating.
        if (knowAllMeta && knowAllFields && !scopeHasVariables) {
            break;
        }
    }

    if (!knowAllFields)
        deps.needWholeDocument = true;  // don't know all fields we need

    if (metadataAvailable & DepsTracker::MetadataAvailable::kTextScore) {
        // If there is a text score, assume we need to keep it if we can't prove we don't. If we are
        // the first half of a pipeline which has been split, future stages might need it.
        if (!knowAllMeta)
            deps.setNeedTextScore(true);
    } else {
        // If there is no text score available, then we don't need to ask for it.
        deps.setNeedTextScore(false);
    }

    return deps;
}

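// Returns OK if every stage can execute on mongoS; otherwise returns IllegalOperation naming the
// first stage that cannot, because it must run on a shard, would need to write to disk, or is a
// blocking stage while blocking aggregation on mongoS is disabled.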
Status Pipeline::_pipelineCanRunOnMongoS() const {
    for (auto&& stage : _sources) {
        auto constraints = stage->constraints(_splitState);
        auto hostRequirement = constraints.resolvedHostTypeRequirement(pCtx);

        const bool needsShard = (hostRequirement == HostTypeRequirement::kAnyShard ||
                                 hostRequirement == HostTypeRequirement::kPrimaryShard);

        const bool mustWriteToDisk =
            (constraints.diskRequirement == DiskUseRequirement::kWritesPersistentData);
        const bool mayWriteTmpDataAndDiskUseIsAllowed =
            (pCtx->allowDiskUse &&
             constraints.diskRequirement == DiskUseRequirement::kWritesTmpData);
        const bool needsDisk = (mustWriteToDisk || mayWriteTmpDataAndDiskUseIsAllowed);

        const bool needsToBlock = (constraints.streamType == StreamType::kBlocking);
        const bool blockingIsPermitted = !internalQueryProhibitBlockingMergeOnMongoS.load();

        // If nothing prevents this stage from running on mongoS, continue to the next stage.
        if (!needsShard && !needsDisk && (!needsToBlock || blockingIsPermitted)) {
            continue;
        }

        // Otherwise, return an error with an explanation.
        StringBuilder ss;
        ss << stage->getSourceName();

        if (needsShard) {
            ss << " must run on a shard";
        } else if (needsToBlock && !blockingIsPermitted) {
            ss << " is a blocking stage; running these stages on mongoS is disabled";
        } else if (mustWriteToDisk) {
            ss << " must write to disk";
        } else if (mayWriteTmpDataAndDiskUseIsAllowed) {
            ss << " may write to disk when 'allowDiskUse' is enabled";
        } else {
            MONGO_UNREACHABLE;
        }

        return {ErrorCodes::IllegalOperation, ss.str()};
    }

    return Status::OK();
}

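// Removes and returns the pipeline's first stage if it has the given name and satisfies the
// optional predicate; otherwise returns nullptr and leaves the pipeline unchanged. The remaining
// stages are re-stitched after removal.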
boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria(
    StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate) {
    if (_sources.empty() || _sources.front()->getSourceName() != targetStageName) {
        return nullptr;
    }
    auto targetStage = _sources.front();

    if (predicate && !predicate(targetStage.get())) {
        return nullptr;
    }

    _sources.pop_front();
    stitch();
    return targetStage;
}
}  // namespace mongo