1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #pragma once
32 
33 #include <set>
34 #include <string>
35 
36 #include "mongo/client/dbclientinterface.h"
37 #include "mongo/client/query.h"
38 #include "mongo/db/namespace_string.h"
39 #include "mongo/s/client/shard.h"
40 
41 namespace mongo {
42 
// Forward declarations for types this header only uses through pointers, references, or as
// template arguments.
class ChunkManager;
class DBClientCursor;
class DBClientCursorHolder;
class OperationContext;
class StaleConfigException;

struct ParallelConnectionMetadata;
struct ParallelConnectionState;
50 
51 struct CommandInfo {
52     CommandInfo() = default;
53 
CommandInfoCommandInfo54     CommandInfo(const std::string& vns, const BSONObj& filter, const BSONObj& collation)
55         : versionedNS(vns), cmdFilter(filter), cmdCollation(collation) {}
56 
isEmptyCommandInfo57     bool isEmpty() const {
58         return versionedNS.empty();
59     }
60 
toStringCommandInfo61     std::string toString() const {
62         return str::stream() << "CInfo "
63                              << BSON("v_ns" << versionedNS << "filter" << cmdFilter << "collation"
64                                             << cmdCollation);
65     }
66 
67     std::string versionedNS;
68     BSONObj cmdFilter;
69     BSONObj cmdCollation;
70 };
71 
72 /**
73  * Runs a query in parallel across N servers, enforcing compatible chunk versions for queries
74  * across all shards.
75  *
76  * If CommandInfo is provided, the ParallelCursor does not use the direct .$cmd namespace in the
77  * query spec, but instead enforces versions across another namespace specified by CommandInfo.
78  * This is to support commands like:
79  * db.runCommand({ fileMD5 : "<coll name>" })
80  *
81  * There is a deprecated legacy mode as well which effectively does a merge-sort across a number
82  * of servers, but does not correctly enforce versioning (used only in mapreduce).
83  */
84 class ParallelSortClusteredCursor {
85 public:
86     ParallelSortClusteredCursor(const QuerySpec& qSpec, const CommandInfo& cInfo);
87 
88     // DEPRECATED legacy constructor for pure mergesort functionality - do not use
89     ParallelSortClusteredCursor(const std::set<std::string>& servers,
90                                 const std::string& ns,
91                                 const Query& q,
92                                 int options = 0,
93                                 const BSONObj& fields = BSONObj());
94 
95     ~ParallelSortClusteredCursor();
96 
97     void init(OperationContext* opCtx);
98 
99     bool more();
100 
101     BSONObj next();
102 
103     /**
104      * Returns the set of shards with open cursors.
105      */
106     void getQueryShardIds(std::set<ShardId>& shardIds) const;
107 
108     std::shared_ptr<DBClientCursor> getShardCursor(const ShardId& shardId) const;
109 
110 private:
111     using ShardCursorsMap = std::map<ShardId, ParallelConnectionMetadata>;
112 
113     void fullInit(OperationContext* opCtx);
114     void startInit(OperationContext* opCtx);
115     void finishInit(OperationContext* opCtx);
116 
isCommand()117     bool isCommand() {
118         return NamespaceString(_qSpec.ns()).isCommand();
119     }
120 
121     void _finishCons();
122 
123     void _markStaleNS(const NamespaceString& staleNS, const StaleConfigException& e);
124 
125     bool _didInit;
126     bool _done;
127 
128     QuerySpec _qSpec;
129     CommandInfo _cInfo;
130 
131     // Count round-trips req'd for namespaces and total
132     std::map<std::string, int> _staleNSMap;
133 
134     int _totalTries;
135 
136     ShardCursorsMap _cursorMap;
137 
138     // LEGACY BELOW
139     int _numServers;
140     int _lastFrom;
141     std::set<std::string> _servers;
142     BSONObj _sortKey;
143 
144     DBClientCursorHolder* _cursors;
145     int _needToSkip;
146 
147     /**
148      * Setups the shard version of the connection. When using a replica
149      * set connection and the primary cannot be reached, the version
150      * will not be set if the slaveOk flag is set.
151      */
152     void setupVersionAndHandleSlaveOk(OperationContext* opCtx,
153                                       std::shared_ptr<ParallelConnectionState> state /* in & out */,
154                                       const ShardId& shardId,
155                                       std::shared_ptr<Shard> primary /* in */,
156                                       const NamespaceString& ns,
157                                       const std::string& vinfo,
158                                       std::shared_ptr<ChunkManager> manager /* in */);
159 
160     // LEGACY init - Needed for map reduce
161     void _oldInit();
162 
163     // LEGACY - Needed ONLY for _oldInit
164     std::string _ns;
165     BSONObj _query;
166     int _options;
167     BSONObj _fields;
168     int _batchSize;
169 };
170 
171 /**
172  * Throws a StaleConfigException wrapping the stale error document in this cursor when the
173  * ShardConfigStale flag is set or a command returns a ErrorCodes::StaleConfig error code.
174  */
175 void throwCursorStale(DBClientCursor* cursor);
176 
177 }  // namespace mongo
178