1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #pragma once
32 
33 #include "mongo/base/disallow_copying.h"
34 #include "mongo/db/s/balancer/balancer_chunk_selection_policy.h"
35 #include "mongo/db/s/balancer/balancer_random.h"
36 #include "mongo/db/s/balancer/migration_manager.h"
37 #include "mongo/stdx/condition_variable.h"
38 #include "mongo/stdx/mutex.h"
39 #include "mongo/stdx/thread.h"
40 
41 namespace mongo {
42 
// Forward declarations. The declarations in this header only refer to these types by pointer,
// by reference, as a function return type or as the pointee of a std::unique_ptr member, so an
// incomplete type is sufficient here.
class ChunkType;
class ClusterStatistics;
class MigrationSecondaryThrottleOptions;
class OperationContext;
class ServiceContext;
class Status;
49 
50 /**
51  * The balancer is a background task that tries to keep the number of chunks across all
52  * servers of the cluster even.
53  *
54  * The balancer does act continuously but in "rounds". At a given round, it would decide if
55  * there is an imbalance by checking the difference in chunks between the most and least
56  * loaded shards. It would issue a request for a chunk migration per round, if it found so.
57  */
58 class Balancer {
59     MONGO_DISALLOW_COPYING(Balancer);
60 
61 public:
62     Balancer(ServiceContext* serviceContext);
63     ~Balancer();
64 
65     /**
66      * Instantiates an instance of the balancer and installs it on the specified service context.
67      * This method is not thread-safe and must be called only once when the service is starting.
68      */
69     static void create(ServiceContext* serviceContext);
70 
71     /**
72      * Retrieves the per-service instance of the Balancer.
73      */
74     static Balancer* get(ServiceContext* serviceContext);
75     static Balancer* get(OperationContext* operationContext);
76 
77     /**
78      * Invoked when the config server primary enters the 'PRIMARY' state and is invoked while the
79      * caller is holding the global X lock. Kicks off the main balancer thread and returns
80      * immediately. Auto-balancing (if enabled) should commence shortly, and manual migrations will
81      * be processed and run.
82      *
83      * Must only be called if the balancer is in the stopped state (i.e., just constructed or
84      * waitForBalancerToStop has been called before). Any code in this call must not try to acquire
85      * any locks or to wait on operations, which acquire locks.
86      */
87     void initiateBalancer(OperationContext* opCtx);
88 
89     /**
90      * Invoked when this node which is currently serving as a 'PRIMARY' steps down and is invoked
91      * while the global X lock is held. Requests the main balancer thread to stop and returns
92      * immediately without waiting for it to terminate. Once the balancer has stopped, manual
93      * migrations will be rejected.
94      *
95      * This method might be called multiple times in succession, which is what happens as a result
96      * of incomplete transition to primary so it is resilient to that.
97      *
98      * The waitForBalancerToStop method must be called afterwards in order to wait for the main
99      * balancer thread to terminate and to allow initiateBalancer to be called again.
100      */
101     void interruptBalancer();
102 
103     /**
104      * Invoked when a node on its way to becoming a primary finishes draining and is about to
105      * acquire the global X lock in order to allow writes. Waits for the balancer thread to
106      * terminate and primes the balancer so that initiateBalancer can be called.
107      *
108      * This must not be called while holding any locks!
109      */
110     void waitForBalancerToStop();
111 
112     /**
113      * Potentially blocking method, which will return immediately if the balancer is not running a
114      * balancer round and will block until the current round completes otherwise. If the operation
115      * context's deadline is exceeded, it will throw an ExceededTimeLimit exception.
116      */
117     void joinCurrentRound(OperationContext* opCtx);
118 
119     /**
120      * Blocking call, which requests the balancer to move a single chunk to a more appropriate
121      * shard, in accordance with the active balancer policy. It is not guaranteed that the chunk
122      * will actually move because it may already be at the best shard. An error will be returned if
123      * the attempt to find a better shard or the actual migration fail for any reason.
124      */
125     Status rebalanceSingleChunk(OperationContext* opCtx, const ChunkType& chunk);
126 
127     /**
128      * Blocking call, which requests the balancer to move a single chunk to the specified location
129      * in accordance with the active balancer policy. An error will be returned if the attempt to
130      * move fails for any reason.
131      *
132      * NOTE: This call disregards the balancer enabled/disabled status and will proceed with the
133      *       move regardless. If should be used only for user-initiated moves.
134      */
135     Status moveSingleChunk(OperationContext* opCtx,
136                            const ChunkType& chunk,
137                            const ShardId& newShardId,
138                            uint64_t maxChunkSizeBytes,
139                            const MigrationSecondaryThrottleOptions& secondaryThrottle,
140                            bool waitForDelete);
141 
142     /**
143      * Appends the runtime state of the balancer instance to the specified builder.
144      */
145     void report(OperationContext* opCtx, BSONObjBuilder* builder);
146 
147 private:
148     /**
149      * Possible runtime states of the balancer. The comments indicate the allowed next state.
150      */
151     enum State {
152         kStopped,   // kRunning
153         kRunning,   // kStopping
154         kStopping,  // kStopped
155     };
156 
157     /**
158      * The main balancer loop, which runs in a separate thread.
159      */
160     void _mainThread();
161 
162     /**
163      * Checks whether the balancer main thread has been requested to stop.
164      */
165     bool _stopRequested();
166 
167     /**
168      * Signals the beginning and end of a balancing round.
169      */
170     void _beginRound(OperationContext* opCtx);
171     void _endRound(OperationContext* opCtx, Milliseconds waitTimeout);
172 
173     /**
174      * Blocks the caller for the specified timeout or until the balancer condition variable is
175      * signaled, whichever comes first.
176      */
177     void _sleepFor(OperationContext* opCtx, Milliseconds waitTimeout);
178 
179     /**
180      * Returns true if all the servers listed in configdb as being shards are reachable and are
181      * distinct processes (no hostname mixup).
182      */
183     bool _checkOIDs(OperationContext* opCtx);
184 
185     /**
186      * Iterates through all chunks in all collections. If the collection is the sessions collection,
187      * checks if the number of chunks is greater than or equal to the configured minimum number of
188      * chunks for the sessions collection (minNumChunksForSessionsCollection). If it isn't,
189      * calculates split points that evenly partition the key space into N ranges (where N is
190      * minNumChunksForSessionsCollection rounded up the next power of 2), and splits any chunks that
191      * straddle those split points. If the collection is any other collection, splits any chunks
192      * that straddle tag boundaries.
193      */
194     Status _splitChunksIfNeeded(OperationContext* opCtx);
195 
196     /**
197      * Schedules migrations for the specified set of chunks and returns how many chunks were
198      * successfully processed.
199      */
200     int _moveChunks(OperationContext* opCtx,
201                     const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateChunks);
202 
203     /**
204      * Performs a split on the chunk with min value "minKey". If the split fails, it is marked as
205      * jumbo.
206      */
207     void _splitOrMarkJumbo(OperationContext* opCtx,
208                            const NamespaceString& nss,
209                            const BSONObj& minKey);
210 
211     // Protects the state below
212     stdx::mutex _mutex;
213 
214     // Indicates the current state of the balancer
215     State _state{kStopped};
216 
217     // The main balancer thread
218     stdx::thread _thread;
219 
220     // The operation context of the main balancer thread. This value may only be available in the
221     // kRunning state and is used to force interrupt of any blocking calls made by the balancer
222     // thread.
223     OperationContext* _threadOperationContext{nullptr};
224 
225     // This thread is only available in the kStopping state and is necessary for the migration
226     // manager shutdown to not deadlock with replica set step down. In particular, the migration
227     // manager's order of lock acquisition is mutex, then collection lock, whereas stepdown first
228     // acquires the global S lock and then acquires the migration manager's mutex.
229     //
230     // The interrupt thread is scheduled when the balancer enters the kStopping state (which is at
231     // step down) and is joined outside of lock, when the replica set leaves draining mode, outside
232     // of the global X lock.
233     stdx::thread _migrationManagerInterruptThread;
234 
235     // Indicates whether the balancer is currently executing a balancer round
236     bool _inBalancerRound{false};
237 
238     // Counts the number of balancing rounds performed since the balancer thread was first activated
239     int64_t _numBalancerRounds{0};
240 
241     // Condition variable, which is signalled every time the above runtime state of the balancer
242     // changes (in particular, state/balancer round and number of balancer rounds).
243     stdx::condition_variable _condVar;
244 
245     // Number of moved chunks in last round
246     int _balancedLastTime;
247 
248     // Source of randomness when metadata needs to be randomized.
249     BalancerRandomSource _random;
250 
251     // Source for cluster statistics. Depends on the source of randomness above so it should be
252     // created after it and destroyed before it.
253     std::unique_ptr<ClusterStatistics> _clusterStats;
254 
255     // Balancer policy. Depends on the cluster statistics instance and source of randomness above so
256     // it should be created after them and destroyed before them.
257     std::unique_ptr<BalancerChunkSelectionPolicy> _chunkSelectionPolicy;
258 
259     // Migration manager used to schedule and manage migrations
260     MigrationManager _migrationManager;
261 };
262 
263 }  // namespace mongo
264