1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/s/client/version_manager.h"
36 
37 #include "mongo/client/dbclient_rs.h"
38 #include "mongo/db/namespace_string.h"
39 #include "mongo/s/catalog/sharding_catalog_client.h"
40 #include "mongo/s/catalog_cache.h"
41 #include "mongo/s/catalog_cache.h"
42 #include "mongo/s/chunk_version.h"
43 #include "mongo/s/client/shard_connection.h"
44 #include "mongo/s/client/shard_registry.h"
45 #include "mongo/s/grid.h"
46 #include "mongo/s/is_mongos.h"
47 #include "mongo/s/set_shard_version_request.h"
48 #include "mongo/s/stale_exception.h"
49 #include "mongo/util/log.h"
50 
51 namespace mongo {
52 
53 using std::shared_ptr;
54 using std::map;
55 using std::string;
56 
57 namespace {
58 
59 /**
60  * Tracking information, per-connection, of the latest chunk manager iteration or sequence
61  * number that was used to send a shard version over this connection.
62  * When the chunk manager is replaced, implying new versions were loaded, the chunk manager
63  * sequence number is iterated by 1 and connections need to re-send shard versions.
64  */
65 class ConnectionShardStatus {
66 public:
hasAnySequenceSet(DBClientBase * conn)67     bool hasAnySequenceSet(DBClientBase* conn) {
68         stdx::lock_guard<stdx::mutex> lk(_mutex);
69 
70         SequenceMap::const_iterator seenConnIt = _map.find(conn->getConnectionId());
71         return seenConnIt != _map.end() && seenConnIt->second.size() > 0;
72     }
73 
getSequence(DBClientBase * conn,const string & ns,unsigned long long * sequence)74     bool getSequence(DBClientBase* conn, const string& ns, unsigned long long* sequence) {
75         stdx::lock_guard<stdx::mutex> lk(_mutex);
76 
77         SequenceMap::const_iterator seenConnIt = _map.find(conn->getConnectionId());
78         if (seenConnIt == _map.end())
79             return false;
80 
81         map<string, unsigned long long>::const_iterator seenNSIt = seenConnIt->second.find(ns);
82         if (seenNSIt == seenConnIt->second.end())
83             return false;
84 
85         *sequence = seenNSIt->second;
86         return true;
87     }
88 
setSequence(DBClientBase * conn,const string & ns,const unsigned long long & s)89     void setSequence(DBClientBase* conn, const string& ns, const unsigned long long& s) {
90         stdx::lock_guard<stdx::mutex> lk(_mutex);
91         _map[conn->getConnectionId()][ns] = s;
92     }
93 
reset(DBClientBase * conn)94     void reset(DBClientBase* conn) {
95         stdx::lock_guard<stdx::mutex> lk(_mutex);
96         _map.erase(conn->getConnectionId());
97     }
98 
99 private:
100     // protects _map
101     stdx::mutex _mutex;
102 
103     // a map from a connection into ChunkManager's sequence number for each namespace
104     typedef map<unsigned long long, map<string, unsigned long long>> SequenceMap;
105     SequenceMap _map;
106 
107 } connectionShardStatus;
108 
109 /**
110  * Sends the setShardVersion command on the specified connection.
111  */
setShardVersion(OperationContext * opCtx,DBClientBase * conn,const string & ns,const ConnectionString & configServer,ChunkVersion version,ChunkManager * manager,bool authoritative,BSONObj & result)112 bool setShardVersion(OperationContext* opCtx,
113                      DBClientBase* conn,
114                      const string& ns,
115                      const ConnectionString& configServer,
116                      ChunkVersion version,
117                      ChunkManager* manager,
118                      bool authoritative,
119                      BSONObj& result) {
120     ShardId shardId;
121     ConnectionString shardCS;
122     {
123         const auto shard = grid.shardRegistry()->getShardForHostNoReload(
124             uassertStatusOK(HostAndPort::parse(conn->getServerAddress())));
125         uassert(ErrorCodes::ShardNotFound,
126                 str::stream() << conn->getServerAddress() << " is not recognized as a shard",
127                 shard);
128 
129         shardId = shard->getId();
130         shardCS = shard->getConnString();
131     }
132 
133     BSONObj cmd;
134 
135     if (ns.empty()) {
136         SetShardVersionRequest ssv =
137             SetShardVersionRequest::makeForInit(configServer, shardId, shardCS);
138         cmd = ssv.toBSON();
139     } else {
140         SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioning(
141             configServer, shardId, shardCS, NamespaceString(ns), version, authoritative);
142         cmd = ssv.toBSON();
143     }
144 
145     LOG(1) << "    setShardVersion  " << shardId << " " << conn->getServerAddress() << "  " << ns
146            << "  " << cmd
147            << (manager ? string(str::stream() << " " << manager->getSequenceNumber()) : "");
148 
149     return conn->runCommand("admin", cmd, result, 0);
150 }
151 
152 /**
153  * Checks whether the specified connection supports versioning.
154  */
getVersionable(DBClientBase * conn)155 DBClientBase* getVersionable(DBClientBase* conn) {
156     switch (conn->type()) {
157         case ConnectionString::LOCAL:
158         case ConnectionString::INVALID:
159         case ConnectionString::CUSTOM:
160             MONGO_UNREACHABLE;
161 
162         case ConnectionString::MASTER:
163             return conn;
164         case ConnectionString::SET:
165             DBClientReplicaSet* set = (DBClientReplicaSet*)conn;
166             return &(set->masterConn());
167     }
168 
169     MONGO_UNREACHABLE;
170 }
171 
172 /**
173  * Special internal logic to run reduced version handshake for empty namespace operations to
174  * shards.
175  *
176  * Eventually this should go completely away, but for now many commands rely on unversioned but
177  * mongos-specific behavior on mongod (auditing and replication information in commands)
178  */
initShardVersionEmptyNS(OperationContext * opCtx,DBClientBase * conn_in)179 bool initShardVersionEmptyNS(OperationContext* opCtx, DBClientBase* conn_in) {
180     try {
181         // May throw if replica set primary is down
182         DBClientBase* const conn = getVersionable(conn_in);
183         dassert(conn);  // errors thrown above
184 
185         // Check to see if we've already initialized this connection. This avoids sending
186         // setShardVersion multiple times.
187         if (connectionShardStatus.hasAnySequenceSet(conn)) {
188             return false;
189         }
190 
191         BSONObj result;
192         const bool ok = setShardVersion(opCtx,
193                                         conn,
194                                         "",
195                                         grid.shardRegistry()->getConfigServerConnectionString(),
196                                         ChunkVersion(),
197                                         NULL,
198                                         true,
199                                         result);
200 
201         LOG(3) << "initial sharding result : " << result;
202 
203         connectionShardStatus.setSequence(conn, "", 0);
204         return ok;
205     } catch (const DBException&) {
206         // NOTE: Replica sets may fail to initShardVersion because future calls relying on
207         // correct versioning must later call checkShardVersion on the primary.
208         // Secondary queries and commands may not call checkShardVersion, but secondary ops
209         // aren't versioned at all.
210         if (conn_in->type() != ConnectionString::SET) {
211             throw;
212         }
213 
214         // NOTE: Only old-style cluster operations will talk via DBClientReplicaSets - using
215         // checkShardVersion is required (which includes initShardVersion information) if these
216         // connections are used.
217 
218         OCCASIONALLY {
219             warning() << "failed to initialize new replica set connection version, "
220                       << "will initialize on first use";
221         }
222 
223         return false;
224     }
225 }
226 
227 /**
228  * Updates the remote cached version on the remote shard host (primary, in the case of replica
229  * sets) if needed with a fully-qualified shard version for the given namespace:
230  *   config server(s) + shard name + shard version
231  *
232  * If no remote cached version has ever been set, an initial shard version is sent.
233  *
234  * If the namespace is empty and no version has ever been sent, the config server + shard name
235  * is sent to the remote shard host to initialize the connection as coming from mongos.
236  * NOTE: This initialization is *best-effort only*.  Operations which wish to correctly version
237  * must send the namespace.
238  *
239  * Config servers are special and are not (unless otherwise a shard) kept up to date with this
240  * protocol.  This is safe so long as config servers only contain unversioned collections.
241  *
242  * It is an error to call checkShardVersion with an unversionable connection (isVersionableCB).
243  *
244  * @return true if we contacted the remote host
245  */
checkShardVersion(OperationContext * opCtx,DBClientBase * conn_in,const string & ns,shared_ptr<ChunkManager> refManager,bool authoritative,int tryNumber)246 bool checkShardVersion(OperationContext* opCtx,
247                        DBClientBase* conn_in,
248                        const string& ns,
249                        shared_ptr<ChunkManager> refManager,
250                        bool authoritative,
251                        int tryNumber) {
252     // Empty namespaces are special - we require initialization but not versioning
253     if (ns.size() == 0) {
254         return initShardVersionEmptyNS(opCtx, conn_in);
255     }
256 
257     DBClientBase* const conn = getVersionable(conn_in);
258     verify(conn);  // errors thrown above
259 
260     const NamespaceString nss(ns);
261 
262     auto const catalogCache = Grid::get(opCtx)->catalogCache();
263 
264     if (authoritative) {
265         Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss);
266     }
267 
268     auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, nss);
269     if (!routingInfoStatus.isOK()) {
270         return false;
271     }
272 
273     auto& routingInfo = routingInfoStatus.getValue();
274 
275     const auto manager = routingInfo.cm();
276     const auto primary = routingInfo.primary();
277 
278     unsigned long long officialSequenceNumber = 0;
279 
280     if (manager) {
281         officialSequenceNumber = manager->getSequenceNumber();
282     } else if (primary && primary->isConfig()) {
283         // Do not send setShardVersion to collections on the config servers - this causes problems
284         // when config servers are also shards and get SSV with conflicting names.
285         return false;
286     }
287 
288     const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
289 
290     const auto shard = shardRegistry->getShardForHostNoReload(
291         uassertStatusOK(HostAndPort::parse(conn->getServerAddress())));
292     uassert(ErrorCodes::ShardNotFound,
293             str::stream() << conn->getServerAddress() << " is not recognized as a shard",
294             shard);
295 
296     // Check this manager against the reference manager
297     if (manager) {
298         if (refManager && !refManager->compatibleWith(*manager, shard->getId())) {
299             const ChunkVersion refVersion(refManager->getVersion(shard->getId()));
300             const ChunkVersion currentVersion(manager->getVersion(shard->getId()));
301 
302             string msg(str::stream() << "manager (" << currentVersion.toString() << " : "
303                                      << manager->getSequenceNumber()
304                                      << ") "
305                                      << "not compatible with reference manager ("
306                                      << refVersion.toString()
307                                      << " : "
308                                      << refManager->getSequenceNumber()
309                                      << ") "
310                                      << "on shard "
311                                      << shard->getId()
312                                      << " ("
313                                      << shard->getConnString().toString()
314                                      << ")");
315 
316             throw StaleConfigException(ns, msg, refVersion, currentVersion);
317         }
318     } else if (refManager) {
319         string msg(str::stream() << "not sharded (" << (!manager ? string("<none>") : str::stream()
320                                                                 << manager->getSequenceNumber())
321                                  << ") but has reference manager ("
322                                  << refManager->getSequenceNumber()
323                                  << ") "
324                                  << "on conn "
325                                  << conn->getServerAddress()
326                                  << " ("
327                                  << conn_in->getServerAddress()
328                                  << ")");
329 
330         throw StaleConfigException(
331             ns, msg, refManager->getVersion(shard->getId()), ChunkVersion::UNSHARDED());
332     }
333 
334     // Has the ChunkManager been reloaded since the last time we updated the shard version over
335     // this connection?  If we've never updated the shard version, do so now.
336     unsigned long long sequenceNumber = 0;
337     if (connectionShardStatus.getSequence(conn, ns, &sequenceNumber)) {
338         if (sequenceNumber == officialSequenceNumber) {
339             return false;
340         }
341     }
342 
343     ChunkVersion version = ChunkVersion(0, 0, OID());
344     if (manager) {
345         version = manager->getVersion(shard->getId());
346     }
347 
348     LOG(1) << "setting shard version of " << version << " for " << ns << " on shard "
349            << shard->toString();
350 
351     LOG(3) << "last version sent with chunk manager iteration " << sequenceNumber
352            << ", current chunk manager iteration is " << officialSequenceNumber;
353 
354     BSONObj result;
355     if (setShardVersion(opCtx,
356                         conn,
357                         ns,
358                         shardRegistry->getConfigServerConnectionString(),
359                         version,
360                         manager.get(),
361                         authoritative,
362                         result)) {
363         LOG(1) << "      setShardVersion success: " << result;
364         connectionShardStatus.setSequence(conn, ns, officialSequenceNumber);
365         return true;
366     }
367 
368     // If the shard rejected the setShardVersion, return the error to the user.
369     int errCode = result["code"].numberInt();
370     uassert(errCode, result["errmsg"].String(), errCode != ErrorCodes::NoShardingEnabled);
371 
372     LOG(1) << "       setShardVersion failed!\n" << result;
373 
374     if (result["need_authoritative"].trueValue())
375         massert(10428, "need_authoritative set but in authoritative mode already", !authoritative);
376 
377     if (!authoritative) {
378         // use the original connection and get a fresh versionable connection
379         // since conn can be invalidated (or worse, freed) after the failure
380         checkShardVersion(opCtx, conn_in, ns, refManager, 1, tryNumber + 1);
381         return true;
382     }
383 
384     Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo));
385 
386     const int maxNumTries = 7;
387     if (tryNumber < maxNumTries) {
388         LOG(tryNumber < (maxNumTries / 2) ? 1 : 0)
389             << "going to retry checkShardVersion shard: " << shard->toString() << " " << result;
390         sleepmillis(10 * tryNumber);
391         // use the original connection and get a fresh versionable connection
392         // since conn can be invalidated (or worse, freed) after the failure
393         checkShardVersion(opCtx, conn_in, ns, refManager, true, tryNumber + 1);
394         return true;
395     }
396 
397     string errmsg = str::stream() << "setShardVersion failed shard: " << shard->toString() << " "
398                                   << result;
399     log() << "     " << errmsg;
400     massert(10429, errmsg, 0);
401     return true;
402 }
403 
404 }  // namespace
405 
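// The global VersionManager instance. It is defined outside the anonymous namespace above, so it
// is visible to other translation units.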
407 VersionManager versionManager;
408 
resetShardVersionCB(DBClientBase * conn)409 void VersionManager::resetShardVersionCB(DBClientBase* conn) {
410     connectionShardStatus.reset(conn);
411 }
412 
isVersionableCB(DBClientBase * conn)413 bool VersionManager::isVersionableCB(DBClientBase* conn) {
414     // We do not version shard connections when issued from mongod
415     if (!isMongos()) {
416         return false;
417     }
418 
419     return conn->type() == ConnectionString::MASTER || conn->type() == ConnectionString::SET;
420 }
421 
checkShardVersionCB(OperationContext * opCtx,DBClientBase * conn_in,const string & ns,bool authoritative,int tryNumber)422 bool VersionManager::checkShardVersionCB(OperationContext* opCtx,
423                                          DBClientBase* conn_in,
424                                          const string& ns,
425                                          bool authoritative,
426                                          int tryNumber) {
427     return checkShardVersion(opCtx, conn_in, ns, nullptr, authoritative, tryNumber);
428 }
429 
checkShardVersionCB(OperationContext * opCtx,ShardConnection * conn_in,bool authoritative,int tryNumber)430 bool VersionManager::checkShardVersionCB(OperationContext* opCtx,
431                                          ShardConnection* conn_in,
432                                          bool authoritative,
433                                          int tryNumber) {
434     return checkShardVersion(
435         opCtx, conn_in->get(), conn_in->getNS(), conn_in->getManager(), authoritative, tryNumber);
436 }
437 
438 }  // namespace mongo
439