1 2 /** 3 * Copyright (C) 2018-present MongoDB, Inc. 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the Server Side Public License, version 1, 7 * as published by MongoDB, Inc. 8 * 9 * This program is distributed in the hope that it will be useful, 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 * Server Side Public License for more details. 13 * 14 * You should have received a copy of the Server Side Public License 15 * along with this program. If not, see 16 * <http://www.mongodb.com/licensing/server-side-public-license>. 17 * 18 * As a special exception, the copyright holders give permission to link the 19 * code of portions of this program with the OpenSSL library under certain 20 * conditions as described in each individual source file and distribute 21 * linked combinations including the program with the OpenSSL library. You 22 * must comply with the Server Side Public License in all respects for 23 * all of the code used other than as permitted herein. If you modify file(s) 24 * with this exception, you may extend this exception to your version of the 25 * file(s), but you are not obligated to do so. If you do not wish to do so, 26 * delete this exception statement from your version. If you delete this 27 * exception statement from all source files in the program, then also delete 28 * it in the license file. 
29 */ 30 31 #pragma once 32 33 #include "mongo/platform/basic.h" 34 35 #include <boost/intrusive_ptr.hpp> 36 #include <boost/optional.hpp> 37 #include <list> 38 #include <memory> 39 #include <string> 40 #include <vector> 41 42 #include "mongo/base/init.h" 43 #include "mongo/bson/simple_bsonobj_comparator.h" 44 #include "mongo/client/dbclientinterface.h" 45 #include "mongo/db/collection_index_usage_tracker.h" 46 #include "mongo/db/commands.h" 47 #include "mongo/db/jsobj.h" 48 #include "mongo/db/namespace_string.h" 49 #include "mongo/db/pipeline/dependencies.h" 50 #include "mongo/db/pipeline/document.h" 51 #include "mongo/db/pipeline/expression_context.h" 52 #include "mongo/db/pipeline/field_path.h" 53 #include "mongo/db/pipeline/lite_parsed_document_source.h" 54 #include "mongo/db/pipeline/pipeline.h" 55 #include "mongo/db/pipeline/value.h" 56 #include "mongo/db/query/explain_options.h" 57 #include "mongo/stdx/functional.h" 58 #include "mongo/util/intrusive_counter.h" 59 60 namespace mongo { 61 62 class AggregationRequest; 63 class Document; 64 65 /** 66 * Registers a DocumentSource to have the name 'key'. 67 * 68 * 'liteParser' takes an AggregationRequest and a BSONElement and returns a 69 * LiteParsedDocumentSource. This is used for checks that need to happen before a full parse, 70 * such as checks about which namespaces are referenced by this aggregation. 71 * 72 * 'fullParser' takes a BSONElement and an ExpressionContext and returns a fully-executable 73 * DocumentSource. This will be used for optimization and execution. 74 * 75 * Stages that do not require any special pre-parse checks can use 76 * LiteParsedDocumentSourceDefault::parse as their 'liteParser'. 
77 * 78 * As an example, if your stage DocumentSourceFoo looks like {$foo: <args>} and does *not* require 79 * any special pre-parse checks, you should implement a static parser like 80 * DocumentSourceFoo::createFromBson(), and register it like so: 81 * REGISTER_DOCUMENT_SOURCE(foo, 82 * LiteParsedDocumentSourceDefault::parse, 83 * DocumentSourceFoo::createFromBson); 84 * 85 * If your stage is actually an alias which needs to return more than one stage (such as 86 * $sortByCount), you should use the REGISTER_MULTI_STAGE_ALIAS macro instead. 87 */ 88 #define REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(key, liteParser, fullParser, ...) \ 89 MONGO_INITIALIZER(addToDocSourceParserMap_##key)(InitializerContext*) { \ 90 if (!__VA_ARGS__) { \ 91 return Status::OK(); \ 92 } \ 93 auto fullParserWrapper = [](BSONElement stageSpec, \ 94 const boost::intrusive_ptr<ExpressionContext>& expCtx) { \ 95 return std::list<boost::intrusive_ptr<DocumentSource>>{ \ 96 (fullParser)(stageSpec, expCtx)}; \ 97 }; \ 98 LiteParsedDocumentSource::registerParser("$" #key, liteParser); \ 99 DocumentSource::registerParser("$" #key, fullParserWrapper); \ 100 return Status::OK(); \ 101 } 102 103 #define REGISTER_DOCUMENT_SOURCE(key, liteParser, fullParser) \ 104 REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(key, liteParser, fullParser, true) 105 106 #define REGISTER_TEST_DOCUMENT_SOURCE(key, liteParser, fullParser) \ 107 REGISTER_DOCUMENT_SOURCE_CONDITIONALLY( \ 108 key, liteParser, fullParser, Command::testCommandsEnabled) 109 110 /** 111 * Registers a multi-stage alias (such as $sortByCount) to have the single name 'key'. When a stage 112 * with name '$key' is found, 'liteParser' will be used to produce a LiteParsedDocumentSource, 113 * while 'fullParser' will be called to construct a vector of DocumentSources. See the comments on 114 * REGISTER_DOCUMENT_SOURCE for more information. 
115 * 116 * As an example, if your stage alias looks like {$foo: <args>} and does *not* require any special 117 * pre-parse checks, you should implement a static parser like DocumentSourceFoo::createFromBson(), 118 * and register it like so: 119 * REGISTER_MULTI_STAGE_ALIAS(foo, 120 * LiteParsedDocumentSourceDefault::parse, 121 * DocumentSourceFoo::createFromBson); 122 */ 123 #define REGISTER_MULTI_STAGE_ALIAS(key, liteParser, fullParser) \ 124 MONGO_INITIALIZER(addAliasToDocSourceParserMap_##key)(InitializerContext*) { \ 125 LiteParsedDocumentSource::registerParser("$" #key, (liteParser)); \ 126 DocumentSource::registerParser("$" #key, (fullParser)); \ 127 return Status::OK(); \ 128 } 129 130 class DocumentSource : public IntrusiveCounterUnsigned { 131 public: 132 using Parser = stdx::function<std::list<boost::intrusive_ptr<DocumentSource>>( 133 BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>; 134 135 /** 136 * A struct describing various constraints about where this stage can run, where it must be in 137 * the pipeline, what resources it may require, etc. 138 */ 139 struct StageConstraints { 140 /** 141 * A StreamType defines whether this stage is streaming (can produce output based solely on 142 * the current input document) or blocking (must examine subsequent documents before 143 * producing an output document). 144 */ 145 enum class StreamType { kStreaming, kBlocking }; 146 147 /** 148 * A PositionRequirement stipulates what specific position the stage must occupy within the 149 * pipeline, if any. 150 */ 151 enum class PositionRequirement { kNone, kFirst, kLast }; 152 153 /** 154 * A HostTypeRequirement defines where this stage is permitted to be executed when the 155 * pipeline is run on a sharded cluster. 156 */ 157 enum class HostTypeRequirement { 158 // Indicates that the stage can run either on mongoD or mongoS. 
159 kNone, 160 // Indicates that the stage must run on the host to which it was originally sent and 161 // cannot be forwarded from mongoS to the shards. 162 kLocalOnly, 163 // Indicates that the stage must run on the primary shard. 164 kPrimaryShard, 165 // Indicates that the stage must run on any participating shard. 166 kAnyShard, 167 // Indicates that the stage can only run on mongoS. 168 kMongoS, 169 }; 170 171 /** 172 * A DiskUseRequirement indicates whether this stage writes permanent data to disk, or 173 * whether it may spill temporary data to disk if its memory usage exceeds a given 174 * threshold. Note that this only indicates that the stage has the ability to spill; if 175 * 'allowDiskUse' is set to false, it will be prevented from doing so. 176 */ 177 enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData, kWritesPersistentData }; 178 179 /** 180 * A ChangeStreamRequirement determines whether a particular stage is itself a ChangeStream 181 * stage, whether it is allowed to exist in a $changeStream pipeline, or whether it is 182 * blacklisted from $changeStream. 183 */ 184 enum class ChangeStreamRequirement { kChangeStreamStage, kWhitelist, kBlacklist }; 185 186 /** 187 * A FacetRequirement indicates whether this stage may be used within a $facet pipeline. 
*/
        enum class FacetRequirement { kAllowed, kNotAllowed };

        /**
         * Stores the given requirements and then checks, through invariant(), that the
         * combination is self-consistent (see the comment on each check below).
         *
         * 'changeStreamRequirement' is the only defaulted parameter and defaults to kBlacklist.
         *
         * The initializer list follows the order in which the members are declared, which is not
         * the parameter order: 'streamType' is the first parameter but is initialized last.
         * Inside the constructor body the unqualified names refer to the constructor parameters,
         * which carry the same values as the members they initialized.
         */
        StageConstraints(
            StreamType streamType,
            PositionRequirement requiredPosition,
            HostTypeRequirement hostRequirement,
            DiskUseRequirement diskRequirement,
            FacetRequirement facetRequirement,
            ChangeStreamRequirement changeStreamRequirement = ChangeStreamRequirement::kBlacklist)
            : requiredPosition(requiredPosition),
              hostRequirement(hostRequirement),
              diskRequirement(diskRequirement),
              changeStreamRequirement(changeStreamRequirement),
              facetRequirement(facetRequirement),
              streamType(streamType) {
            // Stages which are allowed to run in $facet must not have any position requirements.
            invariant(
                !(isAllowedInsideFacetStage() && requiredPosition != PositionRequirement::kNone));

            // No change stream stages are permitted to run in a $facet pipeline.
            invariant(!(isChangeStreamStage() && isAllowedInsideFacetStage()));

            // Only streaming stages are permitted in $changeStream pipelines. Note that
            // isAllowedInChangeStream() is true for both kWhitelist and kChangeStreamStage.
            invariant(!(isAllowedInChangeStream() && streamType == StreamType::kBlocking));

            // A stage which is whitelisted for $changeStream cannot have a requirement to run on a
            // shard, since it needs to be able to run on mongoS in a cluster.
            invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist &&
                        (hostRequirement == HostTypeRequirement::kAnyShard ||
                         hostRequirement == HostTypeRequirement::kPrimaryShard)));

            // A stage which is whitelisted for $changeStream cannot have a position requirement.
            invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist &&
                        requiredPosition != PositionRequirement::kNone));
        }

        /**
         * Returns the literal HostTypeRequirement used to initialize the StageConstraints, or the
         * effective HostTypeRequirement (kAnyShard or kMongoS) if kLocalOnly was specified.
228 */ resolvedHostTypeRequirementStageConstraints229 HostTypeRequirement resolvedHostTypeRequirement( 230 const boost::intrusive_ptr<ExpressionContext>& expCtx) const { 231 return (hostRequirement != HostTypeRequirement::kLocalOnly 232 ? hostRequirement 233 : (expCtx->inMongos ? HostTypeRequirement::kMongoS 234 : HostTypeRequirement::kAnyShard)); 235 } 236 237 /** 238 * True if this stage must run on the same host to which it was originally sent. 239 */ mustRunLocallyStageConstraints240 bool mustRunLocally() const { 241 return hostRequirement == HostTypeRequirement::kLocalOnly; 242 } 243 244 /** 245 * True if this stage is permitted to run in a $facet pipeline. 246 */ isAllowedInsideFacetStageStageConstraints247 bool isAllowedInsideFacetStage() const { 248 return facetRequirement == FacetRequirement::kAllowed; 249 } 250 251 /** 252 * True if this stage is permitted to run in a pipeline which starts with $changeStream. 253 */ isAllowedInChangeStreamStageConstraints254 bool isAllowedInChangeStream() const { 255 return changeStreamRequirement != ChangeStreamRequirement::kBlacklist; 256 } 257 258 /** 259 * True if this stage is itself a $changeStream stage, and is therefore implicitly allowed 260 * to run in a pipeline which begins with $changeStream. 261 */ isChangeStreamStageStageConstraints262 bool isChangeStreamStage() const { 263 return changeStreamRequirement == ChangeStreamRequirement::kChangeStreamStage; 264 } 265 266 // Indicates whether this stage needs to be at a particular position in the pipeline. 267 const PositionRequirement requiredPosition; 268 269 // Indicates whether this stage can only be executed on specific components of a sharded 270 // cluster. 271 const HostTypeRequirement hostRequirement; 272 273 // Indicates whether this stage may write persistent data to disk, or may spill to temporary 274 // files if its memory usage becomes excessive. 
275 const DiskUseRequirement diskRequirement; 276 277 // Indicates whether this stage is itself a $changeStream stage, or if not whether it may 278 // exist in a pipeline which begins with $changeStream. 279 const ChangeStreamRequirement changeStreamRequirement; 280 281 // Indicates whether this stage may run inside a $facet stage. 282 const FacetRequirement facetRequirement; 283 284 // Indicates whether this is a streaming or blocking stage. 285 const StreamType streamType; 286 287 // True if this stage does not generate results itself, and instead pulls inputs from an 288 // input DocumentSource (via 'pSource'). 289 bool requiresInputDocSource = true; 290 291 // True if this stage operates on a global or database level, like $currentOp. 292 bool isIndependentOfAnyCollection = false; 293 294 // True if this stage can ever be safely swapped with a subsequent $match stage, provided 295 // that the match does not depend on the paths returned by getModifiedPaths(). 296 // 297 // Stages that want to participate in match swapping should set this to true. Such a stage 298 // must also override getModifiedPaths() to provide information about which particular 299 // $match predicates be swapped before itself. 300 bool canSwapWithMatch = false; 301 }; 302 303 using ChangeStreamRequirement = StageConstraints::ChangeStreamRequirement; 304 using HostTypeRequirement = StageConstraints::HostTypeRequirement; 305 using PositionRequirement = StageConstraints::PositionRequirement; 306 using DiskUseRequirement = StageConstraints::DiskUseRequirement; 307 using FacetRequirement = StageConstraints::FacetRequirement; 308 using StreamType = StageConstraints::StreamType; 309 310 /** 311 * This is what is returned from the main DocumentSource API: getNext(). It is essentially a 312 * (ReturnStatus, Document) pair, with the first entry being used to communicate information 313 * about the execution of the DocumentSource, such as whether or not it has been exhausted. 
314 */ 315 class GetNextResult { 316 public: 317 enum class ReturnStatus { 318 // There is a result to be processed. 319 kAdvanced, 320 // There will be no further results. 321 kEOF, 322 // There is not a result to be processed yet, but there may be more results in the 323 // future. If a DocumentSource retrieves this status from its child, it must propagate 324 // it without doing any further work. 325 kPauseExecution, 326 }; 327 makeEOF()328 static GetNextResult makeEOF() { 329 return GetNextResult(ReturnStatus::kEOF); 330 } 331 makePauseExecution()332 static GetNextResult makePauseExecution() { 333 return GetNextResult(ReturnStatus::kPauseExecution); 334 } 335 336 /** 337 * Shortcut constructor for the common case of creating an 'advanced' GetNextResult from the 338 * given 'result'. Accepts only an rvalue reference as an argument, since DocumentSources 339 * will want to move 'result' into this GetNextResult, and should have to opt in to making a 340 * copy. 341 */ GetNextResult(Document && result)342 /* implicit */ GetNextResult(Document&& result) 343 : _status(ReturnStatus::kAdvanced), _result(std::move(result)) {} 344 345 /** 346 * Gets the result document. It is an error to call this if isAdvanced() returns false. 347 */ getDocument()348 const Document& getDocument() const { 349 dassert(isAdvanced()); 350 return _result; 351 } 352 353 /** 354 * Releases the result document, transferring ownership to the caller. It is an error to 355 * call this if isAdvanced() returns false. 
356 */ releaseDocument()357 Document releaseDocument() { 358 dassert(isAdvanced()); 359 return std::move(_result); 360 } 361 getStatus()362 ReturnStatus getStatus() const { 363 return _status; 364 } 365 isAdvanced()366 bool isAdvanced() const { 367 return _status == ReturnStatus::kAdvanced; 368 } 369 isEOF()370 bool isEOF() const { 371 return _status == ReturnStatus::kEOF; 372 } 373 isPaused()374 bool isPaused() const { 375 return _status == ReturnStatus::kPauseExecution; 376 } 377 378 private: GetNextResult(ReturnStatus status)379 GetNextResult(ReturnStatus status) : _status(status) {} 380 381 ReturnStatus _status; 382 Document _result; 383 }; 384 ~DocumentSource()385 virtual ~DocumentSource() {} 386 387 /** 388 * The main execution API of a DocumentSource. Returns an intermediate query result generated by 389 * this DocumentSource. 390 * 391 * All implementers must call pExpCtx->checkForInterrupt(). 392 * 393 * For performance reasons, a streaming stage must not keep references to documents across calls 394 * to getNext(). Such stages must retrieve a result from their child and then release it (or 395 * return it) before asking for another result. Failing to do so can result in extra work, since 396 * the Document/Value library must copy data on write when that data has a refcount above one. 397 */ 398 virtual GetNextResult getNext() = 0; 399 400 /** 401 * Returns a struct containing information about any special constraints imposed on using this 402 * stage. Input parameter Pipeline::SplitState is used by stages whose requirements change 403 * depending on whether they are in a split or unsplit pipeline. 404 */ 405 virtual StageConstraints constraints( 406 Pipeline::SplitState = Pipeline::SplitState::kUnsplit) const = 0; 407 408 /** 409 * Informs the stage that it is no longer needed and can release its resources. After dispose() 410 * is called the stage must still be able to handle calls to getNext(), but can return kEOF. 
411 * 412 * This is a non-virtual public interface to ensure dispose() is threaded through the entire 413 * pipeline. Subclasses should override doDispose() to implement their disposal. 414 */ dispose()415 void dispose() { 416 doDispose(); 417 if (pSource) { 418 pSource->dispose(); 419 } 420 } 421 422 /** 423 * Get the stage's name. 424 */ 425 virtual const char* getSourceName() const; 426 427 /** 428 * Set the underlying source this source should use to get Documents from. Must not throw 429 * exceptions. 430 */ setSource(DocumentSource * source)431 virtual void setSource(DocumentSource* source) { 432 pSource = source; 433 } 434 435 /** 436 * In the default case, serializes the DocumentSource and adds it to the std::vector<Value>. 437 * 438 * A subclass may choose to overwrite this, rather than serialize, if it should output multiple 439 * stages (eg, $sort sometimes also outputs a $limit). 440 * 441 * The 'explain' parameter indicates the explain verbosity mode, or is equal boost::none if no 442 * explain is requested. 443 */ 444 virtual void serializeToArray( 445 std::vector<Value>& array, 446 boost::optional<ExplainOptions::Verbosity> explain = boost::none) const; 447 448 /** 449 * If DocumentSource uses additional collections, it adds the namespaces to the input vector. 450 */ addInvolvedCollections(std::vector<NamespaceString> * collections)451 virtual void addInvolvedCollections(std::vector<NamespaceString>* collections) const {} 452 detachFromOperationContext()453 virtual void detachFromOperationContext() {} 454 reattachToOperationContext(OperationContext * opCtx)455 virtual void reattachToOperationContext(OperationContext* opCtx) {} 456 457 /** 458 * Create a DocumentSource pipeline stage from 'stageObj'. 
459 */ 460 static std::list<boost::intrusive_ptr<DocumentSource>> parse( 461 const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONObj stageObj); 462 463 /** 464 * Registers a DocumentSource with a parsing function, so that when a stage with the given name 465 * is encountered, it will call 'parser' to construct that stage. 466 * 467 * DO NOT call this method directly. Instead, use the REGISTER_DOCUMENT_SOURCE macro defined in 468 * this file. 469 */ 470 static void registerParser(std::string name, Parser parser); 471 472 /** 473 * Given a BSONObj, construct a BSONObjSet consisting of all prefixes of that object. For 474 * example, given {a: 1, b: 1, c: 1}, this will return a set: {{a: 1}, {a: 1, b: 1}, {a: 1, b: 475 * 1, c: 1}}. 476 */ 477 static BSONObjSet allPrefixes(BSONObj obj); 478 479 /** 480 * Given a BSONObjSet, where each BSONObj represents a sort key, return the BSONObjSet that 481 * results from truncating each sort key before the first path that is a member of 'fields', or 482 * is a child of a member of 'fields'. 483 */ 484 static BSONObjSet truncateSortSet(const BSONObjSet& sorts, const std::set<std::string>& fields); 485 486 // 487 // Optimization API - These methods give each DocumentSource an opportunity to apply any local 488 // optimizations, and to provide any rule-based optimizations to swap with or absorb subsequent 489 // stages. 490 // 491 492 /** 493 * The non-virtual public interface for optimization. Attempts to do some generic optimizations 494 * such as pushing $matches as early in the pipeline as possible, then calls out to 495 * doOptimizeAt() for stage-specific optimizations. 496 * 497 * Subclasses should override doOptimizeAt() if they can apply some optimization(s) based on 498 * subsequent stages in the pipeline. 
499 */ 500 Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr, 501 Pipeline::SourceContainer* container); 502 503 /** 504 * Returns an optimized DocumentSource that is semantically equivalent to this one, or 505 * nullptr if this stage is a no-op. Implementations are allowed to modify themselves 506 * in-place and return a pointer to themselves. For best results, first optimize the pipeline 507 * with the optimizePipeline() method defined in pipeline.cpp. 508 * 509 * This is intended for any operations that include expressions, and provides a hook for 510 * those to optimize those operations. 511 * 512 * The default implementation is to do nothing and return yourself. 513 */ 514 virtual boost::intrusive_ptr<DocumentSource> optimize(); 515 516 // 517 // Property Analysis - These methods allow a DocumentSource to expose information about 518 // properties of themselves, such as which fields they need to apply their transformations, and 519 // whether or not they produce or preserve a sort order. 520 // 521 // Property analysis can be useful during optimization (e.g. analysis of sort orders determines 522 // whether or not a blocking group can be upgraded to a streaming group). 523 // 524 525 /** 526 * Gets a BSONObjSet representing the sort order(s) of the output of the stage. 527 */ getOutputSorts()528 virtual BSONObjSet getOutputSorts() { 529 return SimpleBSONObjComparator::kInstance.makeBSONObjSet(); 530 } 531 532 struct GetModPathsReturn { 533 enum class Type { 534 // No information is available about which paths are modified. 535 kNotSupported, 536 537 // All fields will be modified. This should be used by stages like $replaceRoot which 538 // modify the entire document. 539 kAllPaths, 540 541 // A finite set of paths will be modified by this stage. This is true for something like 542 // {$project: {a: 0, b: 0}}, which will only modify 'a' and 'b', and leave all other 543 // paths unmodified. 
544 kFiniteSet, 545 546 // This stage will modify an infinite set of paths, but we know which paths it will not 547 // modify. For example, the stage {$project: {_id: 1, a: 1}} will leave only the fields 548 // '_id' and 'a' unmodified, but all other fields will be projected out. 549 kAllExcept, 550 }; 551 GetModPathsReturnGetModPathsReturn552 GetModPathsReturn(Type type, 553 std::set<std::string>&& paths, 554 StringMap<std::string>&& renames) 555 : type(type), paths(std::move(paths)), renames(std::move(renames)) {} 556 557 Type type; 558 std::set<std::string> paths; 559 560 // Stages may fill out 'renames' to contain information about path renames. Each entry in 561 // 'renames' maps from the new name of the path (valid in documents flowing *out* of this 562 // stage) to the old name of the path (valid in documents flowing *into* this stage). 563 // 564 // For example, consider the stage 565 // 566 // {$project: {_id: 0, a: 1, b: "$c"}} 567 // 568 // This stage should return kAllExcept, since it modifies all paths other than "a". It can 569 // also fill out 'renames' with the mapping "b" => "c". 570 StringMap<std::string> renames; 571 }; 572 573 /** 574 * Returns information about which paths are added, removed, or updated by this stage. The 575 * default implementation uses kNotSupported to indicate that the set of modified paths for this 576 * stage is not known. 577 * 578 * See GetModPathsReturn above for the possible return values and what they mean. 579 */ getModifiedPaths()580 virtual GetModPathsReturn getModifiedPaths() const { 581 return {GetModPathsReturn::Type::kNotSupported, std::set<std::string>{}, {}}; 582 } 583 584 enum GetDepsReturn { 585 // The full object and all metadata may be required. 586 NOT_SUPPORTED = 0x0, 587 588 // Later stages could need either fields or metadata. For example, a $limit stage will pass 589 // through all fields, and they may or may not be needed by future stages. 
590 SEE_NEXT = 0x1, 591 592 // Later stages won't need more fields from input. For example, an inclusion projection like 593 // {_id: 1, a: 1} will only output two fields, so future stages cannot possibly depend on 594 // any other fields. 595 EXHAUSTIVE_FIELDS = 0x2, 596 597 // Later stages won't need more metadata from input. For example, a $group stage will group 598 // documents together, discarding their text score and sort keys. 599 EXHAUSTIVE_META = 0x4, 600 601 // Later stages won't need either fields or metadata. 602 EXHAUSTIVE_ALL = EXHAUSTIVE_FIELDS | EXHAUSTIVE_META, 603 }; 604 605 /** 606 * Get the dependencies this operation needs to do its job. If overridden, subclasses must add 607 * all paths needed to apply their transformation to 'deps->fields', and call 608 * 'deps->setNeedTextScore()' if the text score is required. 609 * 610 * See GetDepsReturn above for the possible return values and what they mean. 611 */ getDependencies(DepsTracker * deps)612 virtual GetDepsReturn getDependencies(DepsTracker* deps) const { 613 return NOT_SUPPORTED; 614 } 615 616 protected: 617 explicit DocumentSource(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); 618 619 /** 620 * Attempt to perform an optimization with the following source in the pipeline. 'container' 621 * refers to the entire pipeline, and 'itr' points to this stage within the pipeline. The caller 622 * must guarantee that std::next(itr) != container->end(). 623 * 624 * The return value is an iterator over the same container which points to the first location 625 * in the container at which an optimization may be possible. 626 * 627 * For example, if a swap takes place, the returned iterator should just be the position 628 * directly preceding 'itr', if such a position exists, since the stage at that position may be 629 * able to perform further optimizations with its new neighbor. 
630 */ doOptimizeAt(Pipeline::SourceContainer::iterator itr,Pipeline::SourceContainer * container)631 virtual Pipeline::SourceContainer::iterator doOptimizeAt( 632 Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { 633 return std::next(itr); 634 }; 635 636 /** 637 * Release any resources held by this stage. After doDispose() is called the stage must still be 638 * able to handle calls to getNext(), but can return kEOF. 639 */ doDispose()640 virtual void doDispose() {} 641 642 /* 643 Most DocumentSources have an underlying source they get their data 644 from. This is a convenience for them. 645 646 The default implementation of setSource() sets this; if you don't 647 need a source, override that to verify(). The default is to 648 verify() if this has already been set. 649 */ 650 DocumentSource* pSource; 651 652 boost::intrusive_ptr<ExpressionContext> pExpCtx; 653 654 private: 655 /** 656 * Create a Value that represents the document source. 657 * 658 * This is used by the default implementation of serializeToArray() to add this object 659 * to a pipeline being serialized. Returning a missing() Value results in no entry 660 * being added to the array for this stage (DocumentSource). 661 * 662 * The 'explain' parameter indicates the explain verbosity mode, or is equal boost::none if no 663 * explain is requested. 664 */ 665 virtual Value serialize( 666 boost::optional<ExplainOptions::Verbosity> explain = boost::none) const = 0; 667 }; 668 669 /** 670 * This class marks DocumentSources that should be split between the merger and the shards. See 671 * Pipeline::Optimizations::Sharded::findSplitPoint() for details. 672 */ 673 class SplittableDocumentSource { 674 public: 675 /** 676 * Returns a source to be run on the shards, or NULL if no work should be done on the shards for 677 * this stage. 
Must not mutate the existing source object; if different behaviour is required in 678 * the split-pipeline case, a new source should be created and configured appropriately. It is 679 * an error for getShardSource() to return a pointer to the same object as getMergeSource(), 680 * since this can result in the source being stitched into both the shard and merge pipelines 681 * when the latter is executed on mongoS. 682 */ 683 virtual boost::intrusive_ptr<DocumentSource> getShardSource() = 0; 684 685 /** 686 * Returns a list of stages that combine results from the shards, or an empty list if no work 687 * should be done in the merge pipeline for this stage. Must not mutate the existing source 688 * object; if different behaviour is required, a new source should be created and configured 689 * appropriately. It is an error for getMergeSources() to return a pointer to the same object as 690 * getShardSource(). 691 */ 692 virtual std::list<boost::intrusive_ptr<DocumentSource>> getMergeSources() = 0; 693 694 protected: 695 // It is invalid to delete through a SplittableDocumentSource-typed pointer. ~SplittableDocumentSource()696 virtual ~SplittableDocumentSource() {} 697 }; 698 699 700 /** 701 * This class marks DocumentSources which need functionality specific to a mongos or a mongod. It 702 * causes a MongodProcessInterface to be injected when in a mongod and a MongosProcessInterface when 703 * in a mongos. 704 */ 705 class DocumentSourceNeedsMongoProcessInterface : public DocumentSource { 706 public: 707 /** 708 * Any functionality needed by an aggregation stage that is either context specific to a mongod 709 * or mongos process, or is only compiled in to one of those two binaries must be accessed via 710 * this interface. This allows all DocumentSources to be parsed on either mongos or mongod, but 711 * only executable where it makes sense. 
712 */ 713 class MongoProcessInterface { 714 public: 715 enum class CurrentOpConnectionsMode { kIncludeIdle, kExcludeIdle }; 716 enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers }; 717 enum class CurrentOpTruncateMode { kNoTruncation, kTruncateOps }; 718 719 struct MakePipelineOptions { MakePipelineOptionsMakePipelineOptions720 MakePipelineOptions(){}; 721 722 bool optimize = true; 723 bool attachCursorSource = true; 724 725 // Ordinarily, a MongoProcessInterface is injected into the pipeline at the point 726 // when the cursor source is added. If true, 'forceInjectMongoProcessInterface' will 727 // inject MongoProcessInterfaces into the pipeline even if 'attachCursorSource' is 728 // false. If 'attachCursorSource' is true, then the value of 729 // 'forceInjectMongoProcessInterface' is irrelevant. 730 bool forceInjectMongoProcessInterface = false; 731 }; 732 ~MongoProcessInterface()733 virtual ~MongoProcessInterface(){}; 734 735 /** 736 * Sets the OperationContext of the DBDirectClient returned by directClient(). This method 737 * must be called after updating the 'opCtx' member of the ExpressionContext associated with 738 * the document source. 739 */ 740 virtual void setOperationContext(OperationContext* opCtx) = 0; 741 742 /** 743 * Always returns a DBDirectClient. The return type in the function signature is a 744 * DBClientBase* because DBDirectClient isn't linked into mongos. 745 */ 746 virtual DBClientBase* directClient() = 0; 747 748 // Note that in some rare cases this could return a false negative but will never return 749 // a false positive. This method will be fixed in the future once it becomes possible to 750 // avoid false negatives. 751 virtual bool isSharded(const NamespaceString& ns) = 0; 752 753 /** 754 * Inserts 'objs' into 'ns' and returns the "detailed" last error object. 
755 */ 756 virtual BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) = 0; 757 758 virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, 759 const NamespaceString& ns) = 0; 760 761 /** 762 * Appends operation latency statistics for collection "nss" to "builder" 763 */ 764 virtual void appendLatencyStats(const NamespaceString& nss, 765 bool includeHistograms, 766 BSONObjBuilder* builder) const = 0; 767 768 /** 769 * Appends storage statistics for collection "nss" to "builder" 770 */ 771 virtual Status appendStorageStats(const NamespaceString& nss, 772 const BSONObj& param, 773 BSONObjBuilder* builder) const = 0; 774 775 /** 776 * Appends the record count for collection "nss" to "builder". 777 */ 778 virtual Status appendRecordCount(const NamespaceString& nss, 779 BSONObjBuilder* builder) const = 0; 780 781 /** 782 * Gets the collection options for the collection given by 'nss'. 783 */ 784 virtual BSONObj getCollectionOptions(const NamespaceString& nss) = 0; 785 786 /** 787 * Performs the given rename command if the collection given by 'targetNs' has the same 788 * options as specified in 'originalCollectionOptions', and has the same indexes as 789 * 'originalIndexes'. 790 */ 791 virtual Status renameIfOptionsAndIndexesHaveNotChanged( 792 const BSONObj& renameCommandObj, 793 const NamespaceString& targetNs, 794 const BSONObj& originalCollectionOptions, 795 const std::list<BSONObj>& originalIndexes) = 0; 796 797 /** 798 * Parses a Pipeline from a vector of BSONObjs representing DocumentSources. The state of 799 * the returned pipeline will depend upon the supplied MakePipelineOptions: 800 * - The boolean opts.optimize determines whether the pipeline will be optimized. 801 * - If opts.attachCursorSource is false, the pipeline will be returned without attempting 802 * to add an initial cursor source. 
803 * - If opts.forceInjectMongoProcessInterface is true, then a MongoProcessInterface will be 804 * provided to each stage which requires one, regardless of whether a cursor source is 805 * attached to the pipeline. 806 * 807 * This function returns a non-OK status if parsing the pipeline failed. 808 */ 809 virtual StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( 810 const std::vector<BSONObj>& rawPipeline, 811 const boost::intrusive_ptr<ExpressionContext>& expCtx, 812 const MakePipelineOptions opts = MakePipelineOptions{}) = 0; 813 814 /** 815 * Attaches a cursor source to the start of a pipeline. Performs no further optimization. 816 * This function asserts if the collection to be aggregated is sharded. NamespaceNotFound 817 * will be returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. 818 * That should be the only case where NamespaceNotFound is returned. 819 */ 820 virtual Status attachCursorSourceToPipeline( 821 const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0; 822 823 /** 824 * Returns a vector of owned BSONObjs, each of which contains details of an in-progress 825 * operation or, optionally, an idle connection. If userMode is kIncludeAllUsers, report 826 * operations for all authenticated users; otherwise, report only the current user's 827 * operations. 828 */ 829 virtual std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode, 830 CurrentOpUserMode userMode, 831 CurrentOpTruncateMode) const = 0; 832 833 /** 834 * Returns the name of the local shard if sharding is enabled, or an empty string. 835 */ 836 virtual std::string getShardName(OperationContext* opCtx) const = 0; 837 838 /** 839 * Returns the fields of the document key (in order) for the current collection, including 840 * the shard key and _id. If _id is not in the shard key, it is added last. 
841 */ 842 virtual std::vector<FieldPath> collectDocumentKeyFields(UUID) const = 0; 843 844 /** 845 * Returns zero or one documents with the document key 'documentKey'. 'documentKey' is 846 * treated as a unique identifier of a document, and may include an _id or all fields from 847 * the shard key and an _id. Throws if more than one match was found. Returns boost::none if 848 * no matching documents were found, including cases where the given namespace does not 849 * exist. 850 */ 851 virtual boost::optional<Document> lookupSingleDocument( 852 const NamespaceString& nss, 853 UUID collectionUUID, 854 const Document& documentKey, 855 boost::optional<BSONObj> readConcern) = 0; 856 857 // Add new methods as needed. 858 }; 859 DocumentSourceNeedsMongoProcessInterface(const boost::intrusive_ptr<ExpressionContext> & expCtx)860 DocumentSourceNeedsMongoProcessInterface(const boost::intrusive_ptr<ExpressionContext>& expCtx) 861 : DocumentSource(expCtx) {} 862 injectMongoProcessInterface(std::shared_ptr<MongoProcessInterface> mongoProcessInterface)863 void injectMongoProcessInterface(std::shared_ptr<MongoProcessInterface> mongoProcessInterface) { 864 _mongoProcessInterface = mongoProcessInterface; 865 doInjectMongoProcessInterface(mongoProcessInterface); 866 } 867 868 /** 869 * Derived classes may override this method to register custom inject functionality. 870 */ doInjectMongoProcessInterface(std::shared_ptr<MongoProcessInterface> mongoProcessInterface)871 virtual void doInjectMongoProcessInterface( 872 std::shared_ptr<MongoProcessInterface> mongoProcessInterface) {} 873 detachFromOperationContext()874 void detachFromOperationContext() override { 875 invariant(_mongoProcessInterface); 876 _mongoProcessInterface->setOperationContext(nullptr); 877 doDetachFromOperationContext(); 878 } 879 880 /** 881 * Derived classes may override this method to register custom detach functionality. 
882 */ doDetachFromOperationContext()883 virtual void doDetachFromOperationContext() {} 884 reattachToOperationContext(OperationContext * opCtx)885 void reattachToOperationContext(OperationContext* opCtx) final { 886 invariant(_mongoProcessInterface); 887 _mongoProcessInterface->setOperationContext(opCtx); 888 doReattachToOperationContext(opCtx); 889 } 890 891 /** 892 * Derived classes may override this method to register custom reattach functionality. 893 */ doReattachToOperationContext(OperationContext * opCtx)894 virtual void doReattachToOperationContext(OperationContext* opCtx) {} 895 896 protected: 897 // It is invalid to delete through a DocumentSourceNeedsMongoProcessInterface-typed pointer. ~DocumentSourceNeedsMongoProcessInterface()898 virtual ~DocumentSourceNeedsMongoProcessInterface() {} 899 900 // Gives subclasses access to a MongoProcessInterface implementation 901 std::shared_ptr<MongoProcessInterface> _mongoProcessInterface; 902 }; 903 904 905 } // namespace mongo 906