1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/db/auth/action_set.h"
36 #include "mongo/db/auth/action_type.h"
37 #include "mongo/db/auth/authorization_manager.h"
38 #include "mongo/db/auth/authorization_session.h"
39 #include "mongo/db/auth/privilege.h"
40 #include "mongo/db/catalog/catalog_raii.h"
41 #include "mongo/db/client.h"
42 #include "mongo/db/commands.h"
43 #include "mongo/db/lasterror.h"
44 #include "mongo/db/operation_context.h"
45 #include "mongo/db/repl/replication_coordinator.h"
46 #include "mongo/db/s/collection_sharding_state.h"
47 #include "mongo/db/s/migration_source_manager.h"
48 #include "mongo/db/s/sharded_connection_info.h"
49 #include "mongo/db/s/sharding_state.h"
50 #include "mongo/s/chunk_version.h"
51 #include "mongo/s/client/shard_registry.h"
52 #include "mongo/s/grid.h"
53 #include "mongo/util/log.h"
54 #include "mongo/util/stringutils.h"
55 
56 namespace mongo {
57 
58 using std::string;
59 using str::stream;
60 
61 namespace {
62 
63 class SetShardVersion : public ErrmsgCommandDeprecated {
64 public:
SetShardVersion()65     SetShardVersion() : ErrmsgCommandDeprecated("setShardVersion") {}
66 
help(std::stringstream & help) const67     void help(std::stringstream& help) const override {
68         help << "internal";
69     }
70 
adminOnly() const71     bool adminOnly() const override {
72         return true;
73     }
74 
slaveOk() const75     bool slaveOk() const override {
76         return true;
77     }
78 
supportsWriteConcern(const BSONObj & cmd) const79     virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
80         return false;
81     }
82 
addRequiredPrivileges(const std::string & dbname,const BSONObj & cmdObj,std::vector<Privilege> * out)83     void addRequiredPrivileges(const std::string& dbname,
84                                const BSONObj& cmdObj,
85                                std::vector<Privilege>* out) override {
86         ActionSet actions;
87         actions.addAction(ActionType::internal);
88         out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
89     }
90 
errmsgRun(OperationContext * opCtx,const std::string &,const BSONObj & cmdObj,string & errmsg,BSONObjBuilder & result)91     bool errmsgRun(OperationContext* opCtx,
92                    const std::string&,
93                    const BSONObj& cmdObj,
94                    string& errmsg,
95                    BSONObjBuilder& result) {
96         uassert(ErrorCodes::IllegalOperation,
97                 "can't issue setShardVersion from 'eval'",
98                 !opCtx->getClient()->isInDirectClient());
99 
100         auto const shardingState = ShardingState::get(opCtx);
101         uassertStatusOK(shardingState->canAcceptShardedCommands());
102 
103         // Steps
104         // 1. As long as the command does not have noConnectionVersioning set, register a
105         //    ShardedConnectionInfo for this client connection (this is for clients using
106         //    ShardConnection). Registering the ShardedConnectionInfo guarantees that we will check
107         //    the shardVersion on all requests from this client connection. The connection's version
108         //    will be updated on each subsequent setShardVersion sent on this connection.
109         //
110         // 2. If we have received the init form of setShardVersion, vacuously return true.
111         //    The init form of setShardVersion was used to initialize sharding awareness on a shard,
112         //    but was made obsolete in v3.4 by making nodes sharding-aware when they are added to a
113         //    cluster. The init form was kept in v3.4 shards for compatibility with mixed-version
114         //    3.2/3.4 clusters, but was deprecated and made to vacuously return true in v3.6.
115         //
116         // 3. Validate all command parameters against the info in our ShardingState, and return an
117         //    error if they do not match.
118         //
119         // 4. If the sent shardVersion is compatible with our shardVersion, update the shardVersion
120         //    in this client's ShardedConnectionInfo if needed.
121         //
122         // 5. If the sent shardVersion indicates a drop, jump to step 7.
123         //
124         // 6. If the sent shardVersion is staler than ours, return a stale config error.
125         //
126         // 7. If the sent shardVersion is newer than ours (or indicates a drop), reload our metadata
127         //    and compare the sent shardVersion with what we reloaded. If the versions are now
128         //    compatible, update the shardVersion in this client's ShardedConnectionInfo, as in
129         //    step 4. If the sent shardVersion is staler than what we reloaded, return a stale
130         //    config error, as in step 6.
131 
132         // Step 1
133 
134         Client* client = opCtx->getClient();
135         LastError::get(client).disable();
136 
137         const bool authoritative = cmdObj.getBoolField("authoritative");
138         const bool noConnectionVersioning = cmdObj.getBoolField("noConnectionVersioning");
139 
140         ShardedConnectionInfo dummyInfo;
141         ShardedConnectionInfo* info;
142         if (noConnectionVersioning) {
143             info = &dummyInfo;
144         } else {
145             info = ShardedConnectionInfo::get(client, true);
146         }
147 
148         // Step 2
149 
150         // The init form of setShardVersion was deprecated in v3.6. For backwards compatibility with
151         // pre-v3.6 mongos, return true.
152         const auto isInit = cmdObj["init"].trueValue();
153         if (isInit) {
154             result.append("initialized", true);
155             return true;
156         }
157 
158         // Step 3
159 
160         // Validate shardName parameter.
161         const auto shardName = cmdObj["shard"].str();
162         const auto storedShardName = ShardingState::get(opCtx)->getShardName();
163         uassert(ErrorCodes::BadValue,
164                 str::stream() << "received shardName " << shardName
165                               << " which differs from stored shardName "
166                               << storedShardName,
167                 storedShardName == shardName);
168 
169         // Validate config connection string parameter.
170         const auto configdb = cmdObj["configdb"].String();
171         uassert(ErrorCodes::BadValue,
172                 "Config server connection string cannot be empty",
173                 !configdb.empty());
174 
175         const auto givenConnStr = uassertStatusOK(ConnectionString::parse(configdb));
176         uassert(ErrorCodes::InvalidOptions,
177                 str::stream() << "Given config server string " << givenConnStr.toString()
178                               << " is not of type SET",
179                 givenConnStr.type() == ConnectionString::SET);
180 
181         const auto storedConnStr = ShardingState::get(opCtx)->getConfigServer(opCtx);
182         uassert(ErrorCodes::IllegalOperation,
183                 str::stream() << "Given config server set name: " << givenConnStr.getSetName()
184                               << " differs from known set name: "
185                               << storedConnStr.getSetName(),
186                 givenConnStr.getSetName() == storedConnStr.getSetName());
187 
188         // Validate namespace parameter.
189         const NamespaceString nss(cmdObj["setShardVersion"].String());
190         uassert(ErrorCodes::InvalidNamespace,
191                 str::stream() << "Invalid namespace " << nss.ns(),
192                 nss.isValid());
193 
194         // Validate chunk version parameter.
195         const ChunkVersion requestedVersion =
196             uassertStatusOK(ChunkVersion::parseFromBSONForSetShardVersion(cmdObj));
197 
198         // Step 4
199 
200         const ChunkVersion connectionVersion = info->getVersion(nss.ns());
201         connectionVersion.addToBSON(result, "oldVersion");
202 
203         {
204             boost::optional<AutoGetDb> autoDb;
205             autoDb.emplace(opCtx, nss.db(), MODE_IS);
206 
207             // Slave nodes cannot support set shard version
208             uassert(ErrorCodes::NotMaster,
209                     str::stream() << "setShardVersion with collection version is only supported "
210                                      "against primary nodes, but it was received for namespace "
211                                   << nss.ns(),
212                     repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx,
213                                                                                          nss.db()));
214 
215             // Views do not require a shard version check.
216             if (autoDb->getDb() && !autoDb->getDb()->getCollection(opCtx, nss) &&
217                 autoDb->getDb()->getViewCatalog()->lookup(opCtx, nss.ns())) {
218                 return true;
219             }
220 
221             boost::optional<Lock::CollectionLock> collLock;
222             collLock.emplace(opCtx->lockState(), nss.ns(), MODE_IS);
223 
224             auto css = CollectionShardingState::get(opCtx, nss);
225             const ChunkVersion collectionShardVersion =
226                 (css->getMetadata() ? css->getMetadata()->getShardVersion()
227                                     : ChunkVersion::UNSHARDED());
228 
229             if (requestedVersion.isWriteCompatibleWith(collectionShardVersion)) {
230                 // mongos and mongod agree!
231                 // Now we should update the connection's version if it's not compatible with the
232                 // request's version. This could happen if the shard's metadata has changed, but
233                 // the remote client has already refreshed its view of the metadata since the last
234                 // time it sent anything over this connection.
235                 if (!connectionVersion.isWriteCompatibleWith(requestedVersion)) {
236                     // A migration occurred.
237                     if (connectionVersion < collectionShardVersion &&
238                         connectionVersion.epoch() == collectionShardVersion.epoch()) {
239                         info->setVersion(nss.ns(), requestedVersion);
240                     }
241                     // The collection was dropped and recreated.
242                     else if (authoritative) {
243                         info->setVersion(nss.ns(), requestedVersion);
244                     } else {
245                         result.append("ns", nss.ns());
246                         result.appendBool("need_authoritative", true);
247                         errmsg = str::stream() << "verifying drop on '" << nss.ns() << "'";
248                         return false;
249                     }
250                 }
251 
252                 return true;
253             }
254 
255             // Step 5
256 
257             const bool isDropRequested =
258                 !requestedVersion.isSet() && collectionShardVersion.isSet();
259 
260             if (isDropRequested) {
261                 if (!authoritative) {
262                     result.appendBool("need_authoritative", true);
263                     result.append("ns", nss.ns());
264                     collectionShardVersion.addToBSON(result, "globalVersion");
265                     errmsg = "dropping needs to be authoritative";
266                     return false;
267                 }
268 
269                 // Fall through to metadata reload below
270             } else {
271                 // Not Dropping
272 
273                 // Step 6
274 
275                 // TODO: Refactor all of this
276                 if (requestedVersion < connectionVersion &&
277                     requestedVersion.epoch() == connectionVersion.epoch()) {
278                     errmsg = str::stream() << "this connection already had a newer version "
279                                            << "of collection '" << nss.ns() << "'";
280                     result.append("ns", nss.ns());
281                     requestedVersion.addToBSON(result, "newVersion");
282                     collectionShardVersion.addToBSON(result, "globalVersion");
283                     return false;
284                 }
285 
286                 // TODO: Refactor all of this
287                 if (requestedVersion < collectionShardVersion &&
288                     requestedVersion.epoch() == collectionShardVersion.epoch()) {
289                     if (css->getMigrationSourceManager()) {
290                         auto critSecSignal =
291                             css->getMigrationSourceManager()->getMigrationCriticalSectionSignal(
292                                 false);
293                         if (critSecSignal) {
294                             collLock.reset();
295                             autoDb.reset();
296                             log() << "waiting till out of critical section";
297                             critSecSignal->waitFor(opCtx, Seconds(10));
298                         }
299                     }
300 
301                     errmsg = str::stream() << "shard global version for collection is higher "
302                                            << "than trying to set to '" << nss.ns() << "'";
303                     result.append("ns", nss.ns());
304                     requestedVersion.addToBSON(result, "version");
305                     collectionShardVersion.addToBSON(result, "globalVersion");
306                     result.appendBool("reloadConfig", true);
307                     return false;
308                 }
309 
310                 if (!collectionShardVersion.isSet() && !authoritative) {
311                     // Needed b/c when the last chunk is moved off a shard, the version gets reset
312                     // to zero, which should require a reload.
313                     if (css->getMigrationSourceManager()) {
314                         auto critSecSignal =
315                             css->getMigrationSourceManager()->getMigrationCriticalSectionSignal(
316                                 false);
317                         if (critSecSignal) {
318                             collLock.reset();
319                             autoDb.reset();
320                             log() << "waiting till out of critical section";
321                             critSecSignal->waitFor(opCtx, Seconds(10));
322                         }
323                     }
324 
325                     // need authoritative for first look
326                     result.append("ns", nss.ns());
327                     result.appendBool("need_authoritative", true);
328                     errmsg = str::stream() << "first time for collection '" << nss.ns() << "'";
329                     return false;
330                 }
331 
332                 // Fall through to metadata reload below
333             }
334         }
335 
336         // Step 7
337 
338         Status status = shardingState->onStaleShardVersion(opCtx, nss, requestedVersion);
339 
340         {
341             AutoGetCollection autoColl(opCtx, nss, MODE_IS);
342 
343             ChunkVersion currVersion = ChunkVersion::UNSHARDED();
344             auto collMetadata = CollectionShardingState::get(opCtx, nss)->getMetadata();
345             if (collMetadata) {
346                 currVersion = collMetadata->getShardVersion();
347             }
348 
349             if (!status.isOK()) {
350                 // The reload itself was interrupted or confused here
351 
352                 errmsg = str::stream() << "could not refresh metadata for " << nss.ns()
353                                        << " with requested shard version "
354                                        << requestedVersion.toString()
355                                        << ", stored shard version is " << currVersion.toString()
356                                        << causedBy(redact(status));
357 
358                 warning() << errmsg;
359 
360                 result.append("ns", nss.ns());
361                 requestedVersion.addToBSON(result, "version");
362                 currVersion.addToBSON(result, "globalVersion");
363                 result.appendBool("reloadConfig", true);
364 
365                 return false;
366             } else if (!requestedVersion.isWriteCompatibleWith(currVersion)) {
367                 // We reloaded a version that doesn't match the version mongos was trying to
368                 // set.
369                 errmsg = str::stream() << "requested shard version differs from"
370                                        << " config shard version for " << nss.ns()
371                                        << ", requested version is " << requestedVersion.toString()
372                                        << " but found version " << currVersion.toString();
373 
374                 OCCASIONALLY warning() << errmsg;
375 
376                 // WARNING: the exact fields below are important for compatibility with mongos
377                 // version reload.
378 
379                 result.append("ns", nss.ns());
380                 currVersion.addToBSON(result, "globalVersion");
381 
382                 // If this was a reset of a collection or the last chunk moved out, inform mongos to
383                 // do a full reload.
384                 if (currVersion.epoch() != requestedVersion.epoch() || !currVersion.isSet()) {
385                     result.appendBool("reloadConfig", true);
386                     // Zero-version also needed to trigger full mongos reload, sadly
387                     // TODO: Make this saner, and less impactful (full reload on last chunk is bad)
388                     ChunkVersion(0, 0, OID()).addToBSON(result, "version");
389                     // For debugging
390                     requestedVersion.addToBSON(result, "origVersion");
391                 } else {
392                     requestedVersion.addToBSON(result, "version");
393                 }
394 
395                 return false;
396             }
397         }
398 
399         info->setVersion(nss.ns(), requestedVersion);
400         return true;
401     }
402 
403 } setShardVersionCmd;
404 
405 }  // namespace
406 }  // namespace mongo
407