1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #pragma once
32 
33 #include "mongo/client/connpool.h"
34 #include "mongo/db/clientcursor.h"
35 #include "mongo/db/cursor_id.h"
36 #include "mongo/db/pipeline/document_source.h"
37 
38 namespace mongo {
39 
40 class DocumentSourceMergeCursors : public DocumentSource {
41 public:
42     static constexpr StringData kStageName = "$mergeCursors"_sd;
43 
44     struct CursorDescriptor {
CursorDescriptorCursorDescriptor45         CursorDescriptor(ConnectionString connectionString, std::string ns, CursorId cursorId)
46             : connectionString(std::move(connectionString)),
47               ns(std::move(ns)),
48               cursorId(cursorId) {}
49 
50         ConnectionString connectionString;
51         std::string ns;
52         CursorId cursorId;
53     };
54 
55     GetNextResult getNext() final;
56 
getSourceName()57     const char* getSourceName() const final {
58         return kStageName.rawData();
59     }
60 
61     Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
62 
constraints(Pipeline::SplitState pipeState)63     StageConstraints constraints(Pipeline::SplitState pipeState) const final {
64         StageConstraints constraints(StreamType::kStreaming,
65                                      PositionRequirement::kFirst,
66                                      HostTypeRequirement::kAnyShard,
67                                      DiskUseRequirement::kNoDiskUse,
68                                      FacetRequirement::kNotAllowed);
69 
70         constraints.requiresInputDocSource = false;
71         return constraints;
72     }
73 
74     static boost::intrusive_ptr<DocumentSource> createFromBson(
75         BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
76 
77     static boost::intrusive_ptr<DocumentSource> create(
78         std::vector<CursorDescriptor> cursorDescriptors,
79         const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
80 
81     /**
82      * Returns true if all remotes have reported that their cursors are closed.
83      */
84     bool remotesExhausted() const;
85 
86     /** Returns non-owning pointers to cursors managed by this stage.
87      *  Call this instead of getNext() if you want access to the raw streams.
88      *  This method should only be called at most once.
89      */
90     std::vector<DBClientCursor*> getCursors();
91 
92     /**
93      * Returns the next object from the cursor, throwing an appropriate exception if the cursor
94      * reported an error. This is a better form of DBClientCursor::nextSafe.
95      */
96     static Document nextSafeFrom(DBClientCursor* cursor);
97 
98 protected:
99     void doDispose() final;
100 
101 private:
102     struct CursorAndConnection {
103         CursorAndConnection(const CursorDescriptor& cursorDescriptor);
104         ScopedDbConnection connection;
105         DBClientCursor cursor;
106     };
107 
108     // using list to enable removing arbitrary elements
109     typedef std::list<std::shared_ptr<CursorAndConnection>> Cursors;
110 
111     DocumentSourceMergeCursors(std::vector<CursorDescriptor> cursorDescriptors,
112                                const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
113 
114     // Converts _cursorDescriptors into active _cursors.
115     void start();
116 
117     // This is the description of cursors to merge.
118     const std::vector<CursorDescriptor> _cursorDescriptors;
119 
120     // These are the actual cursors we are merging. Created lazily.
121     Cursors _cursors;
122     Cursors::iterator _currentCursor;
123 
124     bool _unstarted;
125 };
126 
127 }  // namespace mongo
128