1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #pragma once
32 
33 #include <string>
34 #include <vector>
35 
36 #include "mongo/base/disallow_copying.h"
37 #include "mongo/bson/oid.h"
38 #include "mongo/db/namespace_string.h"
39 #include "mongo/db/s/active_migrations_registry.h"
40 #include "mongo/db/s/chunk_splitter.h"
41 #include "mongo/db/s/collection_sharding_state.h"
42 #include "mongo/db/s/migration_destination_manager.h"
43 #include "mongo/executor/task_executor.h"
44 #include "mongo/executor/thread_pool_task_executor.h"
45 #include "mongo/stdx/functional.h"
46 #include "mongo/stdx/memory.h"
47 #include "mongo/stdx/mutex.h"
48 
49 namespace mongo {
50 
51 class BSONObj;
52 class BSONObjBuilder;
53 class ConnectionString;
54 class OperationContext;
55 class ServiceContext;
56 class ShardIdentityType;
57 class Status;
58 
59 namespace repl {
60 class OpTime;
61 }  // namespace repl
62 
63 /**
64  * Contains the global sharding state for a running mongod. There is one instance of this object per
65  * service context and it is never destroyed for the lifetime of the context.
   *
   * The class is declared with MONGO_DISALLOW_COPYING, so instances are not meant to be copied;
   * callers obtain the instance through ShardingState::get().
66  */
67 class ShardingState {
68     MONGO_DISALLOW_COPYING(ShardingState);
69 
70 public:
       // Callable type of the hook held in _globalInit; a replacement can be installed through
       // setGlobalInitMethodForTest().
71     using GlobalInitFunc =
72         stdx::function<Status(OperationContext*, const ConnectionString&, StringData)>;
73 
74     ShardingState();
75     ~ShardingState();
76 
77     /**
78      * Retrieves the sharding state object associated with the specified service context. This
79      * method must only be called if ShardingState decoration has been created on the service
80      * context, otherwise it will fassert. In other words, it may only be called on MongoD and
81      * tests, which specifically require and instantiate ShardingState.
82      *
83      * Returns the instance's ShardingState.
84      */
85     static ShardingState* get(ServiceContext* serviceContext);
86     static ShardingState* get(OperationContext* operationContext);
87 
88     /**
89      * Returns true if ShardingState has been successfully initialized.
90      *
91      * Code that needs to perform extra actions if sharding is initialized, but does not need to
92      * error if not, should use this. Alternatively, see ShardingState::canAcceptShardedCommands().
93      */
94     bool enabled() const;
95 
96     /**
97      * Force-sets the initialization state to InitializationState::kInitialized, for testing
98      * purposes. Note that this function should ONLY be used for testing purposes.
99      */
100     void setEnabledForTest(const std::string& shardName);
101 
102     /**
103      * Returns Status::OK if the ShardingState is enabled; if not, returns an error describing
104      * whether the ShardingState is just not yet initialized, or if this shard is not running with
105      * --shardsvr at all.
106      *
107      * Code that should error if sharding state has not been initialized should use this to report
108      * a more descriptive error. Alternatively, see ShardingState::enabled().
109      */
110     Status canAcceptShardedCommands() const;
111 
        // NOTE(review): undocumented in this header; presumably returns the connection string of the
        // config server this shard is attached to — confirm against the implementation.
112     ConnectionString getConfigServer(OperationContext* opCtx);
113 
        // NOTE(review): undocumented in this header; presumably returns a copy of _shardName —
        // confirm against the implementation.
114     std::string getShardName();
115 
        // Returns a non-owning pointer to the MigrationDestinationManager member embedded in this
        // object (_migrationDestManager).
migrationDestinationManager()116     MigrationDestinationManager* migrationDestinationManager() {
117         return &_migrationDestManager;
118     }
119 
120     /**
121      * Initializes the sharding state of this server from the shard identity document argument
122      * and sets secondary or primary state information on the catalog cache loader.
123      *
124      * Note: caller must hold a global/database lock! Needed in order to stably check for
125      * replica set state (primary, secondary, standalone).
126      */
127     Status initializeFromShardIdentity(OperationContext* opCtx,
128                                        const ShardIdentityType& shardIdentity);
129 
130     /**
131      * Shuts down sharding machinery on the shard.
132      */
133     void shutDown(OperationContext* opCtx);
134 
135     /**
136      * Updates the ShardRegistry's stored notion of the config server optime based on the
137      * ConfigServerMetadata decoration attached to the OperationContext.
138      */
139     Status updateConfigServerOpTimeFromMetadata(OperationContext* opCtx);
140 
        // NOTE(review): undocumented in this header; presumably returns the ChunkSplitter held in
        // _chunkSplitter without transferring ownership — confirm against the implementation.
141     ChunkSplitter* getChunkSplitter();
142 
143     /**
144      * Should be invoked when the shard server primary enters the 'PRIMARY' state.
145      * Sets up the ChunkSplitter to begin accepting split requests.
146      */
147     void initiateChunkSplitter();
148 
149     /**
150      * Should be invoked when this node, which is currently serving as a 'PRIMARY', steps down.
151      * Sets the state of the ChunkSplitter so that it will no longer accept split requests.
152      */
153     void interruptChunkSplitter();
154 
155     /**
156      * Refreshes the local metadata based on whether the expected version is higher than what we
157      * have cached.
158      */
159     Status onStaleShardVersion(OperationContext* opCtx,
160                                const NamespaceString& nss,
161                                const ChunkVersion& expectedVersion);
162 
163     /**
164      * Refreshes collection metadata by asking the config server for the latest information.
165      * Starts a new config server request.
166      *
167      * Locking Notes:
168      *   + Must NOT be called with the write lock because this call may go into the network,
169      *     and deadlocks may occur with shard-as-a-config.  Therefore, nothing here guarantees
170      *     that 'latestShardVersion' is indeed the current one on return.
171      *
172      *   + Because this call must not be issued with the DBLock held, by the time the config
173      *     server sent us back the collection metadata information, someone else may have
174      *     updated the previously stored collection metadata.  There are cases when one can't
175      *     tell which of updated or loaded metadata are the freshest. There are also cases where
176      *     the data coming from configs do not correspond to a consistent snapshot.
177      *     In these cases, return RemoteChangeDetected. (This usually means this call needs to
178      *     be issued again, at caller discretion)
179      *
180      * @return OK if remote metadata successfully loaded (may or may not have been installed)
181      * @return RemoteChangeDetected if something changed while reloading and we may retry
182      * @return !OK if something else went wrong during reload
183      * @param latestShardVersion [out] the version that is now stored for this collection
184      */
185     Status refreshMetadataNow(OperationContext* opCtx,
186                               const NamespaceString& nss,
187                               ChunkVersion* latestShardVersion);
188 
        // NOTE(review): undocumented in this header; presumably appends a description of the
        // sharding state to the builder 'b' — confirm against the implementation.
189     void appendInfo(OperationContext* opCtx, BSONObjBuilder& b);
190 
        // NOTE(review): undocumented in this header; the exact criteria for when metadata is
        // "needed" for namespace 'ns' cannot be determined from this declaration — confirm.
191     bool needCollectionMetadata(OperationContext* opCtx, const std::string& ns);
192 
193     /**
194      * Updates the config server field of the shardIdentity document with the given connection
195      * string.
196      *
197      * Note: this can return NotMaster error.
198      */
199     Status updateShardIdentityConfigString(OperationContext* opCtx,
200                                            const std::string& newConnectionString);
201 
202     /**
203      * If there are no migrations running on this shard, registers an active migration with the
204      * specified arguments and returns a ScopedRegisterDonateChunk, which must be signaled by the
205      * caller before it goes out of scope.
206      *
207      * If there is an active migration already running on this shard and it has the exact same
208      * arguments, returns a ScopedRegisterDonateChunk, which can be used to join the existing one.
209      *
210      * Otherwise returns a ConflictingOperationInProgress error.
211      */
212     StatusWith<ScopedRegisterDonateChunk> registerDonateChunk(const MoveChunkRequest& args);
213 
214     /**
215      * If there are no migrations running on this shard, registers an active receive operation for
216      * the specified namespace, chunk range and 'fromShardId', and returns a
217      * ScopedRegisterReceiveChunk, which will unregister it when it goes out of scope.
218      *
219      * Otherwise returns a ConflictingOperationInProgress error.
220      */
221     StatusWith<ScopedRegisterReceiveChunk> registerReceiveChunk(const NamespaceString& nss,
222                                                                 const ChunkRange& chunkRange,
223                                                                 const ShardId& fromShardId);
224 
225     /**
226      * If a migration has been previously registered through a call to registerDonateChunk returns
227      * that namespace. Otherwise returns boost::none.
228      *
229      * This method can be called without any locks, but once the namespace is fetched it needs to be
230      * re-checked after acquiring some intent lock on that namespace.
231      */
232     boost::optional<NamespaceString> getActiveDonateChunkNss();
233 
234     /**
235      * Get a migration status report from the migration registry. If no migration is active, this
236      * returns an empty BSONObj.
237      *
238      * Takes an IS lock on the namespace of the active migration, if one is active.
239      */
240     BSONObj getActiveMigrationStatusReport(OperationContext* opCtx);
241 
242     /**
243      * For testing only. Mock the initialization method used by initializeFromConfigConnString and
244      * initializeFromShardIdentity after all checks are performed.
         *
         * NOTE(review): initializeFromConfigConnString is not declared in this class as shown here;
         * that part of the description may be stale — confirm.
245      */
246     void setGlobalInitMethodForTest(GlobalInitFunc func);
247 
248     /**
249      * Schedules the range to clean of the given namespace for deletion.
250      * Behavior can be modified through setScheduleCleanupFunctionForTest.
         *
         * NOTE(review): setScheduleCleanupFunctionForTest is not declared in this class as shown
         * here; the reference may be stale — confirm.
251      */
252     void scheduleCleanup(const NamespaceString& nss);
253 
254     /**
255      * If started with --shardsvr, initializes sharding awareness from the shardIdentity document
256      * on disk, if there is one.
257      * If started with --shardsvr in queryableBackupMode, initializes sharding awareness from the
258      * shardIdentity document passed through the --overrideShardIdentity startup parameter.
259      *
260      * If returns true, the ShardingState::_globalInit method was called, meaning all the core
261      * classes for sharding were initialized, but no networking calls were made yet (with the
262      * exception of the duplicate ShardRegistry reload in ShardRegistry::startup(), see
263      * SERVER-26123). Outgoing networking calls to cluster members can now be made.
264      *
265      * Note: this function briefly takes the global lock to determine primary/secondary state.
266      */
267     StatusWith<bool> initializeShardingAwarenessIfNeeded(OperationContext* opCtx);
268 
269     /**
270      * Return the task executor to be shared by the range deleters for all collections.
271      */
272     executor::TaskExecutor* getRangeDeleterTaskExecutor();
273 
274 private:
275     // Progress of the sharding state initialization
276     enum class InitializationState : uint32_t {
277         // Initial state. The server must be under exclusive lock when this state is entered. No
278         // metadata is available yet and it is not known whether there is any min optime metadata,
279         // which needs to be recovered. From this state, the server may enter INITIALIZING, if a
280         // recovery document is found or stay in it until initialize has been called.
            // NOTE(review): no INITIALIZING enumerator is declared in this enum (only kNew,
            // kInitialized and kError), so the transition described above looks stale — confirm.
281         kNew,
282 
283         // Sharding state is fully usable.
284         kInitialized,
285 
286         // Some initialization error occurred. The _initializationStatus variable will contain the
287         // error.
288         kError,
289     };
290 
291     /**
292      * Returns the initialization state.
293      */
294     InitializationState _getInitializationState() const;
295 
296     /**
297      * Updates the initialization state.
298      */
299     void _setInitializationState(InitializationState newState);
300 
301     /**
302      * Refreshes collection metadata by asking the config server for the latest information and
303      * returns the latest version at the time the reload was done. This call does network I/O and
304      * should never be called with a lock held.
305      */
306     ChunkVersion _refreshMetadata(OperationContext* opCtx, const NamespaceString& nss);
307 
308     // Manages the state of the migration recipient shard
309     MigrationDestinationManager _migrationDestManager;
310 
311     // Tracks the active move chunk operations running on this shard
312     ActiveMigrationsRegistry _activeMigrationsRegistry;
313 
314     // Handles asynchronous auto-splitting of chunks
315     std::unique_ptr<ChunkSplitter> _chunkSplitter;
316 
317     // Protects state below
318     stdx::mutex _mutex;
319 
320     // State of the initialization of the sharding state along with any potential errors
        // (presumably holds an InitializationState value, whose underlying type is uint32_t — confirm)
321     AtomicUInt32 _initializationState;
322 
323     // Only valid if _initializationState is kError. Contains the reason for initialization failure.
324     Status _initializationStatus;
325 
326     // Signaled when ::initialize finishes.
327     stdx::condition_variable _initializationFinishedCondition;
328 
329     // The shard name for this host (comes through setShardVersion)
330     std::string _shardName;
331 
332     // The id for the cluster this shard belongs to.
333     OID _clusterId;
334 
335     // Function for initializing the external sharding state components not owned here.
336     GlobalInitFunc _globalInit;
337 
338     // Task executor shared by the collection range deleters.
        // The destructor is declared here and defined out of line. NOTE(review): 'lock' presumably
        // guards access to 'taskExecutor' — confirm against the implementation.
339     struct RangeDeleterExecutor {
340         stdx::mutex lock{};
341         std::unique_ptr<executor::TaskExecutor> taskExecutor{nullptr};
342         ~RangeDeleterExecutor();
343     };
344     RangeDeleterExecutor _rangeDeleterExecutor;
345 };
346 
347 }  // namespace mongo
348