1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #ifndef ClusterMgr_H
26 #define ClusterMgr_H
27 
28 #include <ndb_limits.h>
29 #include <NdbThread.h>
30 #include <NdbMutex.h>
31 #include <NdbCondition.h>
32 #include <signaldata/ArbitSignalData.hpp>
33 #include <signaldata/NodeStateSignalData.hpp>
34 #include "trp_client.hpp"
35 #include "trp_node.hpp"
36 #include <signaldata/DisconnectRep.hpp>
37 
/*
 * C-linkage function taking an opaque pointer and returning one.
 * It is declared ahead of ClusterMgr because the class names it as a friend.
 * Presumably the start routine of the ClusterMgr thread - confirm against
 * the implementation file.
 */
extern "C" {
  void* runClusterMgr_C(void* me);
}
39 
40 
/**
  @class ClusterMgr
  This class runs a heartbeat protocol between nodes, to detect if remote
  nodes are reachable or not. This protocol is needed because the underlying
  transporter connection may need a long time (or even forever) to detect
  node or network failure. (TCP typically gives up retransmission after about
  20 minutes).
  Therefore API_REGREQ signals are sent at regular intervals. If more than
  three signals are unanswered (by API_REGCONF) the node is presumed dead or
  unreachable, and the transporter is disconnected.
  This class handles heartbeats between the following types of node pairs:
  API-DB, MGMD-DB and MGMD-MGMD, where DB means data node. There is another
  heartbeat mechanism between pairs of data nodes, using the CM_HEARTBEAT
  signal.
 */
56 class ClusterMgr : public trp_client
57 {
58   friend class TransporterFacade;
59   friend class ArbitMgr;
60   friend void* runClusterMgr_C(void * me);
61 public:
62   ClusterMgr(class TransporterFacade &);
63   virtual ~ClusterMgr();
64   void configure(Uint32 nodeId, const ndb_mgm_configuration* config);
65 
66   void reportConnected(NodeId nodeId);
67   void reportDisconnected(NodeId nodeId);
68 
69   void doStop();
70   void startThread();
71 
72   /**
73    * This method isn't used by the NDB code, it can be used by an API
74    * user through a public method on TransporterFacade if he wants to
75    * force the API node to use a different heartbeat interval than the
76    * one decided by the data node.
77    *
78    * The variable isn't protected and there is no need for it to be.
79    */
set_max_api_reg_req_interval(unsigned int millisec)80   void set_max_api_reg_req_interval(unsigned int millisec) {
81     m_max_api_reg_req_interval = millisec;
82   }
83 
lock()84   void lock() { NdbMutex_Lock(clusterMgrThreadMutex); trp_client::lock(); }
unlock()85   void unlock() { trp_client::unlock();NdbMutex_Unlock(clusterMgrThreadMutex); }
86 
87 private:
88   // 100ms is the smallest heart beat interval supported.
89   static const Uint32  minHeartBeatInterval = 100;
90 
91   void startup();
92   void threadMain();
93 
94   int  theStop;
95   /**
96    * We could end up in a situation where signals are delayed for more
97    * than 100 ms, either due to slow operation or due to that we're
98    * closing the TransporterFacade object. To avoid sending more than
99    * signal to ourself in these cases we add this boolean variable to
100    * indicate if we already sent a signal to ourself, this signal will
101    * eventually arrive since it's a local signal within the same process.
102   */
103   bool m_sent_API_REGREQ_to_myself;
104   class TransporterFacade & theFacade;
105   class ArbitMgr * theArbitMgr;
106 
107   enum Cluster_state {
108     CS_waiting_for_clean_cache = 0,
109     CS_waiting_for_first_connect,
110     CS_connected
111   };
112 
113 public:
114   /**
115    * The node state is protected for updates by ClusterMgrThreadMutex.
116    * One can call hb_received and set hbMissed to 0 though without
117    * protection since this is safe. All other uses of hbFrequency,
118    * hbCounter and hbMissed is internal to ClusterMgr and done with
119    * protection of ClusterMgrThreadMutex.
120    *
121    * The node data is often read without protection as a way to decide
122    * which node to communicate to. If the information read is old it
123    * will mean a non-optimal decision is taken, but no specific error
124    * will be the result of reading stale node info data.
125    *
126    * getNoOfConnectedNodes is only used by a test program, so is essentially
127    * also a private method.
128    */
129   struct Node : public trp_node
130   {
131     Node();
132 
133     /**
134      * Heartbeat stuff
135      */
136     Uint32 hbFrequency; // Heartbeat frequence
137     Uint32 hbCounter;   // # milliseconds passed since last hb sent
138     Uint32 hbMissed;    // # missed heartbeats
139   };
140 
141   const trp_node & getNodeInfo(NodeId) const;
142   Uint32        getNoOfConnectedNodes() const;
143   void          hb_received(NodeId);
144 
145   /**
146    * This variable isn't protected, it's used when the last node disconnects to
147    * ensure that the ClusterMgr stops and doesn't perform any reconnects.
148    */
149   int m_auto_reconnect;
150   Uint32        m_connect_count;
151 private:
152   Uint32        m_max_api_reg_req_interval;
153   Uint32        noOfAliveNodes;
154   Uint32        noOfConnectedNodes;
155   Uint32        noOfConnectedDBNodes;
156   Uint32        minDbVersion;
157   Node          theNodes[MAX_NODES];
158   NdbThread*    theClusterMgrThread;
159 
160   NdbCondition* waitForHBCond;
161 
162   enum Cluster_state m_cluster_state;
163   /**
164    * We use the trp_client lock to protect the variables inside of the
165    * ClusterMgr. We use the clusterMgrThreadMutex to control start of
166    * the ClusterMgr main thread. It also protects the theStop variable
167    * against concurrent usage. Finally we need to use the clusterMgrThreadMutex
168    * to protect against concurrent close of trp_client and call of
169    * do_poll.
170    */
171   NdbMutex*     clusterMgrThreadMutex;
172 
173   /**
174     The rate (in milliseconds) at which this node expects to receive
175     API_REGREQ heartbeat messages.
176    */
177   Uint32 m_hbFrequency;
178 
179   /**
180    * The maximal time between connection attempts to data nodes.
181    * start_connect_backoff_max_time is used before connection
182    * to the first data node has succeeded.
183    */
184   Uint32	start_connect_backoff_max_time;
185   Uint32	connect_backoff_max_time;
186 
187   /**
188    * Signals received
189    */
190   void execAPI_REGREQ    (const Uint32 * theData);
191   void execAPI_REGCONF   (const NdbApiSignal*, const LinearSectionPtr ptr[]);
192   void execAPI_REGREF    (const Uint32 * theData);
193   void execCONNECT_REP   (const NdbApiSignal*, const LinearSectionPtr ptr[]);
194   void execDISCONNECT_REP(const NdbApiSignal*, const LinearSectionPtr ptr[]);
195   void execNODE_FAILREP  (const NdbApiSignal*, const LinearSectionPtr ptr[]);
196   void execNF_COMPLETEREP(const NdbApiSignal*, const LinearSectionPtr ptr[]);
197 
198   void check_wait_for_hb(NodeId nodeId);
199 
200   bool is_cluster_completely_unavailable();
set_node_alive(trp_node & node,bool alive)201   inline void set_node_alive(trp_node& node, bool alive){
202 
203     // Only DB nodes can be "alive"
204     assert(!alive ||
205            (alive && node.m_info.getType() == NodeInfo::DB));
206 
207     if(node.m_alive && !alive)
208     {
209       assert(noOfAliveNodes);
210       noOfAliveNodes--;
211     }
212     else if(!node.m_alive && alive)
213     {
214       noOfAliveNodes++;
215     }
216     node.m_alive = alive;
217   }
218 
219   void set_node_dead(trp_node&);
220 
221   void print_nodes(const char* where, NdbOut& out = ndbout);
222   void recalcMinDbVersion();
223 
224 public:
225   /**
226    * trp_client interface
227    *
228    * This method is called from do_poll which is called from the ClusterMgr
229    * main thread, we keep the clusterMgrThreadMutex when calling this method,
230    * so all signal methods are protected.
231    */
232   virtual void trp_deliver_signal(const NdbApiSignal*,
233                                   const LinearSectionPtr p[3]);
234 };
235 
236 inline
237 const trp_node &
getNodeInfo(NodeId nodeId) const238 ClusterMgr::getNodeInfo(NodeId nodeId) const {
239   // Check array bounds
240   assert(nodeId < MAX_NODES);
241   return theNodes[nodeId];
242 }
243 
244 inline
245 Uint32
getNoOfConnectedNodes() const246 ClusterMgr::getNoOfConnectedNodes() const {
247   return noOfConnectedNodes;
248 }
249 
250 inline
251 void
hb_received(NodeId nodeId)252 ClusterMgr::hb_received(NodeId nodeId) {
253   // Check array bounds + don't allow node 0 to be touched
254   assert(nodeId > 0 && nodeId < MAX_NODES);
255   theNodes[nodeId].hbMissed = 0;
256 }
257 
258 /*****************************************************************************/
259 
260 /**
261  * @class ArbitMgr
262  * Arbitration manager.  Runs in separate thread.
263  * Started only by a request from the kernel.
264  */
265 
/*
 * C-linkage function taking an opaque pointer and returning one.
 * It is declared ahead of ArbitMgr because the class names it as a friend.
 * Presumably the start routine of the arbitration thread - confirm against
 * the implementation file.
 */
extern "C" {
  void* runArbitMgr_C(void* me);
}
267 
268 class ArbitMgr
269 {
270 public:
271   ArbitMgr(class ClusterMgr &);
272   ~ArbitMgr();
273 
setRank(unsigned n)274   inline void setRank(unsigned n) { theRank = n; }
setDelay(unsigned n)275   inline void setDelay(unsigned n) { theDelay = n; }
276 
277   void doStart(const Uint32* theData);
278   void doChoose(const Uint32* theData);
279   void doStop(const Uint32* theData);
280 
281   friend void* runArbitMgr_C(void* me);
282 
283 private:
284   class ClusterMgr & m_clusterMgr;
285   unsigned theRank;
286   unsigned theDelay;
287 
288   void threadMain();
289   NdbThread* theThread;
290   NdbMutex* theThreadMutex;     // not really needed
291 
292   struct ArbitSignal {
293     GlobalSignalNumber gsn;
294     ArbitSignalData data;
295     NDB_TICKS startticks;
296 
ArbitSignalArbitMgr::ArbitSignal297     ArbitSignal() {}
298 
initArbitMgr::ArbitSignal299     inline void init(GlobalSignalNumber aGsn, const Uint32* aData) {
300       gsn = aGsn;
301       if (aData != NULL)
302         memcpy(&data, aData, sizeof(data));
303       else
304         memset(&data, 0, sizeof(data));
305     }
306 
setTimestampArbitMgr::ArbitSignal307     inline void setTimestamp() {
308       startticks = NdbTick_getCurrentTicks();
309     }
310 
getTimediffArbitMgr::ArbitSignal311     inline Uint64 getTimediff() {
312       const NDB_TICKS now = NdbTick_getCurrentTicks();
313       return NdbTick_Elapsed(startticks,now).milliSec();
314     }
315   };
316 
317   NdbMutex* theInputMutex;
318   NdbCondition* theInputCond;
319   int theInputTimeout;
320   bool theInputFull;            // the predicate
321   ArbitSignal theInputBuffer;   // shared buffer
322 
323   void sendSignalToThread(ArbitSignal& aSignal);
324 
325   enum State {                  // thread states
326     StateInit,
327     StateStarted,               // thread started
328     StateChoose1,               // received one valid REQ
329     StateChoose2,               // received two valid REQs
330     StateFinished               // finished one way or other
331   };
332   State theState;
333 
334   enum Stop {                   // stop code in ArbitSignal.data.code
335     StopExit = 1,               // at API exit
336     StopRequest = 2,            // request from kernel
337     StopRestart = 3             // stop before restart
338   };
339 
340   void threadStart(ArbitSignal& aSignal);       // handle thread events
341   void threadChoose(ArbitSignal& aSignal);
342   void threadTimeout();
343   void threadStop(ArbitSignal& aSignal);
344 
345   ArbitSignal theStartReq;
346   ArbitSignal theChooseReq1;
347   ArbitSignal theChooseReq2;
348   ArbitSignal theStopOrd;
349 
350   void sendStartConf(ArbitSignal& aSignal, Uint32);
351   void sendChooseRef(ArbitSignal& aSignal, Uint32);
352   void sendChooseConf(ArbitSignal& aSignal, Uint32);
353   void sendStopRep(ArbitSignal& aSignal, Uint32);
354 
355   void sendSignalToQmgr(ArbitSignal& aSignal);
356 };
357 
358 #endif
359