1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #include "mongo/platform/basic.h"
32
33 // This file defines functions from both of these headers
34 #include "mongo/db/pipeline/pipeline.h"
35 #include "mongo/db/pipeline/pipeline_optimizations.h"
36
37 #include <algorithm>
38
39 #include "mongo/base/error_codes.h"
40 #include "mongo/db/bson/dotted_path_support.h"
41 #include "mongo/db/catalog/document_validation.h"
42 #include "mongo/db/jsobj.h"
43 #include "mongo/db/operation_context.h"
44 #include "mongo/db/pipeline/accumulator.h"
45 #include "mongo/db/pipeline/document.h"
46 #include "mongo/db/pipeline/document_source.h"
47 #include "mongo/db/pipeline/document_source_geo_near.h"
48 #include "mongo/db/pipeline/document_source_match.h"
49 #include "mongo/db/pipeline/document_source_merge_cursors.h"
50 #include "mongo/db/pipeline/document_source_out.h"
51 #include "mongo/db/pipeline/document_source_project.h"
52 #include "mongo/db/pipeline/document_source_unwind.h"
53 #include "mongo/db/pipeline/expression.h"
54 #include "mongo/db/pipeline/expression_context.h"
55 #include "mongo/util/mongoutils/str.h"
56
57 namespace mongo {
58
59 using boost::intrusive_ptr;
60 using std::endl;
61 using std::ostringstream;
62 using std::string;
63 using std::vector;
64
65 namespace dps = ::mongo::dotted_path_support;
66
67 using ChangeStreamRequirement = DocumentSource::StageConstraints::ChangeStreamRequirement;
68 using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement;
69 using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement;
70 using DiskUseRequirement = DocumentSource::StageConstraints::DiskUseRequirement;
71 using FacetRequirement = DocumentSource::StageConstraints::FacetRequirement;
72 using StreamType = DocumentSource::StageConstraints::StreamType;
73
// Constructs an empty pipeline bound to the given expression context; stages are added later.
Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheCtx) {}
75
// Constructs a pipeline that takes ownership of an already-parsed stage list.
Pipeline::Pipeline(SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx)
    : _sources(std::move(stages)), pCtx(expCtx) {}
78
// A pipeline must have been dispose()d before destruction; the Deleter normally guarantees this.
Pipeline::~Pipeline() {
    invariant(_disposed);
}
82
parse(const std::vector<BSONObj> & rawPipeline,const intrusive_ptr<ExpressionContext> & expCtx)83 StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parse(
84 const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) {
85 return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, false);
86 }
87
parseFacetPipeline(const std::vector<BSONObj> & rawPipeline,const intrusive_ptr<ExpressionContext> & expCtx)88 StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parseFacetPipeline(
89 const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) {
90 return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, true);
91 }
92
parseTopLevelOrFacetPipeline(const std::vector<BSONObj> & rawPipeline,const intrusive_ptr<ExpressionContext> & expCtx,const bool isFacetPipeline)93 StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parseTopLevelOrFacetPipeline(
94 const std::vector<BSONObj>& rawPipeline,
95 const intrusive_ptr<ExpressionContext>& expCtx,
96 const bool isFacetPipeline) {
97
98 SourceContainer stages;
99
100 for (auto&& stageObj : rawPipeline) {
101 auto parsedSources = DocumentSource::parse(expCtx, stageObj);
102 stages.insert(stages.end(), parsedSources.begin(), parsedSources.end());
103 }
104
105 return createTopLevelOrFacetPipeline(std::move(stages), expCtx, isFacetPipeline);
106 }
107
create(SourceContainer stages,const intrusive_ptr<ExpressionContext> & expCtx)108 StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::create(
109 SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
110 return createTopLevelOrFacetPipeline(std::move(stages), expCtx, false);
111 }
112
createFacetPipeline(SourceContainer stages,const intrusive_ptr<ExpressionContext> & expCtx)113 StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::createFacetPipeline(
114 SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
115 return createTopLevelOrFacetPipeline(std::move(stages), expCtx, true);
116 }
117
// Shared implementation for create() and createFacetPipeline(): takes ownership of the stages,
// validates them for the requested pipeline kind, and stitches the stages together on success.
// Validation failures are returned as a Status rather than thrown.
StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::createTopLevelOrFacetPipeline(
    SourceContainer stages,
    const intrusive_ptr<ExpressionContext>& expCtx,
    const bool isFacetPipeline) {
    // The custom Deleter holds the OperationContext so it can dispose() the stages when the
    // pipeline is destroyed.
    std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline(new Pipeline(std::move(stages), expCtx),
                                                          Pipeline::Deleter(expCtx->opCtx));
    try {
        // Facet sub-pipelines have different constraints (e.g. no initial sources) than
        // top-level pipelines.
        if (isFacetPipeline) {
            pipeline->validateFacetPipeline();
        } else {
            pipeline->validatePipeline();
        }
    } catch (const DBException& ex) {
        // Convert validation exceptions into the StatusWith error path.
        return ex.toStatus();
    }

    pipeline->stitch();
    return std::move(pipeline);
}
137
// Validates a top-level pipeline: checks that the namespace ({aggregate: 1} vs. a real
// collection) matches the first stage's requirements, enforces change-stream whitelisting,
// and finally checks every stage's positional constraints. Throws on failure.
void Pipeline::validatePipeline() const {
    // Verify that the specified namespace is valid for the initial stage of this pipeline.
    const NamespaceString& nss = pCtx->ns;

    if (_sources.empty()) {
        if (nss.isCollectionlessAggregateNS()) {
            uasserted(ErrorCodes::InvalidNamespace,
                      "{aggregate: 1} is not valid for an empty pipeline.");
        }
    } else if (!dynamic_cast<DocumentSourceMergeCursors*>(_sources.front().get())) {
        // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this,
        // {aggregate: 1} is only valid for collectionless sources, and vice-versa.
        const auto firstStageConstraints = _sources.front()->constraints(_splitState);

        if (nss.isCollectionlessAggregateNS() &&
            !firstStageConstraints.isIndependentOfAnyCollection) {
            uasserted(ErrorCodes::InvalidNamespace,
                      str::stream() << "{aggregate: 1} is not valid for '"
                                    << _sources.front()->getSourceName()
                                    << "'; a collection is required.");
        }

        if (!nss.isCollectionlessAggregateNS() &&
            firstStageConstraints.isIndependentOfAnyCollection) {
            uasserted(ErrorCodes::InvalidNamespace,
                      str::stream() << "'" << _sources.front()->getSourceName()
                                    << "' can only be run with {aggregate: 1}");
        }

        // If the first stage is a $changeStream stage, then all stages in the pipeline must be
        // either $changeStream stages or whitelisted as being able to run in a change stream.
        if (firstStageConstraints.isChangeStreamStage()) {
            for (auto&& source : _sources) {
                uassert(ErrorCodes::IllegalOperation,
                        str::stream() << source->getSourceName()
                                      << " is not permitted in a $changeStream pipeline",
                        source->constraints(_splitState).isAllowedInChangeStream());
            }
        }
    }

    // Verify that each stage is in a legal position within the pipeline.
    ensureAllStagesAreInLegalPositions();
}
182
// Validates a $facet sub-pipeline: it must be non-empty and every stage must be allowed inside
// $facet. Stages that pass the facet whitelist are expected to have no positional requirement
// and to depend on the underlying collection. Throws on failure.
void Pipeline::validateFacetPipeline() const {
    if (_sources.empty()) {
        uasserted(ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty");
    }

    for (auto&& stage : _sources) {
        auto stageConstraints = stage->constraints(_splitState);
        if (!stageConstraints.isAllowedInsideFacetStage()) {
            uasserted(40600,
                      str::stream() << stage->getSourceName()
                                    << " is not allowed to be used within a $facet stage");
        }
        // We expect a stage within a $facet stage to have these properties.
        invariant(stageConstraints.requiredPosition == PositionRequirement::kNone);
        invariant(!stageConstraints.isIndependentOfAnyCollection);
    }

    // Facet pipelines cannot have any stages which are initial sources. We've already validated the
    // first stage, and the 'ensureAllStagesAreInLegalPositions' method checks that there are no
    // initial sources in positions 1...N, so we can just return its result directly.
    ensureAllStagesAreInLegalPositions();
}
205
// Checks each stage's positional constraints (kFirst/kLast), the special $match-with-$text
// first-position rule, and that no mongoS-only stage is about to run on a mongoD. Throws on
// the first violation found.
void Pipeline::ensureAllStagesAreInLegalPositions() const {
    size_t i = 0;
    for (auto&& stage : _sources) {
        auto constraints = stage->constraints(_splitState);

        // Verify that all stages adhere to their PositionRequirement constraints.
        if (constraints.requiredPosition == PositionRequirement::kFirst && i != 0) {
            uasserted(40602,
                      str::stream() << stage->getSourceName()
                                    << " is only valid as the first stage in a pipeline.");
        }
        // $match with $text is a special case not captured by PositionRequirement.
        auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get());
        if (i != 0 && matchStage && matchStage->isTextQuery()) {
            uasserted(17313, "$match with $text is only allowed as the first pipeline stage");
        }

        if (constraints.requiredPosition == PositionRequirement::kLast &&
            i != _sources.size() - 1) {
            uasserted(40601,
                      str::stream() << stage->getSourceName()
                                    << " can only be the final stage in the pipeline");
        }
        ++i;

        // Verify that we are not attempting to run a mongoS-only stage on mongoD.
        uassert(40644,
                str::stream() << stage->getSourceName() << " can only be run on mongoS",
                !(constraints.hostRequirement == HostTypeRequirement::kMongoS && !pCtx->inMongos));
    }
}
236
// Runs both cross-stage optimization (optimizeAt(), which may swap, merge, or remove stages and
// returns the next iterator to resume from) and per-stage optimization (optimize(), which may
// return a replacement stage or nullptr to drop the stage entirely), then re-stitches.
void Pipeline::optimizePipeline() {
    SourceContainer optimizedSources;

    SourceContainer::iterator itr = _sources.begin();

    // We could be swapping around stages during this process, so disconnect the pipeline to prevent
    // us from entering a state with dangling pointers.
    unstitch();
    while (itr != _sources.end()) {
        invariant((*itr).get());
        // optimizeAt() may mutate '_sources' and returns the iterator to continue from.
        itr = (*itr).get()->optimizeAt(itr, &_sources);
    }

    // Once we have reached our final number of stages, optimize each individually.
    for (auto&& source : _sources) {
        // A null return means the stage optimized itself away; it is simply not re-added.
        if (auto out = source->optimize()) {
            optimizedSources.push_back(out);
        }
    }
    _sources.swap(optimizedSources);
    stitch();
}
259
aggSupportsWriteConcern(const BSONObj & cmd)260 bool Pipeline::aggSupportsWriteConcern(const BSONObj& cmd) {
261 auto pipelineElement = cmd["pipeline"];
262 if (pipelineElement.type() != BSONType::Array) {
263 return false;
264 }
265
266 for (auto stage : pipelineElement.Obj()) {
267 if (stage.type() != BSONType::Object) {
268 return false;
269 }
270
271 if (stage.Obj().hasField("$out")) {
272 return true;
273 }
274 }
275
276 return false;
277 }
278
detachFromOperationContext()279 void Pipeline::detachFromOperationContext() {
280 pCtx->opCtx = nullptr;
281
282 for (auto&& source : _sources) {
283 source->detachFromOperationContext();
284 }
285 }
286
reattachToOperationContext(OperationContext * opCtx)287 void Pipeline::reattachToOperationContext(OperationContext* opCtx) {
288 pCtx->opCtx = opCtx;
289
290 for (auto&& source : _sources) {
291 source->reattachToOperationContext(opCtx);
292 }
293 }
294
// Releases the resources held by the pipeline's stages. Must be called (directly or via the
// Deleter) before the pipeline is destroyed. Disposal must not throw; any exception here
// terminates the process.
void Pipeline::dispose(OperationContext* opCtx) {
    try {
        pCtx->opCtx = opCtx;

        // Make sure all stages are connected, in case we are being disposed via an error path and
        // were not stitched at the time of the error.
        stitch();

        if (!_sources.empty()) {
            // Disposing the final stage propagates disposal up the chain of sources.
            _sources.back()->dispose();
        }
        _disposed = true;
    } catch (...) {
        std::terminate();
    }
}
311
// Splits this (unsplit) pipeline into a shard half and a merge half for sharded execution.
// Returns the new shard pipeline; 'this' becomes the merge pipeline. The original stage list is
// stashed on the shard pipeline so unsplitFromSharded() can restore it.
std::unique_ptr<Pipeline, Pipeline::Deleter> Pipeline::splitForSharded() {
    invariant(!isSplitForShards());
    invariant(!isSplitForMerge());
    invariant(!_unsplitSources);

    // Create and initialize the shard spec we'll return. We start with an empty pipeline on the
    // shards and all work being done in the merger. Optimizations can move operations between
    // the pipelines to be more efficient.
    std::unique_ptr<Pipeline, Pipeline::Deleter> shardPipeline(new Pipeline(pCtx),
                                                               Pipeline::Deleter(pCtx->opCtx));

    // Keep a copy of the original source list in case we need to reset the pipeline from split to
    // unsplit later.
    shardPipeline->_unsplitSources.emplace(_sources);

    // The order in which optimizations are applied can have significant impact on the
    // efficiency of the final pipeline. Be Careful!
    Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this);
    Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this);
    Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this);

    shardPipeline->_splitState = SplitState::kSplitForShards;
    _splitState = SplitState::kSplitForMerge;

    // Re-connect the merge-half stages after the split moved stages around.
    stitch();

    return shardPipeline;
}
340
// Reverses splitForSharded(): restores this (shard-half) pipeline's original unsplit stage list
// from '_unsplitSources' and discards the merge-half pipeline without disposing its stages.
void Pipeline::unsplitFromSharded(
    std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard) {
    invariant(isSplitForShards());
    invariant(!isSplitForMerge());
    invariant(pipelineForMergingShard);
    invariant(_unsplitSources);

    // Clear the merge source list so that destroying the pipeline object won't dispose of the
    // stages. We still have a reference to each of the stages which will be moved back to the shard
    // pipeline via '_unsplitSources'.
    pipelineForMergingShard->_sources.clear();
    pipelineForMergingShard.reset();

    // Set '_sources' to its original state, re-stitch, and clear the '_unsplitSources' optional.
    _sources = *_unsplitSources;
    _unsplitSources.reset();

    _splitState = SplitState::kUnsplit;

    stitch();
}
362
// Moves stages from the front of 'mergePipe' onto the back of 'shardPipe' until a splittable
// stage is found. That stage is split into its shard part (appended to the shard pipeline) and
// its merge parts (pushed back onto the front of the merge pipeline), and the walk stops.
void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) {
    while (!mergePipe->_sources.empty()) {
        intrusive_ptr<DocumentSource> current = mergePipe->_sources.front();
        mergePipe->_sources.pop_front();

        // Check if this source is splittable.
        SplittableDocumentSource* splittable =
            dynamic_cast<SplittableDocumentSource*>(current.get());

        if (!splittable) {
            // Move the source from the merger _sources to the shard _sources.
            shardPipe->_sources.push_back(current);
        } else {
            // Split this source into 'merge' and 'shard' _sources.
            intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
            auto mergeSources = splittable->getMergeSources();

            // A source may not simultaneously be present on both sides of the split.
            invariant(std::find(mergeSources.begin(), mergeSources.end(), shardSource) ==
                      mergeSources.end());

            if (shardSource)
                shardPipe->_sources.push_back(shardSource);

            // Add the stages in reverse order, so that they appear in the pipeline in the same
            // order as they were returned by the stage.
            for (auto it = mergeSources.rbegin(); it != mergeSources.rend(); ++it) {
                mergePipe->_sources.push_front(*it);
            }

            break;
        }
    }
}
397
moveFinalUnwindFromShardsToMerger(Pipeline * shardPipe,Pipeline * mergePipe)398 void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe,
399 Pipeline* mergePipe) {
400 while (!shardPipe->_sources.empty() &&
401 dynamic_cast<DocumentSourceUnwind*>(shardPipe->_sources.back().get())) {
402 mergePipe->_sources.push_front(shardPipe->_sources.back());
403 shardPipe->_sources.pop_back();
404 }
405 }
406
// Appends a $project to the shard pipeline that restricts the fields sent over the wire to only
// those the merge pipeline depends on. Skipped when the merger needs whole documents or when a
// shard stage already has an exhaustive field list (see heuristic comment below).
void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe,
                                                                         Pipeline* mergePipe) {
    // Text score metadata is only available when the initial query is a $text query.
    auto depsMetadata = DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery())
        ? DepsTracker::MetadataAvailable::kTextScore
        : DepsTracker::MetadataAvailable::kNoMetadata;
    DepsTracker mergeDeps(mergePipe->getDependencies(depsMetadata));
    if (mergeDeps.needWholeDocument)
        return;  // the merge needs all fields, so nothing we can do.

    // Empty project is "special" so if no fields are needed, we just ask for _id instead.
    if (mergeDeps.fields.empty())
        mergeDeps.fields.insert("_id");

    // Remove metadata from dependencies since it automatically flows through projection and we
    // don't want to project it in to the document.
    mergeDeps.setNeedTextScore(false);

    // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of
    // field dependencies. While this may not be 100% ideal in all cases, it is simple and
    // avoids the worst cases by ensuring that:
    // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of
    //    dependencies. This situation can happen when a $sort is before the first $project or
    //    $group. Without the optimization, the shards would have to reify and transmit full
    //    objects even though only a subset of fields are needed.
    // 2) Optimization IS NOT applied immediately following a $project or $group since it would
    //    add an unnecessary project (and therefore a deep-copy).
    for (auto&& source : shardPipe->_sources) {
        DepsTracker dt(depsMetadata);
        if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS)
            return;
    }
    // if we get here, add the project.
    boost::intrusive_ptr<DocumentSource> project = DocumentSourceProject::createFromBson(
        BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx);
    shardPipe->_sources.push_back(project);
}
443
getInitialQuery() const444 BSONObj Pipeline::getInitialQuery() const {
445 if (_sources.empty())
446 return BSONObj();
447
448 /* look for an initial $match */
449 DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(_sources.front().get());
450 if (match) {
451 return match->getQuery();
452 }
453
454 DocumentSourceGeoNear* geoNear = dynamic_cast<DocumentSourceGeoNear*>(_sources.front().get());
455 if (geoNear) {
456 return geoNear->getQuery();
457 }
458
459 return BSONObj();
460 }
461
needsPrimaryShardMerger() const462 bool Pipeline::needsPrimaryShardMerger() const {
463 return std::any_of(_sources.begin(), _sources.end(), [&](const auto& stage) {
464 return stage->constraints(SplitState::kSplitForMerge).hostRequirement ==
465 HostTypeRequirement::kPrimaryShard;
466 });
467 }
468
needsMongosMerger() const469 bool Pipeline::needsMongosMerger() const {
470 return std::any_of(_sources.begin(), _sources.end(), [&](const auto& stage) {
471 return stage->constraints(SplitState::kSplitForMerge).resolvedHostTypeRequirement(pCtx) ==
472 HostTypeRequirement::kMongoS;
473 });
474 }
475
canRunOnMongos() const476 bool Pipeline::canRunOnMongos() const {
477 return _pipelineCanRunOnMongoS().isOK();
478 }
479
// Returns true if this pipeline contains a mongoS-only stage that cannot be deferred past a
// split point, meaning the entire pipeline must execute on mongoS. Throws (uassert) if a stage
// must run on mongoS but the rest of the pipeline cannot.
bool Pipeline::requiredToRunOnMongos() const {
    invariant(!isSplitForShards());

    for (auto&& stage : _sources) {
        // If this pipeline is capable of splitting before the mongoS-only stage, then the pipeline
        // as a whole is not required to run on mongoS.
        if (isUnsplit() && dynamic_cast<SplittableDocumentSource*>(stage.get())) {
            return false;
        }

        auto hostRequirement = stage->constraints(_splitState).resolvedHostTypeRequirement(pCtx);

        // If a mongoS-only stage occurs before a splittable stage, or if the pipeline is already
        // split, this entire pipeline must run on mongoS.
        if (hostRequirement == HostTypeRequirement::kMongoS) {
            // Verify that the remainder of this pipeline can run on mongoS.
            auto mongosRunStatus = _pipelineCanRunOnMongoS();

            uassert(mongosRunStatus.code(),
                    str::stream() << stage->getSourceName() << " must run on mongoS, but "
                                  << mongosRunStatus.reason(),
                    mongosRunStatus.isOK());

            return true;
        }
    }

    return false;
}
509
getInvolvedCollections() const510 std::vector<NamespaceString> Pipeline::getInvolvedCollections() const {
511 std::vector<NamespaceString> collections;
512 for (auto&& source : _sources) {
513 source->addInvolvedCollections(&collections);
514 }
515 return collections;
516 }
517
serialize() const518 vector<Value> Pipeline::serialize() const {
519 vector<Value> serializedSources;
520 for (auto&& source : _sources) {
521 source->serializeToArray(serializedSources);
522 }
523 return serializedSources;
524 }
525
unstitch()526 void Pipeline::unstitch() {
527 for (auto&& stage : _sources) {
528 stage->setSource(nullptr);
529 }
530 }
531
stitch()532 void Pipeline::stitch() {
533 if (_sources.empty()) {
534 return;
535 }
536 // Chain together all the stages.
537 DocumentSource* prevSource = _sources.front().get();
538 prevSource->setSource(nullptr);
539 for (SourceContainer::iterator iter(++_sources.begin()), listEnd(_sources.end());
540 iter != listEnd;
541 ++iter) {
542 intrusive_ptr<DocumentSource> pTemp(*iter);
543 pTemp->setSource(prevSource);
544 prevSource = pTemp.get();
545 }
546 }
547
getNext()548 boost::optional<Document> Pipeline::getNext() {
549 invariant(!_sources.empty());
550 auto nextResult = _sources.back()->getNext();
551 while (nextResult.isPaused()) {
552 nextResult = _sources.back()->getNext();
553 }
554 return nextResult.isEOF() ? boost::none
555 : boost::optional<Document>{nextResult.releaseDocument()};
556 }
557
writeExplainOps(ExplainOptions::Verbosity verbosity) const558 vector<Value> Pipeline::writeExplainOps(ExplainOptions::Verbosity verbosity) const {
559 vector<Value> array;
560 for (SourceContainer::const_iterator it = _sources.begin(); it != _sources.end(); ++it) {
561 (*it)->serializeToArray(array, verbosity);
562 }
563 return array;
564 }
565
addInitialSource(intrusive_ptr<DocumentSource> source)566 void Pipeline::addInitialSource(intrusive_ptr<DocumentSource> source) {
567 if (!_sources.empty()) {
568 _sources.front()->setSource(source.get());
569 }
570 _sources.push_front(source);
571 }
572
addFinalSource(intrusive_ptr<DocumentSource> source)573 void Pipeline::addFinalSource(intrusive_ptr<DocumentSource> source) {
574 if (!_sources.empty()) {
575 source->setSource(_sources.back().get());
576 }
577 _sources.push_back(source);
578 }
579
// Walks the stages in order, accumulating the set of fields, metadata, and variables the
// pipeline depends on. The walk can stop early once a stage reports exhaustive knowledge of
// both fields and metadata — unless pipeline-scope variables exist, in which case later stages
// must still be scanned for variable references.
DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const {
    DepsTracker deps(metadataAvailable);
    const bool scopeHasVariables = pCtx->variablesParseState.hasDefinedVariables();
    bool skipFieldsAndMetadataDeps = false;
    bool knowAllFields = false;
    bool knowAllMeta = false;
    for (auto&& source : _sources) {
        DepsTracker localDeps(deps.getMetadataAvailable());
        DocumentSource::GetDepsReturn status = source->getDependencies(&localDeps);

        // Variable dependencies are always accumulated, even after NOT_SUPPORTED.
        deps.vars.insert(localDeps.vars.begin(), localDeps.vars.end());

        if ((skipFieldsAndMetadataDeps |= (status == DocumentSource::NOT_SUPPORTED))) {
            // Assume this stage needs everything. We may still know something about our
            // dependencies if an earlier stage returned EXHAUSTIVE_FIELDS or EXHAUSTIVE_META. If
            // this scope has variables, we need to keep enumerating the remaining stages but will
            // skip adding any further field or metadata dependencies.
            if (scopeHasVariables) {
                continue;
            } else {
                break;
            }
        }

        if (!knowAllFields) {
            deps.fields.insert(localDeps.fields.begin(), localDeps.fields.end());
            if (localDeps.needWholeDocument)
                deps.needWholeDocument = true;
            knowAllFields = status & DocumentSource::EXHAUSTIVE_FIELDS;
        }

        if (!knowAllMeta) {
            if (localDeps.getNeedTextScore())
                deps.setNeedTextScore(true);

            if (localDeps.getNeedSortKey())
                deps.setNeedSortKey(true);

            knowAllMeta = status & DocumentSource::EXHAUSTIVE_META;
        }

        // If there are variables defined at this pipeline's scope, there may be dependencies upon
        // them in subsequent stages. Keep enumerating.
        if (knowAllMeta && knowAllFields && !scopeHasVariables) {
            break;
        }
    }

    if (!knowAllFields)
        deps.needWholeDocument = true;  // don't know all fields we need

    if (metadataAvailable & DepsTracker::MetadataAvailable::kTextScore) {
        // If there is a text score, assume we need to keep it if we can't prove we don't. If we are
        // the first half of a pipeline which has been split, future stages might need it.
        if (!knowAllMeta)
            deps.setNeedTextScore(true);
    } else {
        // If there is no text score available, then we don't need to ask for it.
        deps.setNeedTextScore(false);
    }

    return deps;
}
643
// Checks every stage against the three properties that disqualify a pipeline from running on
// mongoS: needing a shard, needing disk access, or blocking when blocking merges are disabled.
// Returns OK, or IllegalOperation with a message naming the first offending stage and reason.
Status Pipeline::_pipelineCanRunOnMongoS() const {
    for (auto&& stage : _sources) {
        auto constraints = stage->constraints(_splitState);
        auto hostRequirement = constraints.resolvedHostTypeRequirement(pCtx);

        const bool needsShard = (hostRequirement == HostTypeRequirement::kAnyShard ||
                                 hostRequirement == HostTypeRequirement::kPrimaryShard);

        const bool mustWriteToDisk =
            (constraints.diskRequirement == DiskUseRequirement::kWritesPersistentData);
        // Temp-data writes only matter when the user has opted in via 'allowDiskUse'.
        const bool mayWriteTmpDataAndDiskUseIsAllowed =
            (pCtx->allowDiskUse &&
             constraints.diskRequirement == DiskUseRequirement::kWritesTmpData);
        const bool needsDisk = (mustWriteToDisk || mayWriteTmpDataAndDiskUseIsAllowed);

        const bool needsToBlock = (constraints.streamType == StreamType::kBlocking);
        const bool blockingIsPermitted = !internalQueryProhibitBlockingMergeOnMongoS.load();

        // If nothing prevents this stage from running on mongoS, continue to the next stage.
        if (!needsShard && !needsDisk && (!needsToBlock || blockingIsPermitted)) {
            continue;
        }

        // Otherwise, return an error with an explanation.
        StringBuilder ss;
        ss << stage->getSourceName();

        if (needsShard) {
            ss << " must run on a shard";
        } else if (needsToBlock && !blockingIsPermitted) {
            ss << " is a blocking stage; running these stages on mongoS is disabled";
        } else if (mustWriteToDisk) {
            ss << " must write to disk";
        } else if (mayWriteTmpDataAndDiskUseIsAllowed) {
            ss << " may write to disk when 'allowDiskUse' is enabled";
        } else {
            MONGO_UNREACHABLE;
        }

        return {ErrorCodes::IllegalOperation, ss.str()};
    }

    return Status::OK();
}
688
popFrontWithCriteria(StringData targetStageName,stdx::function<bool (const DocumentSource * const)> predicate)689 boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria(
690 StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate) {
691 if (_sources.empty() || _sources.front()->getSourceName() != targetStageName) {
692 return nullptr;
693 }
694 auto targetStage = _sources.front();
695
696 if (predicate && !predicate(targetStage.get())) {
697 return nullptr;
698 }
699
700 _sources.pop_front();
701 stitch();
702 return targetStage;
703 }
704 } // namespace mongo
705