1 2 /** 3 * Copyright (C) 2018-present MongoDB, Inc. 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the Server Side Public License, version 1, 7 * as published by MongoDB, Inc. 8 * 9 * This program is distributed in the hope that it will be useful, 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 * Server Side Public License for more details. 13 * 14 * You should have received a copy of the Server Side Public License 15 * along with this program. If not, see 16 * <http://www.mongodb.com/licensing/server-side-public-license>. 17 * 18 * As a special exception, the copyright holders give permission to link the 19 * code of portions of this program with the OpenSSL library under certain 20 * conditions as described in each individual source file and distribute 21 * linked combinations including the program with the OpenSSL library. You 22 * must comply with the Server Side Public License in all respects for 23 * all of the code used other than as permitted herein. If you modify file(s) 24 * with this exception, you may extend this exception to your version of the 25 * file(s), but you are not obligated to do so. If you do not wish to do so, 26 * delete this exception statement from your version. If you delete this 27 * exception statement from all source files in the program, then also delete 28 * it in the license file. 29 */ 30 31 #pragma once 32 33 #include "mongo/client/connpool.h" 34 #include "mongo/db/clientcursor.h" 35 #include "mongo/db/cursor_id.h" 36 #include "mongo/db/pipeline/document_source.h" 37 38 namespace mongo { 39 40 class DocumentSourceMergeCursors : public DocumentSource { 41 public: 42 static constexpr StringData kStageName = "$mergeCursors"_sd; 43 44 struct CursorDescriptor { CursorDescriptorCursorDescriptor45 CursorDescriptor(ConnectionString connectionString, std::string ns, CursorId cursorId) 46 : connectionString(std::move(connectionString)), 47 ns(std::move(ns)), 48 cursorId(cursorId) {} 49 50 ConnectionString connectionString; 51 std::string ns; 52 CursorId cursorId; 53 }; 54 55 GetNextResult getNext() final; 56 getSourceName()57 const char* getSourceName() const final { 58 return kStageName.rawData(); 59 } 60 61 Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; 62 constraints(Pipeline::SplitState pipeState)63 StageConstraints constraints(Pipeline::SplitState pipeState) const final { 64 StageConstraints constraints(StreamType::kStreaming, 65 PositionRequirement::kFirst, 66 HostTypeRequirement::kAnyShard, 67 DiskUseRequirement::kNoDiskUse, 68 FacetRequirement::kNotAllowed); 69 70 constraints.requiresInputDocSource = false; 71 return constraints; 72 } 73 74 static boost::intrusive_ptr<DocumentSource> createFromBson( 75 BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); 76 77 static boost::intrusive_ptr<DocumentSource> create( 78 std::vector<CursorDescriptor> cursorDescriptors, 79 const boost::intrusive_ptr<ExpressionContext>& pExpCtx); 80 81 /** 82 * Returns true if all remotes have reported that their cursors are closed. 83 */ 84 bool remotesExhausted() const; 85 86 /** Returns non-owning pointers to cursors managed by this stage. 87 * Call this instead of getNext() if you want access to the raw streams. 88 * This method should only be called at most once. 89 */ 90 std::vector<DBClientCursor*> getCursors(); 91 92 /** 93 * Returns the next object from the cursor, throwing an appropriate exception if the cursor 94 * reported an error. This is a better form of DBClientCursor::nextSafe. 95 */ 96 static Document nextSafeFrom(DBClientCursor* cursor); 97 98 protected: 99 void doDispose() final; 100 101 private: 102 struct CursorAndConnection { 103 CursorAndConnection(const CursorDescriptor& cursorDescriptor); 104 ScopedDbConnection connection; 105 DBClientCursor cursor; 106 }; 107 108 // using list to enable removing arbitrary elements 109 typedef std::list<std::shared_ptr<CursorAndConnection>> Cursors; 110 111 DocumentSourceMergeCursors(std::vector<CursorDescriptor> cursorDescriptors, 112 const boost::intrusive_ptr<ExpressionContext>& pExpCtx); 113 114 // Converts _cursorDescriptors into active _cursors. 115 void start(); 116 117 // This is the description of cursors to merge. 118 const std::vector<CursorDescriptor> _cursorDescriptors; 119 120 // These are the actual cursors we are merging. Created lazily. 121 Cursors _cursors; 122 Cursors::iterator _currentCursor; 123 124 bool _unstarted; 125 }; 126 127 } // namespace mongo 128