1 /*
2 Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #ifndef ClusterMgr_H
26 #define ClusterMgr_H
27
28 #include <ndb_limits.h>
29 #include <NdbThread.h>
30 #include <NdbMutex.h>
31 #include <NdbCondition.h>
32 #include <signaldata/ArbitSignalData.hpp>
33 #include <signaldata/NodeStateSignalData.hpp>
34 #include "trp_client.hpp"
35 #include "trp_node.hpp"
36 #include <signaldata/DisconnectRep.hpp>
37
38 extern "C" void* runClusterMgr_C(void * me);
39
40
41 /**
42 @class ClusterMgr
43 This class runs a heartbeat protocol between nodes, to detect if remote
44 nodes are reachable or not. This protocol is needed because the underlying
45 transporter connection may need a long time (or even forever) to detect
46 node or network failure. (TCP typically gives up retransmission after about
47 20 minutes).
48 Therefore API_REGREQ signal are sent on regular intervals. If more than
49 three signals are unanswered (by API_REGCONF) the node is presumed dead or
50 unreachable, and the transporter is disconnected.
51 This class handles heart beat between the following types of node pairs:
52 API-DB, MGMD-DB and MGMD-MGMD, where DB means data node. There is another
53 heart beat mechanism between pairs of data nodes, using the CM_HEARTBEAT
54 signal.
55 */
56 class ClusterMgr : public trp_client
57 {
58 friend class TransporterFacade;
59 friend class ArbitMgr;
60 friend void* runClusterMgr_C(void * me);
61 public:
62 ClusterMgr(class TransporterFacade &);
63 virtual ~ClusterMgr();
64 void configure(Uint32 nodeId, const ndb_mgm_configuration* config);
65
66 void reportConnected(NodeId nodeId);
67 void reportDisconnected(NodeId nodeId);
68
69 void doStop();
70 void startThread();
71
72 /**
73 * This method isn't used by the NDB code, it can be used by an API
74 * user through a public method on TransporterFacade if he wants to
75 * force the API node to use a different heartbeat interval than the
76 * one decided by the data node.
77 *
78 * The variable isn't protected and there is no need for it to be.
79 */
set_max_api_reg_req_interval(unsigned int millisec)80 void set_max_api_reg_req_interval(unsigned int millisec) {
81 m_max_api_reg_req_interval = millisec;
82 }
83
lock()84 void lock() { NdbMutex_Lock(clusterMgrThreadMutex); trp_client::lock(); }
unlock()85 void unlock() { trp_client::unlock();NdbMutex_Unlock(clusterMgrThreadMutex); }
86
87 private:
88 // 100ms is the smallest heart beat interval supported.
89 static const Uint32 minHeartBeatInterval = 100;
90
91 void startup();
92 void threadMain();
93
94 int theStop;
95 /**
96 * We could end up in a situation where signals are delayed for more
97 * than 100 ms, either due to slow operation or due to that we're
98 * closing the TransporterFacade object. To avoid sending more than
99 * signal to ourself in these cases we add this boolean variable to
100 * indicate if we already sent a signal to ourself, this signal will
101 * eventually arrive since it's a local signal within the same process.
102 */
103 bool m_sent_API_REGREQ_to_myself;
104 class TransporterFacade & theFacade;
105 class ArbitMgr * theArbitMgr;
106
107 enum Cluster_state {
108 CS_waiting_for_clean_cache = 0,
109 CS_waiting_for_first_connect,
110 CS_connected
111 };
112
113 public:
114 /**
115 * The node state is protected for updates by ClusterMgrThreadMutex.
116 * One can call hb_received and set hbMissed to 0 though without
117 * protection since this is safe. All other uses of hbFrequency,
118 * hbCounter and hbMissed is internal to ClusterMgr and done with
119 * protection of ClusterMgrThreadMutex.
120 *
121 * The node data is often read without protection as a way to decide
122 * which node to communicate to. If the information read is old it
123 * will mean a non-optimal decision is taken, but no specific error
124 * will be the result of reading stale node info data.
125 *
126 * getNoOfConnectedNodes is only used by a test program, so is essentially
127 * also a private method.
128 */
129 struct Node : public trp_node
130 {
131 Node();
132
133 /**
134 * Heartbeat stuff
135 */
136 Uint32 hbFrequency; // Heartbeat frequence
137 Uint32 hbCounter; // # milliseconds passed since last hb sent
138 Uint32 hbMissed; // # missed heartbeats
139 };
140
141 const trp_node & getNodeInfo(NodeId) const;
142 Uint32 getNoOfConnectedNodes() const;
143 void hb_received(NodeId);
144
145 /**
146 * This variable isn't protected, it's used when the last node disconnects to
147 * ensure that the ClusterMgr stops and doesn't perform any reconnects.
148 */
149 int m_auto_reconnect;
150 Uint32 m_connect_count;
151 private:
152 Uint32 m_max_api_reg_req_interval;
153 Uint32 noOfAliveNodes;
154 Uint32 noOfConnectedNodes;
155 Uint32 noOfConnectedDBNodes;
156 Uint32 minDbVersion;
157 Node theNodes[MAX_NODES];
158 NdbThread* theClusterMgrThread;
159
160 NdbCondition* waitForHBCond;
161
162 enum Cluster_state m_cluster_state;
163 /**
164 * We use the trp_client lock to protect the variables inside of the
165 * ClusterMgr. We use the clusterMgrThreadMutex to control start of
166 * the ClusterMgr main thread. It also protects the theStop variable
167 * against concurrent usage. Finally we need to use the clusterMgrThreadMutex
168 * to protect against concurrent close of trp_client and call of
169 * do_poll.
170 */
171 NdbMutex* clusterMgrThreadMutex;
172
173 /**
174 The rate (in milliseconds) at which this node expects to receive
175 API_REGREQ heartbeat messages.
176 */
177 Uint32 m_hbFrequency;
178
179 /**
180 * The maximal time between connection attempts to data nodes.
181 * start_connect_backoff_max_time is used before connection
182 * to the first data node has succeeded.
183 */
184 Uint32 start_connect_backoff_max_time;
185 Uint32 connect_backoff_max_time;
186
187 /**
188 * Signals received
189 */
190 void execAPI_REGREQ (const Uint32 * theData);
191 void execAPI_REGCONF (const NdbApiSignal*, const LinearSectionPtr ptr[]);
192 void execAPI_REGREF (const Uint32 * theData);
193 void execCONNECT_REP (const NdbApiSignal*, const LinearSectionPtr ptr[]);
194 void execDISCONNECT_REP(const NdbApiSignal*, const LinearSectionPtr ptr[]);
195 void execNODE_FAILREP (const NdbApiSignal*, const LinearSectionPtr ptr[]);
196 void execNF_COMPLETEREP(const NdbApiSignal*, const LinearSectionPtr ptr[]);
197
198 void check_wait_for_hb(NodeId nodeId);
199
200 bool is_cluster_completely_unavailable();
set_node_alive(trp_node & node,bool alive)201 inline void set_node_alive(trp_node& node, bool alive){
202
203 // Only DB nodes can be "alive"
204 assert(!alive ||
205 (alive && node.m_info.getType() == NodeInfo::DB));
206
207 if(node.m_alive && !alive)
208 {
209 assert(noOfAliveNodes);
210 noOfAliveNodes--;
211 }
212 else if(!node.m_alive && alive)
213 {
214 noOfAliveNodes++;
215 }
216 node.m_alive = alive;
217 }
218
219 void set_node_dead(trp_node&);
220
221 void print_nodes(const char* where, NdbOut& out = ndbout);
222 void recalcMinDbVersion();
223
224 public:
225 /**
226 * trp_client interface
227 *
228 * This method is called from do_poll which is called from the ClusterMgr
229 * main thread, we keep the clusterMgrThreadMutex when calling this method,
230 * so all signal methods are protected.
231 */
232 virtual void trp_deliver_signal(const NdbApiSignal*,
233 const LinearSectionPtr p[3]);
234 };
235
236 inline
237 const trp_node &
getNodeInfo(NodeId nodeId) const238 ClusterMgr::getNodeInfo(NodeId nodeId) const {
239 // Check array bounds
240 assert(nodeId < MAX_NODES);
241 return theNodes[nodeId];
242 }
243
244 inline
245 Uint32
getNoOfConnectedNodes() const246 ClusterMgr::getNoOfConnectedNodes() const {
247 return noOfConnectedNodes;
248 }
249
250 inline
251 void
hb_received(NodeId nodeId)252 ClusterMgr::hb_received(NodeId nodeId) {
253 // Check array bounds + don't allow node 0 to be touched
254 assert(nodeId > 0 && nodeId < MAX_NODES);
255 theNodes[nodeId].hbMissed = 0;
256 }
257
258 /*****************************************************************************/
259
260 /**
261 * @class ArbitMgr
262 * Arbitration manager. Runs in separate thread.
263 * Started only by a request from the kernel.
264 */
265
266 extern "C" void* runArbitMgr_C(void* me);
267
268 class ArbitMgr
269 {
270 public:
271 ArbitMgr(class ClusterMgr &);
272 ~ArbitMgr();
273
setRank(unsigned n)274 inline void setRank(unsigned n) { theRank = n; }
setDelay(unsigned n)275 inline void setDelay(unsigned n) { theDelay = n; }
276
277 void doStart(const Uint32* theData);
278 void doChoose(const Uint32* theData);
279 void doStop(const Uint32* theData);
280
281 friend void* runArbitMgr_C(void* me);
282
283 private:
284 class ClusterMgr & m_clusterMgr;
285 unsigned theRank;
286 unsigned theDelay;
287
288 void threadMain();
289 NdbThread* theThread;
290 NdbMutex* theThreadMutex; // not really needed
291
292 struct ArbitSignal {
293 GlobalSignalNumber gsn;
294 ArbitSignalData data;
295 NDB_TICKS startticks;
296
ArbitSignalArbitMgr::ArbitSignal297 ArbitSignal() {}
298
initArbitMgr::ArbitSignal299 inline void init(GlobalSignalNumber aGsn, const Uint32* aData) {
300 gsn = aGsn;
301 if (aData != NULL)
302 memcpy(&data, aData, sizeof(data));
303 else
304 memset(&data, 0, sizeof(data));
305 }
306
setTimestampArbitMgr::ArbitSignal307 inline void setTimestamp() {
308 startticks = NdbTick_getCurrentTicks();
309 }
310
getTimediffArbitMgr::ArbitSignal311 inline Uint64 getTimediff() {
312 const NDB_TICKS now = NdbTick_getCurrentTicks();
313 return NdbTick_Elapsed(startticks,now).milliSec();
314 }
315 };
316
317 NdbMutex* theInputMutex;
318 NdbCondition* theInputCond;
319 int theInputTimeout;
320 bool theInputFull; // the predicate
321 ArbitSignal theInputBuffer; // shared buffer
322
323 void sendSignalToThread(ArbitSignal& aSignal);
324
325 enum State { // thread states
326 StateInit,
327 StateStarted, // thread started
328 StateChoose1, // received one valid REQ
329 StateChoose2, // received two valid REQs
330 StateFinished // finished one way or other
331 };
332 State theState;
333
334 enum Stop { // stop code in ArbitSignal.data.code
335 StopExit = 1, // at API exit
336 StopRequest = 2, // request from kernel
337 StopRestart = 3 // stop before restart
338 };
339
340 void threadStart(ArbitSignal& aSignal); // handle thread events
341 void threadChoose(ArbitSignal& aSignal);
342 void threadTimeout();
343 void threadStop(ArbitSignal& aSignal);
344
345 ArbitSignal theStartReq;
346 ArbitSignal theChooseReq1;
347 ArbitSignal theChooseReq2;
348 ArbitSignal theStopOrd;
349
350 void sendStartConf(ArbitSignal& aSignal, Uint32);
351 void sendChooseRef(ArbitSignal& aSignal, Uint32);
352 void sendChooseConf(ArbitSignal& aSignal, Uint32);
353 void sendStopRep(ArbitSignal& aSignal, Uint32);
354
355 void sendSignalToQmgr(ArbitSignal& aSignal);
356 };
357
358 #endif
359