/**
 * Copyright (C) 2018-present MongoDB, Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the Server Side Public License, version 1,
 * as published by MongoDB, Inc.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * Server Side Public License for more details.
 *
 * You should have received a copy of the Server Side Public License
 * along with this program. If not, see
 * <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the Server Side Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
29 */ 30 31 #pragma once 32 33 #include <set> 34 #include <string> 35 36 #include "mongo/client/dbclientinterface.h" 37 #include "mongo/client/query.h" 38 #include "mongo/db/namespace_string.h" 39 #include "mongo/s/client/shard.h" 40 41 namespace mongo { 42 43 class ChunkManager; 44 class DBClientCursor; 45 class DBClientCursorHolder; 46 class OperationContext; 47 struct ParallelConnectionMetadata; 48 struct ParallelConnectionState; 49 class StaleConfigException; 50 51 struct CommandInfo { 52 CommandInfo() = default; 53 CommandInfoCommandInfo54 CommandInfo(const std::string& vns, const BSONObj& filter, const BSONObj& collation) 55 : versionedNS(vns), cmdFilter(filter), cmdCollation(collation) {} 56 isEmptyCommandInfo57 bool isEmpty() const { 58 return versionedNS.empty(); 59 } 60 toStringCommandInfo61 std::string toString() const { 62 return str::stream() << "CInfo " 63 << BSON("v_ns" << versionedNS << "filter" << cmdFilter << "collation" 64 << cmdCollation); 65 } 66 67 std::string versionedNS; 68 BSONObj cmdFilter; 69 BSONObj cmdCollation; 70 }; 71 72 /** 73 * Runs a query in parallel across N servers, enforcing compatible chunk versions for queries 74 * across all shards. 75 * 76 * If CommandInfo is provided, the ParallelCursor does not use the direct .$cmd namespace in the 77 * query spec, but instead enforces versions across another namespace specified by CommandInfo. 78 * This is to support commands like: 79 * db.runCommand({ fileMD5 : "<coll name>" }) 80 * 81 * There is a deprecated legacy mode as well which effectively does a merge-sort across a number 82 * of servers, but does not correctly enforce versioning (used only in mapreduce). 
83 */ 84 class ParallelSortClusteredCursor { 85 public: 86 ParallelSortClusteredCursor(const QuerySpec& qSpec, const CommandInfo& cInfo); 87 88 // DEPRECATED legacy constructor for pure mergesort functionality - do not use 89 ParallelSortClusteredCursor(const std::set<std::string>& servers, 90 const std::string& ns, 91 const Query& q, 92 int options = 0, 93 const BSONObj& fields = BSONObj()); 94 95 ~ParallelSortClusteredCursor(); 96 97 void init(OperationContext* opCtx); 98 99 bool more(); 100 101 BSONObj next(); 102 103 /** 104 * Returns the set of shards with open cursors. 105 */ 106 void getQueryShardIds(std::set<ShardId>& shardIds) const; 107 108 std::shared_ptr<DBClientCursor> getShardCursor(const ShardId& shardId) const; 109 110 private: 111 using ShardCursorsMap = std::map<ShardId, ParallelConnectionMetadata>; 112 113 void fullInit(OperationContext* opCtx); 114 void startInit(OperationContext* opCtx); 115 void finishInit(OperationContext* opCtx); 116 isCommand()117 bool isCommand() { 118 return NamespaceString(_qSpec.ns()).isCommand(); 119 } 120 121 void _finishCons(); 122 123 void _markStaleNS(const NamespaceString& staleNS, const StaleConfigException& e); 124 125 bool _didInit; 126 bool _done; 127 128 QuerySpec _qSpec; 129 CommandInfo _cInfo; 130 131 // Count round-trips req'd for namespaces and total 132 std::map<std::string, int> _staleNSMap; 133 134 int _totalTries; 135 136 ShardCursorsMap _cursorMap; 137 138 // LEGACY BELOW 139 int _numServers; 140 int _lastFrom; 141 std::set<std::string> _servers; 142 BSONObj _sortKey; 143 144 DBClientCursorHolder* _cursors; 145 int _needToSkip; 146 147 /** 148 * Setups the shard version of the connection. When using a replica 149 * set connection and the primary cannot be reached, the version 150 * will not be set if the slaveOk flag is set. 
151 */ 152 void setupVersionAndHandleSlaveOk(OperationContext* opCtx, 153 std::shared_ptr<ParallelConnectionState> state /* in & out */, 154 const ShardId& shardId, 155 std::shared_ptr<Shard> primary /* in */, 156 const NamespaceString& ns, 157 const std::string& vinfo, 158 std::shared_ptr<ChunkManager> manager /* in */); 159 160 // LEGACY init - Needed for map reduce 161 void _oldInit(); 162 163 // LEGACY - Needed ONLY for _oldInit 164 std::string _ns; 165 BSONObj _query; 166 int _options; 167 BSONObj _fields; 168 int _batchSize; 169 }; 170 171 /** 172 * Throws a StaleConfigException wrapping the stale error document in this cursor when the 173 * ShardConfigStale flag is set or a command returns a ErrorCodes::StaleConfig error code. 174 */ 175 void throwCursorStale(DBClientCursor* cursor); 176 177 } // namespace mongo 178