1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/s/client/version_manager.h"
36
37 #include "mongo/client/dbclient_rs.h"
38 #include "mongo/db/namespace_string.h"
39 #include "mongo/s/catalog/sharding_catalog_client.h"
40 #include "mongo/s/catalog_cache.h"
41 #include "mongo/s/catalog_cache.h"
42 #include "mongo/s/chunk_version.h"
43 #include "mongo/s/client/shard_connection.h"
44 #include "mongo/s/client/shard_registry.h"
45 #include "mongo/s/grid.h"
46 #include "mongo/s/is_mongos.h"
47 #include "mongo/s/set_shard_version_request.h"
48 #include "mongo/s/stale_exception.h"
49 #include "mongo/util/log.h"
50
51 namespace mongo {
52
53 using std::shared_ptr;
54 using std::map;
55 using std::string;
56
57 namespace {
58
59 /**
60 * Tracking information, per-connection, of the latest chunk manager iteration or sequence
61 * number that was used to send a shard version over this connection.
62 * When the chunk manager is replaced, implying new versions were loaded, the chunk manager
63 * sequence number is iterated by 1 and connections need to re-send shard versions.
64 */
65 class ConnectionShardStatus {
66 public:
hasAnySequenceSet(DBClientBase * conn)67 bool hasAnySequenceSet(DBClientBase* conn) {
68 stdx::lock_guard<stdx::mutex> lk(_mutex);
69
70 SequenceMap::const_iterator seenConnIt = _map.find(conn->getConnectionId());
71 return seenConnIt != _map.end() && seenConnIt->second.size() > 0;
72 }
73
getSequence(DBClientBase * conn,const string & ns,unsigned long long * sequence)74 bool getSequence(DBClientBase* conn, const string& ns, unsigned long long* sequence) {
75 stdx::lock_guard<stdx::mutex> lk(_mutex);
76
77 SequenceMap::const_iterator seenConnIt = _map.find(conn->getConnectionId());
78 if (seenConnIt == _map.end())
79 return false;
80
81 map<string, unsigned long long>::const_iterator seenNSIt = seenConnIt->second.find(ns);
82 if (seenNSIt == seenConnIt->second.end())
83 return false;
84
85 *sequence = seenNSIt->second;
86 return true;
87 }
88
setSequence(DBClientBase * conn,const string & ns,const unsigned long long & s)89 void setSequence(DBClientBase* conn, const string& ns, const unsigned long long& s) {
90 stdx::lock_guard<stdx::mutex> lk(_mutex);
91 _map[conn->getConnectionId()][ns] = s;
92 }
93
reset(DBClientBase * conn)94 void reset(DBClientBase* conn) {
95 stdx::lock_guard<stdx::mutex> lk(_mutex);
96 _map.erase(conn->getConnectionId());
97 }
98
99 private:
100 // protects _map
101 stdx::mutex _mutex;
102
103 // a map from a connection into ChunkManager's sequence number for each namespace
104 typedef map<unsigned long long, map<string, unsigned long long>> SequenceMap;
105 SequenceMap _map;
106
107 } connectionShardStatus;
108
109 /**
110 * Sends the setShardVersion command on the specified connection.
111 */
setShardVersion(OperationContext * opCtx,DBClientBase * conn,const string & ns,const ConnectionString & configServer,ChunkVersion version,ChunkManager * manager,bool authoritative,BSONObj & result)112 bool setShardVersion(OperationContext* opCtx,
113 DBClientBase* conn,
114 const string& ns,
115 const ConnectionString& configServer,
116 ChunkVersion version,
117 ChunkManager* manager,
118 bool authoritative,
119 BSONObj& result) {
120 ShardId shardId;
121 ConnectionString shardCS;
122 {
123 const auto shard = grid.shardRegistry()->getShardForHostNoReload(
124 uassertStatusOK(HostAndPort::parse(conn->getServerAddress())));
125 uassert(ErrorCodes::ShardNotFound,
126 str::stream() << conn->getServerAddress() << " is not recognized as a shard",
127 shard);
128
129 shardId = shard->getId();
130 shardCS = shard->getConnString();
131 }
132
133 BSONObj cmd;
134
135 if (ns.empty()) {
136 SetShardVersionRequest ssv =
137 SetShardVersionRequest::makeForInit(configServer, shardId, shardCS);
138 cmd = ssv.toBSON();
139 } else {
140 SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioning(
141 configServer, shardId, shardCS, NamespaceString(ns), version, authoritative);
142 cmd = ssv.toBSON();
143 }
144
145 LOG(1) << " setShardVersion " << shardId << " " << conn->getServerAddress() << " " << ns
146 << " " << cmd
147 << (manager ? string(str::stream() << " " << manager->getSequenceNumber()) : "");
148
149 return conn->runCommand("admin", cmd, result, 0);
150 }
151
152 /**
153 * Checks whether the specified connection supports versioning.
154 */
getVersionable(DBClientBase * conn)155 DBClientBase* getVersionable(DBClientBase* conn) {
156 switch (conn->type()) {
157 case ConnectionString::LOCAL:
158 case ConnectionString::INVALID:
159 case ConnectionString::CUSTOM:
160 MONGO_UNREACHABLE;
161
162 case ConnectionString::MASTER:
163 return conn;
164 case ConnectionString::SET:
165 DBClientReplicaSet* set = (DBClientReplicaSet*)conn;
166 return &(set->masterConn());
167 }
168
169 MONGO_UNREACHABLE;
170 }
171
172 /**
173 * Special internal logic to run reduced version handshake for empty namespace operations to
174 * shards.
175 *
176 * Eventually this should go completely away, but for now many commands rely on unversioned but
177 * mongos-specific behavior on mongod (auditing and replication information in commands)
178 */
initShardVersionEmptyNS(OperationContext * opCtx,DBClientBase * conn_in)179 bool initShardVersionEmptyNS(OperationContext* opCtx, DBClientBase* conn_in) {
180 try {
181 // May throw if replica set primary is down
182 DBClientBase* const conn = getVersionable(conn_in);
183 dassert(conn); // errors thrown above
184
185 // Check to see if we've already initialized this connection. This avoids sending
186 // setShardVersion multiple times.
187 if (connectionShardStatus.hasAnySequenceSet(conn)) {
188 return false;
189 }
190
191 BSONObj result;
192 const bool ok = setShardVersion(opCtx,
193 conn,
194 "",
195 grid.shardRegistry()->getConfigServerConnectionString(),
196 ChunkVersion(),
197 NULL,
198 true,
199 result);
200
201 LOG(3) << "initial sharding result : " << result;
202
203 connectionShardStatus.setSequence(conn, "", 0);
204 return ok;
205 } catch (const DBException&) {
206 // NOTE: Replica sets may fail to initShardVersion because future calls relying on
207 // correct versioning must later call checkShardVersion on the primary.
208 // Secondary queries and commands may not call checkShardVersion, but secondary ops
209 // aren't versioned at all.
210 if (conn_in->type() != ConnectionString::SET) {
211 throw;
212 }
213
214 // NOTE: Only old-style cluster operations will talk via DBClientReplicaSets - using
215 // checkShardVersion is required (which includes initShardVersion information) if these
216 // connections are used.
217
218 OCCASIONALLY {
219 warning() << "failed to initialize new replica set connection version, "
220 << "will initialize on first use";
221 }
222
223 return false;
224 }
225 }
226
227 /**
228 * Updates the remote cached version on the remote shard host (primary, in the case of replica
229 * sets) if needed with a fully-qualified shard version for the given namespace:
230 * config server(s) + shard name + shard version
231 *
232 * If no remote cached version has ever been set, an initial shard version is sent.
233 *
234 * If the namespace is empty and no version has ever been sent, the config server + shard name
235 * is sent to the remote shard host to initialize the connection as coming from mongos.
236 * NOTE: This initialization is *best-effort only*. Operations which wish to correctly version
237 * must send the namespace.
238 *
239 * Config servers are special and are not (unless otherwise a shard) kept up to date with this
240 * protocol. This is safe so long as config servers only contain unversioned collections.
241 *
242 * It is an error to call checkShardVersion with an unversionable connection (isVersionableCB).
243 *
244 * @return true if we contacted the remote host
245 */
checkShardVersion(OperationContext * opCtx,DBClientBase * conn_in,const string & ns,shared_ptr<ChunkManager> refManager,bool authoritative,int tryNumber)246 bool checkShardVersion(OperationContext* opCtx,
247 DBClientBase* conn_in,
248 const string& ns,
249 shared_ptr<ChunkManager> refManager,
250 bool authoritative,
251 int tryNumber) {
252 // Empty namespaces are special - we require initialization but not versioning
253 if (ns.size() == 0) {
254 return initShardVersionEmptyNS(opCtx, conn_in);
255 }
256
257 DBClientBase* const conn = getVersionable(conn_in);
258 verify(conn); // errors thrown above
259
260 const NamespaceString nss(ns);
261
262 auto const catalogCache = Grid::get(opCtx)->catalogCache();
263
264 if (authoritative) {
265 Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss);
266 }
267
268 auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, nss);
269 if (!routingInfoStatus.isOK()) {
270 return false;
271 }
272
273 auto& routingInfo = routingInfoStatus.getValue();
274
275 const auto manager = routingInfo.cm();
276 const auto primary = routingInfo.primary();
277
278 unsigned long long officialSequenceNumber = 0;
279
280 if (manager) {
281 officialSequenceNumber = manager->getSequenceNumber();
282 } else if (primary && primary->isConfig()) {
283 // Do not send setShardVersion to collections on the config servers - this causes problems
284 // when config servers are also shards and get SSV with conflicting names.
285 return false;
286 }
287
288 const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
289
290 const auto shard = shardRegistry->getShardForHostNoReload(
291 uassertStatusOK(HostAndPort::parse(conn->getServerAddress())));
292 uassert(ErrorCodes::ShardNotFound,
293 str::stream() << conn->getServerAddress() << " is not recognized as a shard",
294 shard);
295
296 // Check this manager against the reference manager
297 if (manager) {
298 if (refManager && !refManager->compatibleWith(*manager, shard->getId())) {
299 const ChunkVersion refVersion(refManager->getVersion(shard->getId()));
300 const ChunkVersion currentVersion(manager->getVersion(shard->getId()));
301
302 string msg(str::stream() << "manager (" << currentVersion.toString() << " : "
303 << manager->getSequenceNumber()
304 << ") "
305 << "not compatible with reference manager ("
306 << refVersion.toString()
307 << " : "
308 << refManager->getSequenceNumber()
309 << ") "
310 << "on shard "
311 << shard->getId()
312 << " ("
313 << shard->getConnString().toString()
314 << ")");
315
316 throw StaleConfigException(ns, msg, refVersion, currentVersion);
317 }
318 } else if (refManager) {
319 string msg(str::stream() << "not sharded (" << (!manager ? string("<none>") : str::stream()
320 << manager->getSequenceNumber())
321 << ") but has reference manager ("
322 << refManager->getSequenceNumber()
323 << ") "
324 << "on conn "
325 << conn->getServerAddress()
326 << " ("
327 << conn_in->getServerAddress()
328 << ")");
329
330 throw StaleConfigException(
331 ns, msg, refManager->getVersion(shard->getId()), ChunkVersion::UNSHARDED());
332 }
333
334 // Has the ChunkManager been reloaded since the last time we updated the shard version over
335 // this connection? If we've never updated the shard version, do so now.
336 unsigned long long sequenceNumber = 0;
337 if (connectionShardStatus.getSequence(conn, ns, &sequenceNumber)) {
338 if (sequenceNumber == officialSequenceNumber) {
339 return false;
340 }
341 }
342
343 ChunkVersion version = ChunkVersion(0, 0, OID());
344 if (manager) {
345 version = manager->getVersion(shard->getId());
346 }
347
348 LOG(1) << "setting shard version of " << version << " for " << ns << " on shard "
349 << shard->toString();
350
351 LOG(3) << "last version sent with chunk manager iteration " << sequenceNumber
352 << ", current chunk manager iteration is " << officialSequenceNumber;
353
354 BSONObj result;
355 if (setShardVersion(opCtx,
356 conn,
357 ns,
358 shardRegistry->getConfigServerConnectionString(),
359 version,
360 manager.get(),
361 authoritative,
362 result)) {
363 LOG(1) << " setShardVersion success: " << result;
364 connectionShardStatus.setSequence(conn, ns, officialSequenceNumber);
365 return true;
366 }
367
368 // If the shard rejected the setShardVersion, return the error to the user.
369 int errCode = result["code"].numberInt();
370 uassert(errCode, result["errmsg"].String(), errCode != ErrorCodes::NoShardingEnabled);
371
372 LOG(1) << " setShardVersion failed!\n" << result;
373
374 if (result["need_authoritative"].trueValue())
375 massert(10428, "need_authoritative set but in authoritative mode already", !authoritative);
376
377 if (!authoritative) {
378 // use the original connection and get a fresh versionable connection
379 // since conn can be invalidated (or worse, freed) after the failure
380 checkShardVersion(opCtx, conn_in, ns, refManager, 1, tryNumber + 1);
381 return true;
382 }
383
384 Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo));
385
386 const int maxNumTries = 7;
387 if (tryNumber < maxNumTries) {
388 LOG(tryNumber < (maxNumTries / 2) ? 1 : 0)
389 << "going to retry checkShardVersion shard: " << shard->toString() << " " << result;
390 sleepmillis(10 * tryNumber);
391 // use the original connection and get a fresh versionable connection
392 // since conn can be invalidated (or worse, freed) after the failure
393 checkShardVersion(opCtx, conn_in, ns, refManager, true, tryNumber + 1);
394 return true;
395 }
396
397 string errmsg = str::stream() << "setShardVersion failed shard: " << shard->toString() << " "
398 << result;
399 log() << " " << errmsg;
400 massert(10429, errmsg, 0);
401 return true;
402 }
403
404 } // namespace
405
406 // Global version manager
407 VersionManager versionManager;
408
resetShardVersionCB(DBClientBase * conn)409 void VersionManager::resetShardVersionCB(DBClientBase* conn) {
410 connectionShardStatus.reset(conn);
411 }
412
isVersionableCB(DBClientBase * conn)413 bool VersionManager::isVersionableCB(DBClientBase* conn) {
414 // We do not version shard connections when issued from mongod
415 if (!isMongos()) {
416 return false;
417 }
418
419 return conn->type() == ConnectionString::MASTER || conn->type() == ConnectionString::SET;
420 }
421
checkShardVersionCB(OperationContext * opCtx,DBClientBase * conn_in,const string & ns,bool authoritative,int tryNumber)422 bool VersionManager::checkShardVersionCB(OperationContext* opCtx,
423 DBClientBase* conn_in,
424 const string& ns,
425 bool authoritative,
426 int tryNumber) {
427 return checkShardVersion(opCtx, conn_in, ns, nullptr, authoritative, tryNumber);
428 }
429
checkShardVersionCB(OperationContext * opCtx,ShardConnection * conn_in,bool authoritative,int tryNumber)430 bool VersionManager::checkShardVersionCB(OperationContext* opCtx,
431 ShardConnection* conn_in,
432 bool authoritative,
433 int tryNumber) {
434 return checkShardVersion(
435 opCtx, conn_in->get(), conn_in->getNS(), conn_in->getManager(), authoritative, tryNumber);
436 }
437
438 } // namespace mongo
439