1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <ndb_global.h>
26 #include <ndb_limits.h>
27 #include <util/version.h>
28 
29 #include "TransporterFacade.hpp"
30 #include <kernel/GlobalSignalNumbers.h>
31 
32 #include "ClusterMgr.hpp"
33 #include <IPCConfig.hpp>
34 #include "NdbApiSignal.hpp"
35 #include <NdbSleep.h>
36 #include <NdbOut.hpp>
37 #include <NdbTick.h>
38 
39 
40 #include <signaldata/NodeFailRep.hpp>
41 #include <signaldata/NFCompleteRep.hpp>
42 #include <signaldata/ApiRegSignalData.hpp>
43 #include <signaldata/AlterTable.hpp>
44 #include <signaldata/SumaImpl.hpp>
45 
46 #include <mgmapi.h>
47 #include <mgmapi_configuration.hpp>
48 #include <mgmapi_config_parameters.h>
49 
/*
 * When non-zero, execDISCONNECT_REP() does not invalidate the global
 * dictionary cache (and does not go back to CS_waiting_for_clean_cache)
 * when the count of connected nodes drops to zero.
 */
int global_flag_skip_invalidate_cache = 0;
/*
 * When non-zero, threadMain() leaves the CS_waiting_for_clean_cache state
 * without first checking that the global dictionary cache is empty.
 */
int global_flag_skip_waiting_for_clean_cache = 0;
// Uncomment to print a line for each API_REGREQ sent/received and each
// API_REGCONF received (see the #ifdef DEBUG_REG sections in this file).
//#define DEBUG_REG
53 
54 // Just a C wrapper for threadMain
55 extern "C"
56 void*
runClusterMgr_C(void * me)57 runClusterMgr_C(void * me)
58 {
59   ((ClusterMgr*) me)->threadMain();
60 
61   return NULL;
62 }
63 
ClusterMgr(TransporterFacade & _facade)64 ClusterMgr::ClusterMgr(TransporterFacade & _facade):
65   theStop(0),
66   m_sent_API_REGREQ_to_myself(false),
67   theFacade(_facade),
68   theArbitMgr(NULL),
69   m_connect_count(0),
70   m_max_api_reg_req_interval(~0),
71   noOfAliveNodes(0),
72   noOfConnectedNodes(0),
73   noOfConnectedDBNodes(0),
74   minDbVersion(0),
75   theClusterMgrThread(NULL),
76   m_cluster_state(CS_waiting_for_clean_cache),
77   m_hbFrequency(0)
78 {
79   DBUG_ENTER("ClusterMgr::ClusterMgr");
80   clusterMgrThreadMutex = NdbMutex_Create();
81   waitForHBCond= NdbCondition_Create();
82   m_auto_reconnect = -1;
83 
84   Uint32 ret = this->open(&theFacade, API_CLUSTERMGR);
85   if (unlikely(ret == 0))
86   {
87     ndbout_c("Failed to register ClusterMgr! ret: %d", ret);
88     abort();
89   }
90   DBUG_VOID_RETURN;
91 }
92 
~ClusterMgr()93 ClusterMgr::~ClusterMgr()
94 {
95   DBUG_ENTER("ClusterMgr::~ClusterMgr");
96   assert(theStop == 1);
97   if (theArbitMgr != 0)
98   {
99     delete theArbitMgr;
100     theArbitMgr = 0;
101   }
102   NdbCondition_Destroy(waitForHBCond);
103   NdbMutex_Destroy(clusterMgrThreadMutex);
104   DBUG_VOID_RETURN;
105 }
106 
107 /**
108  * This method is called from start of cluster connection instance and
109  * before we have started any socket services and thus it needs no
110  * mutex protection since the ClusterMgr object isn't known by any other
111  * thread at this point in time.
112  */
113 void
configure(Uint32 nodeId,const ndb_mgm_configuration * config)114 ClusterMgr::configure(Uint32 nodeId,
115                       const ndb_mgm_configuration* config)
116 {
117   ndb_mgm_configuration_iterator iter(* config, CFG_SECTION_NODE);
118   for(iter.first(); iter.valid(); iter.next()){
119     Uint32 nodeId = 0;
120     if(iter.get(CFG_NODE_ID, &nodeId))
121       continue;
122 
123     // Check array bounds + don't allow node 0 to be touched
124     assert(nodeId > 0 && nodeId < MAX_NODES);
125     trp_node& theNode = theNodes[nodeId];
126     theNode.defined = true;
127 
128     unsigned type;
129     if(iter.get(CFG_TYPE_OF_SECTION, &type))
130       continue;
131 
132     switch(type){
133     case NODE_TYPE_DB:
134       theNode.m_info.m_type = NodeInfo::DB;
135       break;
136     case NODE_TYPE_API:
137       theNode.m_info.m_type = NodeInfo::API;
138       break;
139     case NODE_TYPE_MGM:
140       theNode.m_info.m_type = NodeInfo::MGM;
141       break;
142     default:
143       break;
144     }
145   }
146 
147   /* Mark all non existing nodes as not defined */
148   for(Uint32 i = 0; i<MAX_NODES; i++) {
149     if (iter.first())
150       continue;
151 
152     if (iter.find(CFG_NODE_ID, i))
153       theNodes[i]= Node();
154   }
155 
156 #if 0
157   print_nodes("init");
158 #endif
159 
160   // Configure arbitrator
161   Uint32 rank = 0;
162   iter.first();
163   iter.find(CFG_NODE_ID, nodeId); // let not found in config mean rank=0
164   iter.get(CFG_NODE_ARBIT_RANK, &rank);
165 
166   if (rank > 0)
167   {
168     // The arbitrator should be active
169     if (!theArbitMgr)
170       theArbitMgr = new ArbitMgr(* this);
171     theArbitMgr->setRank(rank);
172 
173     Uint32 delay = 0;
174     iter.get(CFG_NODE_ARBIT_DELAY, &delay);
175     theArbitMgr->setDelay(delay);
176   }
177   else if (theArbitMgr)
178   {
179     // No arbitrator should be started
180     theArbitMgr->doStop(NULL);
181     delete theArbitMgr;
182     theArbitMgr= NULL;
183   }
184 
185   // Configure heartbeats.
186   unsigned hbFrequency = 0;
187   iter.get(CFG_MGMD_MGMD_HEARTBEAT_INTERVAL, &hbFrequency);
188   m_hbFrequency = static_cast<Uint32>(hbFrequency);
189 
190   // Configure max backoff time for connection attempts to first
191   // data node.
192   Uint32 backoff_max_time = 0;
193   iter.get(CFG_START_CONNECT_BACKOFF_MAX_TIME,
194            &backoff_max_time);
195   start_connect_backoff_max_time = backoff_max_time;
196 
197   // Configure max backoff time for connection attempts to data
198   // nodes.
199   backoff_max_time = 0;
200   iter.get(CFG_CONNECT_BACKOFF_MAX_TIME, &backoff_max_time);
201   connect_backoff_max_time = backoff_max_time;
202 
203   theFacade.get_registry()->set_connect_backoff_max_time_in_ms(
204     start_connect_backoff_max_time);
205 }
206 
207 void
startThread()208 ClusterMgr::startThread()
209 {
210   /**
211    * We use the clusterMgrThreadMutex as a signalling object between this
212    * thread and the main thread of the ClusterMgr.
213    * The clusterMgrThreadMutex also protects the theStop-variable.
214    */
215   Guard g(clusterMgrThreadMutex);
216 
217   theStop = -1;
218   theClusterMgrThread = NdbThread_Create(runClusterMgr_C,
219                                          (void**)this,
220                                          0, // default stack size
221                                          "ndb_clustermgr",
222                                          NDB_THREAD_PRIO_HIGH);
223   Uint32 cnt = 0;
224   while (theStop == -1 && cnt < 60)
225   {
226     NdbCondition_WaitTimeout(waitForHBCond, clusterMgrThreadMutex, 1000);
227   }
228 
229   assert(theStop == 0);
230 }
231 
232 void
doStop()233 ClusterMgr::doStop( ){
234   DBUG_ENTER("ClusterMgr::doStop");
235   {
236     /* Ensure stop is only executed once */
237     Guard g(clusterMgrThreadMutex);
238     if(theStop == 1){
239       DBUG_VOID_RETURN;
240     }
241     theStop = 1;
242   }
243 
244   void *status;
245   if (theClusterMgrThread) {
246     NdbThread_WaitFor(theClusterMgrThread, &status);
247     NdbThread_Destroy(&theClusterMgrThread);
248   }
249 
250   if (theArbitMgr != NULL)
251   {
252     theArbitMgr->doStop(NULL);
253   }
254   {
255     /**
256      * Need protection against concurrent execution of do_poll in main
257      * thread. We cannot rely only on the trp_client lock since it is
258      * not supposed to be locked when calling close (it is locked as
259      * part of the close logic.
260      */
261     Guard g(clusterMgrThreadMutex);
262     this->close(); // disconnect from TransporterFacade
263   }
264 
265   DBUG_VOID_RETURN;
266 }
267 
268 void
startup()269 ClusterMgr::startup()
270 {
271   assert(theStop == -1);
272   Uint32 nodeId = getOwnNodeId();
273   Node & cm_node = theNodes[nodeId];
274   trp_node & theNode = cm_node;
275   assert(theNode.defined);
276 
277   lock();
278   theFacade.doConnect(nodeId);
279   flush_send_buffers();
280   unlock();
281 
282   for (Uint32 i = 0; i<3000; i++)
283   {
284     theFacade.request_connection_check();
285     start_poll();
286     do_poll(0);
287     complete_poll();
288 
289     if (theNode.is_connected())
290       break;
291     NdbSleep_MilliSleep(20);
292   }
293 
294   assert(theNode.is_connected());
295   Guard g(clusterMgrThreadMutex);
296   /* Signalling to creating thread that we are done with thread startup */
297   theStop = 0;
298   NdbCondition_Broadcast(waitForHBCond);
299 }
300 
301 void
threadMain()302 ClusterMgr::threadMain()
303 {
304   startup();
305 
306   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
307 
308   signal.theVerId_signalNumber   = GSN_API_REGREQ;
309   signal.theTrace                = 0;
310   signal.theLength               = ApiRegReq::SignalLength;
311 
312   ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
313   req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
314   req->version = NDB_VERSION;
315   req->mysql_version = NDB_MYSQL_VERSION_D;
316 
317   NdbApiSignal nodeFail_signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
318   nodeFail_signal.theVerId_signalNumber = GSN_NODE_FAILREP;
319   nodeFail_signal.theReceiversBlockNumber = API_CLUSTERMGR;
320   nodeFail_signal.theTrace  = 0;
321   nodeFail_signal.theLength = NodeFailRep::SignalLengthLong;
322 
323   NDB_TICKS now = NdbTick_getCurrentTicks();
324 
325   while(!theStop)
326   {
327     /* Sleep 1/5 of minHeartBeatInterval between each check */
328     const NDB_TICKS before = now;
329     for (Uint32 i = 0; i<5; i++)
330     {
331       NdbSleep_MilliSleep(minHeartBeatInterval/5);
332       {
333         /**
334          * start_poll does lock the trp_client and complete_poll
335          * releases this lock. This means that this protects
336          * against concurrent calls to send signals in ArbitMgr.
337          * We do however need to protect also against concurrent
338          * close in doStop, so to avoid this problem we need to
339          * also lock clusterMgrThreadMutex before we start the
340          * poll.
341          */
342         Guard g(clusterMgrThreadMutex);
343         start_poll();
344         do_poll(0);
345         complete_poll();
346       }
347     }
348     now = NdbTick_getCurrentTicks();
349     const Uint32 timeSlept = (Uint32)NdbTick_Elapsed(before, now).milliSec();
350 
351     lock();
352     if (m_cluster_state == CS_waiting_for_clean_cache &&
353         theFacade.m_globalDictCache)
354     {
355       if (!global_flag_skip_waiting_for_clean_cache)
356       {
357         theFacade.m_globalDictCache->lock();
358         unsigned sz= theFacade.m_globalDictCache->get_size();
359         theFacade.m_globalDictCache->unlock();
360         if (sz)
361         {
362           unlock();
363           continue;
364         }
365       }
366       m_cluster_state = CS_waiting_for_first_connect;
367     }
368 
369     NodeFailRep * nodeFailRep = CAST_PTR(NodeFailRep,
370                                          nodeFail_signal.getDataPtrSend());
371     nodeFailRep->noOfNodes = 0;
372     NodeBitmask::clear(nodeFailRep->theAllNodes);
373 
374     for (int i = 1; i < MAX_NODES; i++)
375     {
376       /**
377        * Send register request (heartbeat) to all available nodes
378        * at specified timing intervals
379        */
380       const NodeId nodeId = i;
381       // Check array bounds + don't allow node 0 to be touched
382       assert(nodeId > 0 && nodeId < MAX_NODES);
383       Node & cm_node = theNodes[nodeId];
384       trp_node & theNode = cm_node;
385 
386       if (!theNode.defined)
387 	continue;
388 
389       if (theNode.is_connected() == false){
390 	theFacade.doConnect(nodeId);
391 	continue;
392       }
393 
394       if (!theNode.compatible){
395 	continue;
396       }
397 
398       if (nodeId == getOwnNodeId())
399       {
400         /**
401          * Don't send HB to self more than once
402          * (once needed to avoid weird special cases in e.g ConfigManager)
403          */
404         if (m_sent_API_REGREQ_to_myself)
405         {
406           continue;
407         }
408       }
409 
410       cm_node.hbCounter += timeSlept;
411       if (cm_node.hbCounter >= m_max_api_reg_req_interval ||
412           cm_node.hbCounter >= cm_node.hbFrequency)
413       {
414 	/**
415 	 * It is now time to send a new Heartbeat
416 	 */
417         if (cm_node.hbCounter >= cm_node.hbFrequency)
418         {
419           cm_node.hbMissed++;
420           cm_node.hbCounter = 0;
421 	}
422 
423         if (theNode.m_info.m_type != NodeInfo::DB)
424           signal.theReceiversBlockNumber = API_CLUSTERMGR;
425         else
426           signal.theReceiversBlockNumber = QMGR;
427 
428 #ifdef DEBUG_REG
429 	ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
430 #endif
431         if (nodeId == getOwnNodeId())
432         {
433           /* Set flag to ensure we only send once to ourself */
434           m_sent_API_REGREQ_to_myself = true;
435         }
436 	raw_sendSignal(&signal, nodeId);
437       }//if
438 
439       if (cm_node.hbMissed == 4 && cm_node.hbFrequency > 0)
440       {
441         nodeFailRep->noOfNodes++;
442         NodeBitmask::set(nodeFailRep->theAllNodes, nodeId);
443       }
444     }
445     flush_send_buffers();
446     unlock();
447 
448     if (nodeFailRep->noOfNodes)
449     {
450       lock();
451       raw_sendSignal(&nodeFail_signal, getOwnNodeId());
452       flush_send_buffers();
453       unlock();
454     }
455   }
456 }
457 
458 /**
459  * We're holding the trp_client lock while performing poll from
460  * ClusterMgr. So we always execute all the execSIGNAL-methods in
461  * ClusterMgr with protection other methods that use the trp_client
462  * lock (reportDisconnect, reportConnect, is_cluster_completely_unavailable,
463  * ArbitMgr (sendSignalToQmgr)).
464  */
465 void
trp_deliver_signal(const NdbApiSignal * sig,const LinearSectionPtr ptr[3])466 ClusterMgr::trp_deliver_signal(const NdbApiSignal* sig,
467                                const LinearSectionPtr ptr[3])
468 {
469   const Uint32 gsn = sig->theVerId_signalNumber;
470   const Uint32 * theData = sig->getDataPtr();
471 
472   switch (gsn){
473   case GSN_API_REGREQ:
474     execAPI_REGREQ(theData);
475     break;
476 
477   case GSN_API_REGCONF:
478      execAPI_REGCONF(sig, ptr);
479     break;
480 
481   case GSN_API_REGREF:
482     execAPI_REGREF(theData);
483     break;
484 
485   case GSN_NODE_FAILREP:
486     execNODE_FAILREP(sig, ptr);
487     break;
488 
489   case GSN_NF_COMPLETEREP:
490     execNF_COMPLETEREP(sig, ptr);
491     break;
492   case GSN_ARBIT_STARTREQ:
493     if (theArbitMgr != NULL)
494       theArbitMgr->doStart(theData);
495     break;
496 
497   case GSN_ARBIT_CHOOSEREQ:
498     if (theArbitMgr != NULL)
499       theArbitMgr->doChoose(theData);
500     break;
501 
502   case GSN_ARBIT_STOPORD:
503     if(theArbitMgr != NULL)
504       theArbitMgr->doStop(theData);
505     break;
506 
507   case GSN_ALTER_TABLE_REP:
508   {
509     if (theFacade.m_globalDictCache == NULL)
510       break;
511     const AlterTableRep* rep = (const AlterTableRep*)theData;
512     theFacade.m_globalDictCache->lock();
513     theFacade.m_globalDictCache->
514       alter_table_rep((const char*)ptr[0].p,
515                       rep->tableId,
516                       rep->tableVersion,
517                       rep->changeType == AlterTableRep::CT_ALTERED);
518     theFacade.m_globalDictCache->unlock();
519     break;
520   }
521   case GSN_SUB_GCP_COMPLETE_REP:
522   {
523     /**
524      * Report
525      */
526     theFacade.for_each(this, sig, ptr);
527 
528     /**
529      * Reply
530      */
531     {
532       BlockReference ownRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
533       NdbApiSignal tSignal(* sig);
534       Uint32* send= tSignal.getDataPtrSend();
535       memcpy(send, theData, tSignal.getLength() << 2);
536       CAST_PTR(SubGcpCompleteAck, send)->rep.senderRef = ownRef;
537       Uint32 ref= sig->theSendersBlockRef;
538       Uint32 aNodeId= refToNode(ref);
539       tSignal.theReceiversBlockNumber= refToBlock(ref);
540       tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK;
541       tSignal.theSendersBlockRef = API_CLUSTERMGR;
542       safe_noflush_sendSignal(&tSignal, aNodeId);
543     }
544     break;
545   }
546   case GSN_TAKE_OVERTCCONF:
547   {
548     /**
549      * Report
550      */
551     theFacade.for_each(this, sig, ptr);
552     return;
553   }
554   case GSN_CONNECT_REP:
555   {
556     execCONNECT_REP(sig, ptr);
557     return;
558   }
559   case GSN_DISCONNECT_REP:
560   {
561     execDISCONNECT_REP(sig, ptr);
562     return;
563   }
564   case GSN_CLOSE_COMREQ:
565   {
566     theFacade.perform_close_clnt(this);
567     return;
568   }
569   case GSN_EXPAND_CLNT:
570   {
571     theFacade.expand_clnt();
572     return;
573   }
574   default:
575     break;
576 
577   }
578   return;
579 }
580 
Node()581 ClusterMgr::Node::Node()
582   : hbFrequency(0), hbCounter(0)
583 {
584 }
585 
586 /**
587  * recalcMinDbVersion
588  *
589  * This method is called whenever the 'minimum DB node
590  * version' data for the connected DB nodes changes
591  * It calculates the minimum version of all the connected
592  * DB nodes.
593  * This information is cached by Ndb object instances.
594  * This information is useful when implementing API compatibility
595  * with older DB nodes
596  */
597 void
recalcMinDbVersion()598 ClusterMgr::recalcMinDbVersion()
599 {
600   Uint32 newMinDbVersion = ~ (Uint32) 0;
601 
602   for (Uint32 i = 0; i < MAX_NODES; i++)
603   {
604     trp_node& node = theNodes[i];
605 
606     if (node.is_connected() &&
607         node.is_confirmed() &&
608         node.m_info.getType() == NodeInfo::DB)
609     {
610       /* Include this node in the set of nodes used to
611        * compute the lowest current DB node version
612        */
613       assert(node.m_info.m_version);
614 
615       if (node.minDbVersion < newMinDbVersion)
616       {
617         newMinDbVersion = node.minDbVersion;
618       }
619     }
620   }
621 
622   /* Now update global min Db version if we have one.
623    * Otherwise set it to 0
624    */
625   newMinDbVersion = (newMinDbVersion == ~ (Uint32) 0) ?
626     0 :
627     newMinDbVersion;
628 
629 //#ifdef DEBUG_MINVER
630 
631 #ifdef DEBUG_MINVER
632   if (newMinDbVersion != minDbVersion)
633   {
634     ndbout << "Previous min Db node version was "
635            << NdbVersion(minDbVersion)
636            << " new min is "
637            << NdbVersion(newMinDbVersion)
638            << endl;
639   }
640   else
641   {
642     ndbout << "MinDbVersion recalculated, but is same : "
643            << NdbVersion(minDbVersion)
644            << endl;
645   }
646 #endif
647 
648   minDbVersion = newMinDbVersion;
649 }
650 
651 /******************************************************************************
652  * API_REGREQ and friends
653  ******************************************************************************/
654 
655 void
execAPI_REGREQ(const Uint32 * theData)656 ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
657   const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0];
658   const NodeId nodeId = refToNode(apiRegReq->ref);
659 
660 #ifdef DEBUG_REG
661   ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);
662 #endif
663 
664   assert(nodeId > 0 && nodeId < MAX_NODES);
665 
666   Node & cm_node = theNodes[nodeId];
667   trp_node & node = cm_node;
668   assert(node.defined == true);
669   assert(node.is_connected() == true);
670 
671   /*
672      API nodes send API_REGREQ once to themselves. Other than that, there are
673      no API-API heart beats.
674   */
675   assert(cm_node.m_info.m_type != NodeInfo::API ||
676          (nodeId == getOwnNodeId() &&
677           !cm_node.is_confirmed()));
678 
679   if(node.m_info.m_version != apiRegReq->version){
680     node.m_info.m_version = apiRegReq->version;
681     node.m_info.m_mysql_version = apiRegReq->mysql_version;
682     if (node.m_info.m_version < NDBD_SPLIT_VERSION)
683       node.m_info.m_mysql_version = 0;
684 
685     if (getMajor(node.m_info.m_version) < getMajor(NDB_VERSION) ||
686 	getMinor(node.m_info.m_version) < getMinor(NDB_VERSION)) {
687       node.compatible = false;
688     } else {
689       node.compatible = true;
690     }
691   }
692 
693   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
694   signal.theVerId_signalNumber   = GSN_API_REGCONF;
695   signal.theReceiversBlockNumber = API_CLUSTERMGR;
696   signal.theTrace                = 0;
697   signal.theLength               = ApiRegConf::SignalLength;
698 
699   ApiRegConf * const conf = CAST_PTR(ApiRegConf, signal.getDataPtrSend());
700   conf->qmgrRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
701   conf->version = NDB_VERSION;
702   conf->mysql_version = NDB_MYSQL_VERSION_D;
703 
704   /*
705     This is the frequency (in centiseonds) at which we want the other node
706     to send API_REGREQ messages.
707   */
708   conf->apiHeartbeatFrequency = m_hbFrequency/10;
709 
710   conf->minDbVersion= 0;
711   conf->nodeState= node.m_state;
712 
713   node.set_confirmed(true);
714   if (safe_sendSignal(&signal, nodeId) != 0)
715     node.set_confirmed(false);
716 }
717 
718 void
execAPI_REGCONF(const NdbApiSignal * signal,const LinearSectionPtr ptr[])719 ClusterMgr::execAPI_REGCONF(const NdbApiSignal * signal,
720                             const LinearSectionPtr ptr[])
721 {
722   const ApiRegConf * apiRegConf = CAST_CONSTPTR(ApiRegConf,
723                                                 signal->getDataPtr());
724   const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
725 
726 #ifdef DEBUG_REG
727   ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
728 #endif
729 
730   assert(nodeId > 0 && nodeId < MAX_NODES);
731 
732   Node & cm_node = theNodes[nodeId];
733   trp_node & node = cm_node;
734   assert(node.defined == true);
735   assert(node.is_connected() == true);
736 
737   if(node.m_info.m_version != apiRegConf->version){
738     node.m_info.m_version = apiRegConf->version;
739     node.m_info.m_mysql_version = apiRegConf->mysql_version;
740     if (node.m_info.m_version < NDBD_SPLIT_VERSION)
741       node.m_info.m_mysql_version = 0;
742 
743     if(theNodes[theFacade.ownId()].m_info.m_type == NodeInfo::MGM)
744       node.compatible = ndbCompatible_mgmt_ndb(NDB_VERSION,
745 					       node.m_info.m_version);
746     else
747       node.compatible = ndbCompatible_api_ndb(NDB_VERSION,
748 					      node.m_info.m_version);
749   }
750 
751   node.set_confirmed(true);
752 
753   if (node.minDbVersion != apiRegConf->minDbVersion)
754   {
755     node.minDbVersion = apiRegConf->minDbVersion;
756     recalcMinDbVersion();
757   }
758 
759   if (node.m_info.m_version >= NDBD_255_NODES_VERSION)
760   {
761     node.m_state = apiRegConf->nodeState;
762   }
763   else
764   {
765     /**
766      * from 2 to 8 words = 6 words diff, 6*4 = 24
767      */
768     memcpy(&node.m_state, &apiRegConf->nodeState, sizeof(node.m_state) - 24);
769   }
770 
771   if (node.m_info.m_type == NodeInfo::DB)
772   {
773     /**
774      * Only set DB nodes to "alive"
775      */
776     if (node.compatible && (node.m_state.startLevel == NodeState::SL_STARTED ||
777                             node.m_state.getSingleUserMode()))
778     {
779       set_node_alive(node, true);
780     }
781     else
782     {
783       set_node_alive(node, false);
784     }
785   }
786 
787   cm_node.hbMissed = 0;
788   cm_node.hbCounter = 0;
789   /*
790     By convention, conf->apiHeartbeatFrequency is in centiseconds rather than
791     milliseconds. See also Qmgr::sendApiRegConf().
792    */
793   const Int64 freq =
794     (static_cast<Int64>(apiRegConf->apiHeartbeatFrequency) * 10) - 50;
795 
796   if (freq > UINT_MAX32)
797   {
798     // In case of overflow.
799     assert(false);  /* Note this assert fails on some upgrades... */
800     cm_node.hbFrequency = UINT_MAX32;
801   }
802   else if (freq < minHeartBeatInterval)
803   {
804     /**
805      * We use minHeartBeatInterval as a lower limit. This also prevents
806      * against underflow.
807      */
808     cm_node.hbFrequency = minHeartBeatInterval;
809   }
810   else
811   {
812     cm_node.hbFrequency = static_cast<Uint32>(freq);
813   }
814 
815   // If responding nodes indicates that it is connected to other
816   // nodes, that makes it probable that those nodes are alive and
817   // available also for this node.
818   for (int db_node_id = 1; db_node_id <= MAX_DATA_NODE_ID; db_node_id ++)
819   {
820     if (node.m_state.m_connected_nodes.get(db_node_id))
821     {
822       // Tell this nodes start clients thread that db_node_id
823       // is up and probable connectable.
824       theFacade.theTransporterRegistry->indicate_node_up(db_node_id);
825     }
826   }
827 
828   // Distribute signal to all threads/blocks
829   // TODO only if state changed...
830   theFacade.for_each(this, signal, ptr);
831 }
832 
833 void
execAPI_REGREF(const Uint32 * theData)834 ClusterMgr::execAPI_REGREF(const Uint32 * theData){
835 
836   ApiRegRef * ref = (ApiRegRef*)theData;
837 
838   const NodeId nodeId = refToNode(ref->ref);
839 
840   assert(nodeId > 0 && nodeId < MAX_NODES);
841 
842   Node & cm_node = theNodes[nodeId];
843   trp_node & node = cm_node;
844 
845   assert(node.is_connected() == true);
846   assert(node.defined == true);
847   /* Only DB nodes will send API_REGREF */
848   assert(node.m_info.getType() == NodeInfo::DB);
849 
850   node.compatible = false;
851   set_node_alive(node, false);
852   node.m_state = NodeState::SL_NOTHING;
853   node.m_info.m_version = ref->version;
854 
855   switch(ref->errorCode){
856   case ApiRegRef::WrongType:
857     ndbout_c("Node %d reports that this node should be a NDB node", nodeId);
858     abort();
859   case ApiRegRef::UnsupportedVersion:
860   default:
861     break;
862   }
863 }
864 
865 void
execNF_COMPLETEREP(const NdbApiSignal * signal,const LinearSectionPtr ptr[3])866 ClusterMgr::execNF_COMPLETEREP(const NdbApiSignal* signal,
867                                const LinearSectionPtr ptr[3])
868 {
869   const NFCompleteRep * nfComp = CAST_CONSTPTR(NFCompleteRep,
870                                                signal->getDataPtr());
871   const NodeId nodeId = nfComp->failedNodeId;
872   assert(nodeId > 0 && nodeId < MAX_NODES);
873 
874   trp_node & node = theNodes[nodeId];
875   if (node.nfCompleteRep == false)
876   {
877     node.nfCompleteRep = true;
878     theFacade.for_each(this, signal, ptr);
879   }
880 }
881 
882 /**
883  * This is called as a callback when executing update_connections which
884  * is always called with ownership of trp_client lock.
885  */
886 void
reportConnected(NodeId nodeId)887 ClusterMgr::reportConnected(NodeId nodeId)
888 {
889   DBUG_ENTER("ClusterMgr::reportConnected");
890   DBUG_PRINT("info", ("nodeId: %u", nodeId));
891   /**
892    * Ensure that we are sending heartbeat every 100 ms
893    * until we have got the first reply from NDB providing
894    * us with the real time-out period to use.
895    */
896   assert(nodeId > 0 && nodeId < MAX_NODES);
897   if (nodeId != getOwnNodeId())
898   {
899     noOfConnectedNodes++;
900   }
901 
902   Node & cm_node = theNodes[nodeId];
903   trp_node & theNode = cm_node;
904 
905   if (theNode.m_info.m_type == NodeInfo::DB)
906   {
907     noOfConnectedDBNodes++;
908     if (noOfConnectedDBNodes == 1)
909     {
910       // Data node connected, use ConnectBackoffMaxTime
911       theFacade.get_registry()->set_connect_backoff_max_time_in_ms(connect_backoff_max_time);
912     }
913   }
914 
915   cm_node.hbMissed = 0;
916   cm_node.hbCounter = 0;
917   cm_node.hbFrequency = 0;
918 
919   assert(theNode.is_connected() == false);
920 
921   /**
922    * make sure the node itself is marked connected even
923    * if first API_REGCONF has not arrived
924    */
925   theNode.set_connected(true);
926   theNode.m_state.m_connected_nodes.set(nodeId);
927   theNode.m_info.m_version = 0;
928   theNode.compatible = true;
929   theNode.nfCompleteRep = true;
930   theNode.m_node_fail_rep = false;
931   theNode.m_state.startLevel = NodeState::SL_NOTHING;
932   theNode.minDbVersion = 0;
933 
934   /**
935    * We know that we have clusterMgrThreadMutex and trp_client::mutex
936    *   but we don't know if we are polling...and for_each can
937    *   only be used by a poller...
938    *
939    * Send signal to self, so that we can do this when receiving a signal
940    */
941   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
942   signal.theVerId_signalNumber = GSN_CONNECT_REP;
943   signal.theReceiversBlockNumber = API_CLUSTERMGR;
944   signal.theTrace  = 0;
945   signal.theLength = 1;
946   signal.getDataPtrSend()[0] = nodeId;
947   raw_sendSignal(&signal, getOwnNodeId());
948   DBUG_VOID_RETURN;
949 }
950 
/**
 * Handle a CONNECT_REP, which reportConnected() sends to our own
 * API_CLUSTERMGR block: forward it through theFacade.for_each(), passing
 * 0 in place of the section array.
 *
 * @param sig  the received CONNECT_REP
 * @param ptr  not used
 */
void
ClusterMgr::execCONNECT_REP(const NdbApiSignal* sig,
                            const LinearSectionPtr ptr[])
{
  theFacade.for_each(this, sig, 0);
}
957 
958 void
set_node_dead(trp_node & theNode)959 ClusterMgr::set_node_dead(trp_node& theNode)
960 {
961   set_node_alive(theNode, false);
962   theNode.set_confirmed(false);
963   theNode.m_state.m_connected_nodes.clear();
964   theNode.m_state.startLevel = NodeState::SL_NOTHING;
965   theNode.m_info.m_connectCount ++;
966   theNode.nfCompleteRep = false;
967 }
968 
969 void
reportDisconnected(NodeId nodeId)970 ClusterMgr::reportDisconnected(NodeId nodeId)
971 {
972   assert(nodeId > 0 && nodeId < MAX_NODES);
973   assert(noOfConnectedNodes > 0);
974 
975   /**
976    * We know that we have trp_client lock
977    *   but we don't know if we are polling...and for_each can
978    *   only be used by a poller...
979    *
980    * Send signal to self, so that we can do this when receiving a signal
981    */
982   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
983   signal.theVerId_signalNumber = GSN_DISCONNECT_REP;
984   signal.theReceiversBlockNumber = API_CLUSTERMGR;
985   signal.theTrace  = 0;
986   signal.theLength = DisconnectRep::SignalLength;
987 
988   DisconnectRep * rep = CAST_PTR(DisconnectRep, signal.getDataPtrSend());
989   rep->nodeId = nodeId;
990   rep->err = 0;
991   raw_sendSignal(&signal, getOwnNodeId());
992 }
993 
994 void
execDISCONNECT_REP(const NdbApiSignal * sig,const LinearSectionPtr ptr[])995 ClusterMgr::execDISCONNECT_REP(const NdbApiSignal* sig,
996                                const LinearSectionPtr ptr[])
997 {
998   const DisconnectRep * rep = CAST_CONSTPTR(DisconnectRep, sig->getDataPtr());
999   Uint32 nodeId = rep->nodeId;
1000 
1001   assert(nodeId > 0 && nodeId < MAX_NODES);
1002   Node & cm_node = theNodes[nodeId];
1003   trp_node & theNode = cm_node;
1004 
1005   bool node_failrep = theNode.m_node_fail_rep;
1006   set_node_dead(theNode);
1007   theNode.set_connected(false);
1008 
1009   noOfConnectedNodes--;
1010   if (noOfConnectedNodes == 0)
1011   {
1012     if (!global_flag_skip_invalidate_cache &&
1013         theFacade.m_globalDictCache)
1014     {
1015       theFacade.m_globalDictCache->lock();
1016       theFacade.m_globalDictCache->invalidate_all();
1017       theFacade.m_globalDictCache->unlock();
1018       m_connect_count ++;
1019       m_cluster_state = CS_waiting_for_clean_cache;
1020     }
1021 
1022     if (m_auto_reconnect == 0)
1023     {
1024       theStop = 2;
1025     }
1026   }
1027 
1028   if (theNode.m_info.m_type == NodeInfo::DB)
1029   {
1030     assert(noOfConnectedDBNodes > 0);
1031     noOfConnectedDBNodes--;
1032     if (noOfConnectedDBNodes == 0)
1033     {
1034       // No data nodes connected, use StartConnectBackoffMaxTime
1035       theFacade.get_registry()->set_connect_backoff_max_time_in_ms(start_connect_backoff_max_time);
1036     }
1037   }
1038 
1039   if (node_failrep == false)
1040   {
1041     /**
1042      * Inform API
1043      */
1044     NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
1045     signal.theVerId_signalNumber = GSN_NODE_FAILREP;
1046     signal.theReceiversBlockNumber = API_CLUSTERMGR;
1047     signal.theTrace  = 0;
1048     signal.theLength = NodeFailRep::SignalLengthLong;
1049 
1050     NodeFailRep * rep = CAST_PTR(NodeFailRep, signal.getDataPtrSend());
1051     rep->failNo = 0;
1052     rep->masterNodeId = 0;
1053     rep->noOfNodes = 1;
1054     NodeBitmask::clear(rep->theAllNodes);
1055     NodeBitmask::set(rep->theAllNodes, nodeId);
1056     execNODE_FAILREP(&signal, 0);
1057   }
1058 }
1059 
1060 void
execNODE_FAILREP(const NdbApiSignal * sig,const LinearSectionPtr ptr[])1061 ClusterMgr::execNODE_FAILREP(const NdbApiSignal* sig,
1062                              const LinearSectionPtr ptr[])
1063 {
1064   const NodeFailRep * rep = CAST_CONSTPTR(NodeFailRep, sig->getDataPtr());
1065   NodeBitmask mask;
1066   if (sig->getLength() == NodeFailRep::SignalLengthLong)
1067   {
1068     mask.assign(NodeBitmask::Size, rep->theAllNodes);
1069   }
1070   else
1071   {
1072     mask.assign(NdbNodeBitmask::Size, rep->theNodes);
1073   }
1074 
1075   NdbApiSignal signal(sig->theSendersBlockRef);
1076   signal.theVerId_signalNumber = GSN_NODE_FAILREP;
1077   signal.theReceiversBlockNumber = API_CLUSTERMGR;
1078   signal.theTrace  = 0;
1079   signal.theLength = NodeFailRep::SignalLengthLong;
1080 
1081   NodeFailRep * copy = CAST_PTR(NodeFailRep, signal.getDataPtrSend());
1082   copy->failNo = 0;
1083   copy->masterNodeId = 0;
1084   copy->noOfNodes = 0;
1085   NodeBitmask::clear(copy->theAllNodes);
1086 
1087   for (Uint32 i = mask.find_first(); i != NodeBitmask::NotFound;
1088        i = mask.find_next(i + 1))
1089   {
1090     Node & cm_node = theNodes[i];
1091     trp_node & theNode = cm_node;
1092 
1093     bool node_failrep = theNode.m_node_fail_rep;
1094     bool connected = theNode.is_connected();
1095     set_node_dead(theNode);
1096 
1097     if (node_failrep == false)
1098     {
1099       theNode.m_node_fail_rep = true;
1100       NodeBitmask::set(copy->theAllNodes, i);
1101       copy->noOfNodes++;
1102     }
1103 
1104     if (connected)
1105     {
1106       theFacade.doDisconnect(i);
1107     }
1108   }
1109 
1110   recalcMinDbVersion();
1111   if (copy->noOfNodes)
1112   {
1113     theFacade.for_each(this, &signal, 0); // report GSN_NODE_FAILREP
1114   }
1115 
1116   if (noOfAliveNodes == 0)
1117   {
1118     NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
1119     signal.theVerId_signalNumber = GSN_NF_COMPLETEREP;
1120     signal.theReceiversBlockNumber = 0;
1121     signal.theTrace  = 0;
1122     signal.theLength = NFCompleteRep::SignalLength;
1123 
1124     NFCompleteRep * rep = CAST_PTR(NFCompleteRep, signal.getDataPtrSend());
1125     rep->blockNo =0;
1126     rep->nodeId = getOwnNodeId();
1127     rep->unused = 0;
1128     rep->from = __LINE__;
1129 
1130     for (Uint32 i = 1; i < MAX_NODES; i++)
1131     {
1132       trp_node& theNode = theNodes[i];
1133       if (theNode.defined && theNode.nfCompleteRep == false)
1134       {
1135         rep->failedNodeId = i;
1136         execNF_COMPLETEREP(&signal, 0);
1137       }
1138     }
1139   }
1140 }
1141 
1142 bool
is_cluster_completely_unavailable()1143 ClusterMgr::is_cluster_completely_unavailable()
1144 {
1145   bool ret_code = true;
1146 
1147   /**
1148    * This method (and several other 'node state getters') allow
1149    * reading of theNodes[] from multiple block threads while
1150    * ClusterMgr concurrently updates them. Thus, a mutex should
1151    * have been expected here. See bug#20391191, and addendum patches
1152    * to bug#19524096, to understand what prevents us from locking (yet)
1153    */
1154   for (NodeId n = 1; n < MAX_NDB_NODES ; n++)
1155   {
1156     const trp_node& node = theNodes[n];
1157     if (!node.defined)
1158     {
1159       /**
1160        * Node isn't even part of configuration.
1161        */
1162       continue;
1163     }
1164     if (node.m_state.startLevel > NodeState::SL_STARTED)
1165     {
1166       /**
1167        * Node is stopping, so isn't available for any transactions,
1168        * so not available for us to use.
1169        */
1170       continue;
1171     }
1172     if (!node.compatible)
1173     {
1174       /**
1175        * The node isn't compatible with ours, so we can't use it
1176        */
1177       continue;
1178     }
1179     if (node.m_alive ||
1180         node.m_state.startLevel == NodeState::SL_STARTING ||
1181         node.m_state.startLevel == NodeState::SL_STARTED)
1182     {
1183       /**
1184        * We found a node that is either alive (less likely since we call this
1185        * method), or it is in state SL_STARTING which means that we were
1186        * allowed to connect, this means that we will very shortly be able to
1187        * use this connection. So this means that we know that the current
1188        * connection problem is a temporary issue and we can report a temporary
1189        * error instead of reporting 4009.
1190        *
1191        * We can deduce that the cluster isn't ready to be declared down
1192        * yet, we have a link to a starting node. We either very soon have
1193        * a working cluster, or we already have a working cluster but we
1194        * haven't yet the most up-to-date information about the cluster state.
1195        * So the cluster will soon be available again very likely, so
1196        * we can report a temporary error rather than an unknown error.
1197        */
1198       ret_code = false;
1199       break;
1200     }
1201   }
1202   return ret_code;
1203 }
1204 
1205 void
print_nodes(const char * where,NdbOut & out)1206 ClusterMgr::print_nodes(const char* where, NdbOut& out)
1207 {
1208   out << where << " >>" << endl;
1209   for (NodeId n = 1; n < MAX_NODES ; n++)
1210   {
1211     const trp_node node = getNodeInfo(n);
1212     if (!node.defined)
1213       continue;
1214     out << "node: " << n << endl;
1215     out << " -";
1216     out << " connected: " << node.is_connected();
1217     out << ", compatible: " << node.compatible;
1218     out << ", nf_complete_rep: " << node.nfCompleteRep;
1219     out << ", alive: " << node.m_alive;
1220     out << ", confirmed: " << node.is_confirmed();
1221     out << endl;
1222 
1223     out << " - " << node.m_info << endl;
1224     out << " - " << node.m_state << endl;
1225   }
1226   out << "<<" << endl;
1227 }
1228 
1229 
1230 /******************************************************************************
1231  * Arbitrator
1232  ******************************************************************************/
ArbitMgr(ClusterMgr & c)1233 ArbitMgr::ArbitMgr(ClusterMgr & c)
1234   : m_clusterMgr(c)
1235 {
1236   DBUG_ENTER("ArbitMgr::ArbitMgr");
1237 
1238   theThreadMutex = NdbMutex_Create();
1239   theInputCond = NdbCondition_Create();
1240   theInputMutex = NdbMutex_Create();
1241 
1242   theRank = 0;
1243   theDelay = 0;
1244   theThread = 0;
1245 
1246   theInputTimeout = 0;
1247   theInputFull = false;
1248   memset(&theInputBuffer, 0, sizeof(theInputBuffer));
1249   theState = StateInit;
1250 
1251   memset(&theStartReq, 0, sizeof(theStartReq));
1252   memset(&theChooseReq1, 0, sizeof(theChooseReq1));
1253   memset(&theChooseReq2, 0, sizeof(theChooseReq2));
1254   memset(&theStopOrd, 0, sizeof(theStopOrd));
1255 
1256   DBUG_VOID_RETURN;
1257 }
1258 
~ArbitMgr()1259 ArbitMgr::~ArbitMgr()
1260 {
1261   DBUG_ENTER("ArbitMgr::~ArbitMgr");
1262   NdbMutex_Destroy(theThreadMutex);
1263   NdbCondition_Destroy(theInputCond);
1264   NdbMutex_Destroy(theInputMutex);
1265   DBUG_VOID_RETURN;
1266 }
1267 
1268 // Start arbitrator thread.  This is kernel request.
1269 // First stop any previous thread since it is a left-over
1270 // which was never used and which now has wrong ticket.
1271 void
doStart(const Uint32 * theData)1272 ArbitMgr::doStart(const Uint32* theData)
1273 {
1274   ArbitSignal aSignal;
1275   NdbMutex_Lock(theThreadMutex);
1276   if (theThread != NULL) {
1277     aSignal.init(GSN_ARBIT_STOPORD, NULL);
1278     aSignal.data.code = StopRestart;
1279     sendSignalToThread(aSignal);
1280     void* value;
1281     NdbThread_WaitFor(theThread, &value);
1282     NdbThread_Destroy(&theThread);
1283     theState = StateInit;
1284     theInputFull = false;
1285   }
1286   aSignal.init(GSN_ARBIT_STARTREQ, theData);
1287   sendSignalToThread(aSignal);
1288   theThread = NdbThread_Create(
1289     runArbitMgr_C, (void**)this,
1290     0, // default stack size
1291     "ndb_arbitmgr",
1292     NDB_THREAD_PRIO_HIGH);
1293   NdbMutex_Unlock(theThreadMutex);
1294 }
1295 
1296 // The "choose me" signal from a candidate.
1297 void
doChoose(const Uint32 * theData)1298 ArbitMgr::doChoose(const Uint32* theData)
1299 {
1300   ArbitSignal aSignal;
1301   aSignal.init(GSN_ARBIT_CHOOSEREQ, theData);
1302   sendSignalToThread(aSignal);
1303 }
1304 
1305 // Stop arbitrator thread via stop signal from the kernel
1306 // or when exiting API program.
1307 void
doStop(const Uint32 * theData)1308 ArbitMgr::doStop(const Uint32* theData)
1309 {
1310   DBUG_ENTER("ArbitMgr::doStop");
1311   ArbitSignal aSignal;
1312   NdbMutex_Lock(theThreadMutex);
1313   if (theThread != NULL) {
1314     aSignal.init(GSN_ARBIT_STOPORD, theData);
1315     if (theData == 0) {
1316       aSignal.data.code = StopExit;
1317     } else {
1318       aSignal.data.code = StopRequest;
1319     }
1320     sendSignalToThread(aSignal);
1321     void* value;
1322     NdbThread_WaitFor(theThread, &value);
1323     NdbThread_Destroy(&theThread);
1324     theState = StateInit;
1325   }
1326   NdbMutex_Unlock(theThreadMutex);
1327   DBUG_VOID_RETURN;
1328 }
1329 
1330 // private methods
1331 
1332 extern "C"
1333 void*
runArbitMgr_C(void * me)1334 runArbitMgr_C(void* me)
1335 {
1336   ((ArbitMgr*) me)->threadMain();
1337   return NULL;
1338 }
1339 
1340 void
sendSignalToThread(ArbitSignal & aSignal)1341 ArbitMgr::sendSignalToThread(ArbitSignal& aSignal)
1342 {
1343 #ifdef DEBUG_ARBIT
1344   char buf[17] = "";
1345   ndbout << "arbit recv: ";
1346   ndbout << " gsn=" << aSignal.gsn;
1347   ndbout << " send=" << aSignal.data.sender;
1348   ndbout << " code=" << aSignal.data.code;
1349   ndbout << " node=" << aSignal.data.node;
1350   ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
1351   ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
1352   ndbout << endl;
1353 #endif
1354   aSignal.setTimestamp();       // signal arrival time
1355   NdbMutex_Lock(theInputMutex);
1356   while (theInputFull) {
1357     NdbCondition_WaitTimeout(theInputCond, theInputMutex, 1000);
1358   }
1359   theInputBuffer = aSignal;
1360   theInputFull = true;
1361   NdbCondition_Signal(theInputCond);
1362   NdbMutex_Unlock(theInputMutex);
1363 }
1364 
1365 void
threadMain()1366 ArbitMgr::threadMain()
1367 {
1368   ArbitSignal aSignal;
1369   aSignal = theInputBuffer;
1370   threadStart(aSignal);
1371   bool stop = false;
1372   while (! stop) {
1373     NdbMutex_Lock(theInputMutex);
1374     while (! theInputFull) {
1375       NdbCondition_WaitTimeout(theInputCond, theInputMutex, theInputTimeout);
1376       threadTimeout();
1377     }
1378     aSignal = theInputBuffer;
1379     theInputFull = false;
1380     NdbCondition_Signal(theInputCond);
1381     NdbMutex_Unlock(theInputMutex);
1382     switch (aSignal.gsn) {
1383     case GSN_ARBIT_CHOOSEREQ:
1384       threadChoose(aSignal);
1385       break;
1386     case GSN_ARBIT_STOPORD:
1387       stop = true;
1388       break;
1389     }
1390   }
1391   threadStop(aSignal);
1392 }
1393 
1394 // handle events in the thread
1395 
1396 void
threadStart(ArbitSignal & aSignal)1397 ArbitMgr::threadStart(ArbitSignal& aSignal)
1398 {
1399   theStartReq = aSignal;
1400   sendStartConf(theStartReq, ArbitCode::ApiStart);
1401   theState = StateStarted;
1402   theInputTimeout = 1000;
1403 }
1404 
1405 void
threadChoose(ArbitSignal & aSignal)1406 ArbitMgr::threadChoose(ArbitSignal& aSignal)
1407 {
1408   switch (theState) {
1409   case StateStarted:            // first REQ
1410     if (! theStartReq.data.match(aSignal.data)) {
1411       sendChooseRef(aSignal, ArbitCode::ErrTicket);
1412       break;
1413     }
1414     theChooseReq1 = aSignal;
1415     if (theDelay == 0) {
1416       sendChooseConf(aSignal, ArbitCode::WinChoose);
1417       theState = StateFinished;
1418       theInputTimeout = 1000;
1419       break;
1420     }
1421     theState = StateChoose1;
1422     theInputTimeout = 1;
1423     return;
1424   case StateChoose1:            // second REQ within Delay
1425     if (! theStartReq.data.match(aSignal.data)) {
1426       sendChooseRef(aSignal, ArbitCode::ErrTicket);
1427       break;
1428     }
1429     theChooseReq2 = aSignal;
1430     theState = StateChoose2;
1431     theInputTimeout = 1;
1432     return;
1433   case StateChoose2:            // too many REQs - refuse all
1434     if (! theStartReq.data.match(aSignal.data)) {
1435       sendChooseRef(aSignal, ArbitCode::ErrTicket);
1436       break;
1437     }
1438     sendChooseRef(theChooseReq1, ArbitCode::ErrToomany);
1439     sendChooseRef(theChooseReq2, ArbitCode::ErrToomany);
1440     sendChooseRef(aSignal, ArbitCode::ErrToomany);
1441     theState = StateFinished;
1442     theInputTimeout = 1000;
1443     return;
1444   default:
1445     sendChooseRef(aSignal, ArbitCode::ErrState);
1446     break;
1447   }
1448 }
1449 
1450 void
threadTimeout()1451 ArbitMgr::threadTimeout()
1452 {
1453   switch (theState) {
1454   case StateStarted:
1455     break;
1456   case StateChoose1:
1457     if (theChooseReq1.getTimediff() < theDelay)
1458       break;
1459     sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1460     theState = StateFinished;
1461     theInputTimeout = 1000;
1462     break;
1463   case StateChoose2:
1464     sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1465     sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
1466     theState = StateFinished;
1467     theInputTimeout = 1000;
1468     break;
1469   default:
1470     break;
1471   }
1472 }
1473 
1474 void
threadStop(ArbitSignal & aSignal)1475 ArbitMgr::threadStop(ArbitSignal& aSignal)
1476 {
1477   switch (aSignal.data.code) {
1478   case StopExit:
1479     switch (theState) {
1480     case StateStarted:
1481       sendStopRep(theStartReq, 0);
1482       break;
1483     case StateChoose1:                  // just in time
1484       sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1485       break;
1486     case StateChoose2:
1487       sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1488       sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
1489       break;
1490     case StateInit:
1491     case StateFinished:
1492       //??
1493       break;
1494     }
1495     break;
1496   case StopRequest:
1497     break;
1498   case StopRestart:
1499     break;
1500   }
1501 }
1502 
1503 // output routines
1504 
1505 void
sendStartConf(ArbitSignal & aSignal,Uint32 code)1506 ArbitMgr::sendStartConf(ArbitSignal& aSignal, Uint32 code)
1507 {
1508   ArbitSignal copySignal = aSignal;
1509   copySignal.gsn = GSN_ARBIT_STARTCONF;
1510   copySignal.data.code = code;
1511   sendSignalToQmgr(copySignal);
1512 }
1513 
1514 void
sendChooseConf(ArbitSignal & aSignal,Uint32 code)1515 ArbitMgr::sendChooseConf(ArbitSignal& aSignal, Uint32 code)
1516 {
1517   ArbitSignal copySignal = aSignal;
1518   copySignal.gsn = GSN_ARBIT_CHOOSECONF;
1519   copySignal.data.code = code;
1520   sendSignalToQmgr(copySignal);
1521 }
1522 
1523 void
sendChooseRef(ArbitSignal & aSignal,Uint32 code)1524 ArbitMgr::sendChooseRef(ArbitSignal& aSignal, Uint32 code)
1525 {
1526   ArbitSignal copySignal = aSignal;
1527   copySignal.gsn = GSN_ARBIT_CHOOSEREF;
1528   copySignal.data.code = code;
1529   sendSignalToQmgr(copySignal);
1530 }
1531 
1532 void
sendStopRep(ArbitSignal & aSignal,Uint32 code)1533 ArbitMgr::sendStopRep(ArbitSignal& aSignal, Uint32 code)
1534 {
1535   ArbitSignal copySignal = aSignal;
1536   copySignal.gsn = GSN_ARBIT_STOPREP;
1537   copySignal.data.code = code;
1538   sendSignalToQmgr(copySignal);
1539 }
1540 
1541 /**
1542  * Send signal to QMGR.  The input includes signal number and
1543  * signal data.  The signal data is normally a copy of a received
1544  * signal so it contains expected arbitrator node id and ticket.
1545  * The sender in signal data is the QMGR node id.
1546  */
1547 void
sendSignalToQmgr(ArbitSignal & aSignal)1548 ArbitMgr::sendSignalToQmgr(ArbitSignal& aSignal)
1549 {
1550   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, m_clusterMgr.getOwnNodeId()));
1551 
1552   signal.theVerId_signalNumber = aSignal.gsn;
1553   signal.theReceiversBlockNumber = QMGR;
1554   signal.theTrace  = 0;
1555   signal.theLength = ArbitSignalData::SignalLength;
1556 
1557   ArbitSignalData* sd = CAST_PTR(ArbitSignalData, signal.getDataPtrSend());
1558 
1559   sd->sender = numberToRef(API_CLUSTERMGR, m_clusterMgr.getOwnNodeId());
1560   sd->code = aSignal.data.code;
1561   sd->node = aSignal.data.node;
1562   sd->ticket = aSignal.data.ticket;
1563   sd->mask = aSignal.data.mask;
1564 
1565 #ifdef DEBUG_ARBIT
1566   char buf[17] = "";
1567   ndbout << "arbit send: ";
1568   ndbout << " gsn=" << aSignal.gsn;
1569   ndbout << " recv=" << aSignal.data.sender;
1570   ndbout << " code=" << aSignal.data.code;
1571   ndbout << " node=" << aSignal.data.node;
1572   ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
1573   ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
1574   ndbout << endl;
1575 #endif
1576 
1577   {
1578     m_clusterMgr.lock();
1579     m_clusterMgr.raw_sendSignal(&signal, aSignal.data.sender);
1580     m_clusterMgr.flush_send_buffers();
1581     m_clusterMgr.unlock();
1582   }
1583 }
1584