1 /*
2    Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <ndb_global.h>
26 #include <ndb_limits.h>
27 #include <util/version.h>
28 
29 #include "TransporterFacade.hpp"
30 #include <kernel/GlobalSignalNumbers.h>
31 
32 #include "ClusterMgr.hpp"
33 #include <IPCConfig.hpp>
34 #include "NdbApiSignal.hpp"
35 #include <NdbSleep.h>
36 #include <NdbOut.hpp>
37 #include <NdbTick.h>
38 
39 
40 #include <signaldata/NodeFailRep.hpp>
41 #include <signaldata/NFCompleteRep.hpp>
42 #include <signaldata/ApiRegSignalData.hpp>
43 #include <signaldata/AlterTable.hpp>
44 #include <signaldata/SumaImpl.hpp>
45 
46 #include <mgmapi.h>
47 #include <mgmapi_configuration.hpp>
48 #include <mgmapi_config_parameters.h>
49 
// When non-zero, execDISCONNECT_REP() does not invalidate the global
// dictionary cache (nor bump m_connect_count / reset m_cluster_state)
// after the last connected node has gone away.
int global_flag_skip_invalidate_cache = 0;
// When non-zero, threadMain() leaves CS_waiting_for_clean_cache without
// waiting for the global dictionary cache to become empty.
int global_flag_skip_waiting_for_clean_cache = 0;
// Uncomment to trace API_REGREQ / API_REGCONF / forced heartbeats on ndbout.
//#define DEBUG_REG
53 
54 // Just a C wrapper for threadMain
55 extern "C"
56 void*
runClusterMgr_C(void * me)57 runClusterMgr_C(void * me)
58 {
59   ((ClusterMgr*) me)->threadMain();
60 
61   return NULL;
62 }
63 
ClusterMgr(TransporterFacade & _facade)64 ClusterMgr::ClusterMgr(TransporterFacade & _facade):
65   theStop(0),
66   theFacade(_facade),
67   theArbitMgr(NULL),
68   m_connect_count(0),
69   m_max_api_reg_req_interval(~0),
70   noOfAliveNodes(0),
71   noOfConnectedNodes(0),
72   minDbVersion(0),
73   theClusterMgrThread(NULL),
74   waitingForHB(false),
75   m_cluster_state(CS_waiting_for_clean_cache)
76 {
77   DBUG_ENTER("ClusterMgr::ClusterMgr");
78   clusterMgrThreadMutex = NdbMutex_Create();
79   waitForHBCond= NdbCondition_Create();
80   m_auto_reconnect = -1;
81 
82   Uint32 ret = this->open(&theFacade, API_CLUSTERMGR);
83   if (unlikely(ret == 0))
84   {
85     ndbout_c("Failed to register ClusterMgr! ret: %d", ret);
86     abort();
87   }
88   DBUG_VOID_RETURN;
89 }
90 
~ClusterMgr()91 ClusterMgr::~ClusterMgr()
92 {
93   DBUG_ENTER("ClusterMgr::~ClusterMgr");
94   doStop();
95   if (theArbitMgr != 0)
96   {
97     delete theArbitMgr;
98     theArbitMgr = 0;
99   }
100   this->close(); // disconnect from TransporterFacade
101   NdbCondition_Destroy(waitForHBCond);
102   NdbMutex_Destroy(clusterMgrThreadMutex);
103   DBUG_VOID_RETURN;
104 }
105 
106 void
configure(Uint32 nodeId,const ndb_mgm_configuration * config)107 ClusterMgr::configure(Uint32 nodeId,
108                       const ndb_mgm_configuration* config)
109 {
110   ndb_mgm_configuration_iterator iter(* config, CFG_SECTION_NODE);
111   for(iter.first(); iter.valid(); iter.next()){
112     Uint32 nodeId = 0;
113     if(iter.get(CFG_NODE_ID, &nodeId))
114       continue;
115 
116     // Check array bounds + don't allow node 0 to be touched
117     assert(nodeId > 0 && nodeId < MAX_NODES);
118     trp_node& theNode = theNodes[nodeId];
119     theNode.defined = true;
120 
121     unsigned type;
122     if(iter.get(CFG_TYPE_OF_SECTION, &type))
123       continue;
124 
125     switch(type){
126     case NODE_TYPE_DB:
127       theNode.m_info.m_type = NodeInfo::DB;
128       break;
129     case NODE_TYPE_API:
130       theNode.m_info.m_type = NodeInfo::API;
131       break;
132     case NODE_TYPE_MGM:
133       theNode.m_info.m_type = NodeInfo::MGM;
134       break;
135     default:
136       type = type;
137       break;
138     }
139   }
140 
141   /* Mark all non existing nodes as not defined */
142   for(Uint32 i = 0; i<MAX_NODES; i++) {
143     if (iter.first())
144       continue;
145 
146     if (iter.find(CFG_NODE_ID, i))
147       theNodes[i]= Node();
148   }
149 
150 #if 0
151   print_nodes("init");
152 #endif
153 
154   // Configure arbitrator
155   Uint32 rank = 0;
156   iter.first();
157   iter.find(CFG_NODE_ID, nodeId); // let not found in config mean rank=0
158   iter.get(CFG_NODE_ARBIT_RANK, &rank);
159 
160   if (rank > 0)
161   {
162     // The arbitrator should be active
163     if (!theArbitMgr)
164       theArbitMgr = new ArbitMgr(* this);
165     theArbitMgr->setRank(rank);
166 
167     Uint32 delay = 0;
168     iter.get(CFG_NODE_ARBIT_DELAY, &delay);
169     theArbitMgr->setDelay(delay);
170   }
171   else if (theArbitMgr)
172   {
173     // No arbitrator should be started
174     theArbitMgr->doStop(NULL);
175     delete theArbitMgr;
176     theArbitMgr= NULL;
177   }
178 }
179 
180 void
startThread()181 ClusterMgr::startThread() {
182   Guard g(clusterMgrThreadMutex);
183 
184   theStop = -1;
185   theClusterMgrThread = NdbThread_Create(runClusterMgr_C,
186                                          (void**)this,
187                                          0, // default stack size
188                                          "ndb_clustermgr",
189                                          NDB_THREAD_PRIO_HIGH);
190   Uint32 cnt = 0;
191   while (theStop == -1 && cnt < 60)
192   {
193     NdbCondition_WaitTimeout(waitForHBCond, clusterMgrThreadMutex, 1000);
194   }
195 
196   assert(theStop == 0);
197 }
198 
199 void
doStop()200 ClusterMgr::doStop( ){
201   DBUG_ENTER("ClusterMgr::doStop");
202   {
203     Guard g(clusterMgrThreadMutex);
204     if(theStop == 1){
205       DBUG_VOID_RETURN;
206     }
207   }
208 
209   void *status;
210   theStop = 1;
211   if (theClusterMgrThread) {
212     NdbThread_WaitFor(theClusterMgrThread, &status);
213     NdbThread_Destroy(&theClusterMgrThread);
214   }
215 
216   if (theArbitMgr != NULL)
217   {
218     theArbitMgr->doStop(NULL);
219   }
220 
221   DBUG_VOID_RETURN;
222 }
223 
/**
 * Forces an immediate heartbeat round and waits briefly for the answers.
 *
 * The set of nodes to ping is the union of the m_connected_nodes sets
 * reported by every defined DB node (ids 1..MAX_NDB_NODES-1), restricted
 * to those DB node ids. An API_REGREQ is sent to QMGR on each of them,
 * and the caller then waits (on waitForHBCond, under the facade mutex)
 * up to 1000 ms for check_wait_for_hb() to clear waitForHBFromNodes.
 *
 * If a round is already in progress (waitingForHB), no new round is
 * started; the caller only waits up to 1000 ms for it and returns.
 *
 * NOTE(review): each wait is a single NdbCondition_WaitTimeout() without
 * a predicate loop, so an early wakeup ends the wait early.
 */
void
ClusterMgr::forceHB()
{
  theFacade.lock_mutex();

  if(waitingForHB)
  {
    // Another forceHB() round is running: piggy-back on it
    NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
    theFacade.unlock_mutex();
    return;
  }

  waitingForHB= true;

  // Collect the DB nodes, and every node any DB node reports as connected
  NodeBitmask ndb_nodes;
  ndb_nodes.clear();
  waitForHBFromNodes.clear();
  for(Uint32 i = 1; i < MAX_NDB_NODES; i++)
  {
    const trp_node &node= getNodeInfo(i);
    if(!node.defined)
      continue;
    if(node.m_info.getType() == NodeInfo::DB)
    {
      ndb_nodes.set(i);
      waitForHBFromNodes.bitOR(node.m_state.m_connected_nodes);
    }
  }
  // Only DB nodes are expected to answer
  waitForHBFromNodes.bitAND(ndb_nodes);
  theFacade.unlock_mutex();

#ifdef DEBUG_REG
  char buf[128];
  ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
#endif
  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));

  signal.theVerId_signalNumber   = GSN_API_REGREQ;
  signal.theReceiversBlockNumber = QMGR;
  signal.theTrace                = 0;
  signal.theLength               = ApiRegReq::SignalLength;

  ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
  req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
  req->version = NDB_VERSION;
  req->mysql_version = NDB_MYSQL_VERSION_D;

  {
    // Send one API_REGREQ per node still in waitForHBFromNodes
    lock();
    int nodeId= 0;
    for(int i=0;
        (int) NodeBitmask::NotFound != (nodeId= waitForHBFromNodes.find(i));
        i= nodeId+1)
    {
#ifdef DEBUG_REG
      ndbout << "FORCE HB to " << nodeId << endl;
#endif
      raw_sendSignal(&signal, nodeId);
    }
    unlock();
  }
  /* Wait for nodes to reply - if any heartbeats were sent */
  theFacade.lock_mutex();
  if (!waitForHBFromNodes.isclear())
    NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);

  // End the round even if some nodes did not answer in time
  waitingForHB= false;
#ifdef DEBUG_REG
  ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
#endif
  theFacade.unlock_mutex();
}
296 
297 void
startup()298 ClusterMgr::startup()
299 {
300   assert(theStop == -1);
301   Uint32 nodeId = getOwnNodeId();
302   Node & cm_node = theNodes[nodeId];
303   trp_node & theNode = cm_node;
304   assert(theNode.defined);
305 
306   lock();
307   theFacade.doConnect(nodeId);
308   unlock();
309 
310   for (Uint32 i = 0; i<3000; i++)
311   {
312     lock();
313     theFacade.theTransporterRegistry->update_connections();
314     unlock();
315     if (theNode.is_connected())
316       break;
317     NdbSleep_MilliSleep(20);
318   }
319 
320   assert(theNode.is_connected());
321   Guard g(clusterMgrThreadMutex);
322   theStop = 0;
323   NdbCondition_Broadcast(waitForHBCond);
324 }
325 
/**
 * Main loop of the cluster manager thread; runs until theStop != 0.
 *
 * After startup(), each iteration:
 *  - sleeps ~100 ms as ten 10 ms sleeps, each followed by a zero-timeout
 *    poll done under clusterMgrThreadMutex;
 *  - while in CS_waiting_for_clean_cache, skips the heartbeat work until
 *    the global dictionary cache is empty (unless
 *    global_flag_skip_waiting_for_clean_cache is set);
 *  - for every defined node: tries to connect if not connected, otherwise
 *    (if compatible) adds the elapsed time to hbCounter and sends an
 *    API_REGREQ (to QMGR for DB nodes, API_CLUSTERMGR otherwise) once
 *    hbCounter reaches hbFrequency or m_max_api_reg_req_interval;
 *  - reports nodes that reached 4 missed heartbeats in one NODE_FAILREP
 *    sent to this node's own API_CLUSTERMGR.
 */
void
ClusterMgr::threadMain()
{
  startup();

  // Pre-built heartbeat request; the receiver block is set per node below
  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));

  signal.theVerId_signalNumber   = GSN_API_REGREQ;
  signal.theTrace                = 0;
  signal.theLength               = ApiRegReq::SignalLength;

  ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
  req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
  req->version = NDB_VERSION;
  req->mysql_version = NDB_MYSQL_VERSION_D;

  // Pre-built NODE_FAILREP to self, filled in each round
  NdbApiSignal nodeFail_signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
  nodeFail_signal.theVerId_signalNumber = GSN_NODE_FAILREP;
  nodeFail_signal.theReceiversBlockNumber = API_CLUSTERMGR;
  nodeFail_signal.theTrace  = 0;
  nodeFail_signal.theLength = NodeFailRep::SignalLengthLong;

  NDB_TICKS timeSlept = 100;
  NDB_TICKS now = NdbTick_CurrentMillisecond();

  while(!theStop)
  {
    /* Sleep at 100ms between each heartbeat check */
    NDB_TICKS before = now;
    for (Uint32 i = 0; i<10; i++)
    {
      NdbSleep_MilliSleep(10);
      {
        Guard g(clusterMgrThreadMutex);
        /**
         * Protect from ArbitMgr sending signals while we poll
         */
        start_poll();
        do_poll(0);
        complete_poll();
      }
    }
    // Measured (not nominal) time since the last round, added to hbCounter
    now = NdbTick_CurrentMillisecond();
    timeSlept = (now - before);

    if (m_cluster_state == CS_waiting_for_clean_cache &&
        theFacade.m_globalDictCache)
    {
      if (!global_flag_skip_waiting_for_clean_cache)
      {
        theFacade.m_globalDictCache->lock();
        unsigned sz= theFacade.m_globalDictCache->get_size();
        theFacade.m_globalDictCache->unlock();
        // Cache not yet empty: no heartbeats/connects this round
        if (sz)
          continue;
      }
      m_cluster_state = CS_waiting_for_first_connect;
    }


    NodeFailRep * nodeFailRep = CAST_PTR(NodeFailRep,
                                         nodeFail_signal.getDataPtrSend());
    nodeFailRep->noOfNodes = 0;
    NodeBitmask::clear(nodeFailRep->theNodes);

    trp_client::lock();
    for (int i = 1; i < MAX_NODES; i++){
      /**
       * Send register request (heartbeat) to all available nodes
       * at specified timing intervals
       */
      const NodeId nodeId = i;
      // Check array bounds + don't allow node 0 to be touched
      assert(nodeId > 0 && nodeId < MAX_NODES);
      Node & cm_node = theNodes[nodeId];
      trp_node & theNode = cm_node;

      if (!theNode.defined)
	continue;

      if (theNode.is_connected() == false){
	theFacade.doConnect(nodeId);
	continue;
      }

      if (!theNode.compatible){
	continue;
      }

      if (nodeId == getOwnNodeId() && theNode.is_confirmed())
      {
        /**
         * Don't send HB to self more than once
         * (once needed to avoid weird special cases in e.g ConfigManager)
         */
        continue;
      }

      cm_node.hbCounter += (Uint32)timeSlept;
      if (cm_node.hbCounter >= m_max_api_reg_req_interval ||
          cm_node.hbCounter >= cm_node.hbFrequency)
      {
	/**
	 * It is now time to send a new Heartbeat
	 */
        // Only a full hbFrequency period counts as a missed heartbeat;
        // hbMissed/hbCounter are reset when API_REGCONF arrives.
        if (cm_node.hbCounter >= cm_node.hbFrequency)
        {
          cm_node.hbMissed++;
          cm_node.hbCounter = 0;
	}

        if (theNode.m_info.m_type != NodeInfo::DB)
          signal.theReceiversBlockNumber = API_CLUSTERMGR;
        else
          signal.theReceiversBlockNumber = QMGR;

#ifdef DEBUG_REG
	ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
#endif
	raw_sendSignal(&signal, nodeId);
      }//if

      // '==' (not '>=') so a node is reported once, even though hbMissed
      // keeps growing. hbFrequency is 0 until the first API_REGCONF, so no
      // failure is reported before that.
      if (cm_node.hbMissed == 4 && cm_node.hbFrequency > 0)
      {
        nodeFailRep->noOfNodes++;
        NodeBitmask::set(nodeFailRep->theNodes, nodeId);
      }
    }

    if (nodeFailRep->noOfNodes)
    {
      raw_sendSignal(&nodeFail_signal, getOwnNodeId());
    }
    trp_client::unlock();
  }
}
462 
/**
 * Dispatches a signal received by the API_CLUSTERMGR block to its
 * handler, selected by global signal number. Arbitration signals are
 * ignored when no arbitrator is configured; unknown signals are ignored.
 */
void
ClusterMgr::trp_deliver_signal(const NdbApiSignal* sig,
                               const LinearSectionPtr ptr[3])
{
  const Uint32 gsn = sig->theVerId_signalNumber;
  const Uint32 * theData = sig->getDataPtr();

  switch (gsn){
  case GSN_API_REGREQ:
    execAPI_REGREQ(theData);
    break;

  case GSN_API_REGCONF:
     execAPI_REGCONF(sig, ptr);
    break;

  case GSN_API_REGREF:
    execAPI_REGREF(theData);
    break;

  case GSN_NODE_FAILREP:
    execNODE_FAILREP(sig, ptr);
    break;

  case GSN_NF_COMPLETEREP:
    execNF_COMPLETEREP(sig, ptr);
    break;
  case GSN_ARBIT_STARTREQ:
    if (theArbitMgr != NULL)
      theArbitMgr->doStart(theData);
    break;

  case GSN_ARBIT_CHOOSEREQ:
    if (theArbitMgr != NULL)
      theArbitMgr->doChoose(theData);
    break;

  case GSN_ARBIT_STOPORD:
    if(theArbitMgr != NULL)
      theArbitMgr->doStop(theData);
    break;

  case GSN_ALTER_TABLE_REP:
  {
    if (theFacade.m_globalDictCache == NULL)
      break;
    // Section 0 is passed to alter_table_rep() as a C string (presumably
    // the table name). NOTE(review): ptr[0] is used without checking how
    // many sections the signal carries.
    const AlterTableRep* rep = (const AlterTableRep*)theData;
    theFacade.m_globalDictCache->lock();
    theFacade.m_globalDictCache->
      alter_table_rep((const char*)ptr[0].p,
                      rep->tableId,
                      rep->tableVersion,
                      rep->changeType == AlterTableRep::CT_ALTERED);
    theFacade.m_globalDictCache->unlock();
    break;
  }
  case GSN_SUB_GCP_COMPLETE_REP:
  {
    /**
     * Report
     */
    theFacade.for_each(this, sig, ptr);

    /**
     * Reply
     */
    // Echo the payload back to the sender as SUB_GCP_COMPLETE_ACK, with
    // our own reference stored in rep.senderRef.
    {
      BlockReference ownRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
      NdbApiSignal tSignal(* sig);
      Uint32* send= tSignal.getDataPtrSend();
      memcpy(send, theData, tSignal.getLength() << 2);
      CAST_PTR(SubGcpCompleteAck, send)->rep.senderRef = ownRef;
      Uint32 ref= sig->theSendersBlockRef;
      Uint32 aNodeId= refToNode(ref);
      tSignal.theReceiversBlockNumber= refToBlock(ref);
      tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK;
      // NOTE(review): this stores a block number, not a full reference
      // (ownRef is only used in the payload) - confirm the send path
      // rewrites theSendersBlockRef.
      tSignal.theSendersBlockRef = API_CLUSTERMGR;
      safe_sendSignal(&tSignal, aNodeId);
    }
    break;
  }
  case GSN_TAKE_OVERTCCONF:
  {
    /**
     * Report
     */
    theFacade.for_each(this, sig, ptr);
    return;
  }
  case GSN_CONNECT_REP:
  {
    // Sent to self by reportConnected()
    execCONNECT_REP(sig, ptr);
    return;
  }
  case GSN_DISCONNECT_REP:
  {
    // Sent to self by reportDisconnected()
    execDISCONNECT_REP(sig, ptr);
    return;
  }
  default:
    break;

  }
  return;
}
568 
Node()569 ClusterMgr::Node::Node()
570   : hbFrequency(0), hbCounter(0)
571 {
572 }
573 
574 /**
575  * recalcMinDbVersion
576  *
577  * This method is called whenever the 'minimum DB node
578  * version' data for the connected DB nodes changes
579  * It calculates the minimum version of all the connected
580  * DB nodes.
581  * This information is cached by Ndb object instances.
582  * This information is useful when implementing API compatibility
583  * with older DB nodes
584  */
585 void
recalcMinDbVersion()586 ClusterMgr::recalcMinDbVersion()
587 {
588   Uint32 newMinDbVersion = ~ (Uint32) 0;
589 
590   for (Uint32 i = 0; i < MAX_NODES; i++)
591   {
592     trp_node& node = theNodes[i];
593 
594     if (node.is_connected() &&
595         node.is_confirmed() &&
596         node.m_info.getType() == NodeInfo::DB)
597     {
598       /* Include this node in the set of nodes used to
599        * compute the lowest current DB node version
600        */
601       assert(node.m_info.m_version);
602 
603       if (node.minDbVersion < newMinDbVersion)
604       {
605         newMinDbVersion = node.minDbVersion;
606       }
607     }
608   }
609 
610   /* Now update global min Db version if we have one.
611    * Otherwise set it to 0
612    */
613   newMinDbVersion = (newMinDbVersion == ~ (Uint32) 0) ?
614     0 :
615     newMinDbVersion;
616 
617 //#ifdef DEBUG_MINVER
618 
619 #ifdef DEBUG_MINVER
620   if (newMinDbVersion != minDbVersion)
621   {
622     ndbout << "Previous min Db node version was "
623            << NdbVersion(minDbVersion)
624            << " new min is "
625            << NdbVersion(newMinDbVersion)
626            << endl;
627   }
628   else
629   {
630     ndbout << "MinDbVersion recalculated, but is same : "
631            << NdbVersion(minDbVersion)
632            << endl;
633   }
634 #endif
635 
636   minDbVersion = newMinDbVersion;
637 }
638 
639 /******************************************************************************
640  * API_REGREQ and friends
641  ******************************************************************************/
642 
643 void
execAPI_REGREQ(const Uint32 * theData)644 ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
645   const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0];
646   const NodeId nodeId = refToNode(apiRegReq->ref);
647 
648 #ifdef DEBUG_REG
649   ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);
650 #endif
651 
652   assert(nodeId > 0 && nodeId < MAX_NODES);
653 
654   Node & cm_node = theNodes[nodeId];
655   trp_node & node = cm_node;
656   assert(node.defined == true);
657   assert(node.is_connected() == true);
658 
659   if(node.m_info.m_version != apiRegReq->version){
660     node.m_info.m_version = apiRegReq->version;
661     node.m_info.m_mysql_version = apiRegReq->mysql_version;
662     if (node.m_info.m_version < NDBD_SPLIT_VERSION)
663       node.m_info.m_mysql_version = 0;
664 
665     if (getMajor(node.m_info.m_version) < getMajor(NDB_VERSION) ||
666 	getMinor(node.m_info.m_version) < getMinor(NDB_VERSION)) {
667       node.compatible = false;
668     } else {
669       node.compatible = true;
670     }
671   }
672 
673   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
674   signal.theVerId_signalNumber   = GSN_API_REGCONF;
675   signal.theReceiversBlockNumber = API_CLUSTERMGR;
676   signal.theTrace                = 0;
677   signal.theLength               = ApiRegConf::SignalLength;
678 
679   ApiRegConf * const conf = CAST_PTR(ApiRegConf, signal.getDataPtrSend());
680   conf->qmgrRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
681   conf->version = NDB_VERSION;
682   conf->mysql_version = NDB_MYSQL_VERSION_D;
683   conf->apiHeartbeatFrequency = cm_node.hbFrequency;
684 
685   conf->minDbVersion= 0;
686   conf->nodeState= node.m_state;
687 
688   node.set_confirmed(true);
689   if (safe_sendSignal(&signal, nodeId) != 0)
690     node.set_confirmed(false);
691 }
692 
/**
 * Handles API_REGCONF, the answer to our API_REGREQ heartbeat.
 *
 * The handler:
 *  - records the peer's version and compatibility (using the mgmt or api
 *    rule depending on our own node type);
 *  - marks the peer confirmed;
 *  - updates minDbVersion;
 *  - copies the peer's node state, marking DB nodes alive when they are
 *    compatible and started (or in single-user mode);
 *  - resets the heartbeat counters and takes the heartbeat frequency from
 *    the reply;
 *  - forwards the signal to clients and clears the peer from any
 *    forceHB() round.
 */
void
ClusterMgr::execAPI_REGCONF(const NdbApiSignal * signal,
                            const LinearSectionPtr ptr[])
{
  const ApiRegConf * apiRegConf = CAST_CONSTPTR(ApiRegConf,
                                                signal->getDataPtr());
  const NodeId nodeId = refToNode(apiRegConf->qmgrRef);

#ifdef DEBUG_REG
  ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
#endif

  assert(nodeId > 0 && nodeId < MAX_NODES);

  Node & cm_node = theNodes[nodeId];
  trp_node & node = cm_node;
  assert(node.defined == true);
  assert(node.is_connected() == true);

  if(node.m_info.m_version != apiRegConf->version){
    node.m_info.m_version = apiRegConf->version;
    node.m_info.m_mysql_version = apiRegConf->mysql_version;
    if (node.m_info.m_version < NDBD_SPLIT_VERSION)
      node.m_info.m_mysql_version = 0;

    // Compatibility rule depends on whether we are a management server
    if(theNodes[theFacade.ownId()].m_info.m_type == NodeInfo::MGM)
      node.compatible = ndbCompatible_mgmt_ndb(NDB_VERSION,
					       node.m_info.m_version);
    else
      node.compatible = ndbCompatible_api_ndb(NDB_VERSION,
					      node.m_info.m_version);
  }

  node.set_confirmed(true);

  if (node.minDbVersion != apiRegConf->minDbVersion)
  {
    node.minDbVersion = apiRegConf->minDbVersion;
    recalcMinDbVersion();
  }

  if (node.m_info.m_version >= NDBD_255_NODES_VERSION)
  {
    node.m_state = apiRegConf->nodeState;
  }
  else
  {
    /**
     * from 2 to 8 words = 6 words diff, 6*4 = 24
     */
    // Older peers send a shorter nodeState (presumably the connected-nodes
    // bitmask grew from 2 to 8 words), so the trailing 24 bytes are not
    // copied from the signal.
    memcpy(&node.m_state, &apiRegConf->nodeState, sizeof(node.m_state) - 24);
  }

  if (node.m_info.m_type == NodeInfo::DB)
  {
    /**
     * Only set DB nodes to "alive"
     */
    if (node.compatible && (node.m_state.startLevel == NodeState::SL_STARTED ||
                            node.m_state.getSingleUserMode()))
    {
      set_node_alive(node, true);
    }
    else
    {
      set_node_alive(node, false);
    }
  }

  cm_node.hbMissed = 0;
  cm_node.hbCounter = 0;
  // NOTE(review): if apiHeartbeatFrequency is unsigned and below 5 this
  // wraps to a very large value. An API peer replies with its own
  // hbFrequency (see execAPI_REGREQ), which reportConnected() sets to 0.
  // Confirm whether that wrap is intended.
  cm_node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;

  // Distribute signal to all threads/blocks
  // TODO only if state changed...
  theFacade.for_each(this, signal, ptr);

  check_wait_for_hb(nodeId);
}
772 
773 void
check_wait_for_hb(NodeId nodeId)774 ClusterMgr::check_wait_for_hb(NodeId nodeId)
775 {
776   if(waitingForHB)
777   {
778     waitForHBFromNodes.clear(nodeId);
779 
780     if(waitForHBFromNodes.isclear())
781     {
782       waitingForHB= false;
783       NdbCondition_Broadcast(waitForHBCond);
784     }
785   }
786   return;
787 }
788 
789 
790 void
execAPI_REGREF(const Uint32 * theData)791 ClusterMgr::execAPI_REGREF(const Uint32 * theData){
792 
793   ApiRegRef * ref = (ApiRegRef*)theData;
794 
795   const NodeId nodeId = refToNode(ref->ref);
796 
797   assert(nodeId > 0 && nodeId < MAX_NODES);
798 
799   Node & cm_node = theNodes[nodeId];
800   trp_node & node = cm_node;
801 
802   assert(node.is_connected() == true);
803   assert(node.defined == true);
804   /* Only DB nodes will send API_REGREF */
805   assert(node.m_info.getType() == NodeInfo::DB);
806 
807   node.compatible = false;
808   set_node_alive(node, false);
809   node.m_state = NodeState::SL_NOTHING;
810   node.m_info.m_version = ref->version;
811 
812   switch(ref->errorCode){
813   case ApiRegRef::WrongType:
814     ndbout_c("Node %d reports that this node should be a NDB node", nodeId);
815     abort();
816   case ApiRegRef::UnsupportedVersion:
817   default:
818     break;
819   }
820 
821   check_wait_for_hb(nodeId);
822 }
823 
824 void
execNF_COMPLETEREP(const NdbApiSignal * signal,const LinearSectionPtr ptr[3])825 ClusterMgr::execNF_COMPLETEREP(const NdbApiSignal* signal,
826                                const LinearSectionPtr ptr[3])
827 {
828   const NFCompleteRep * nfComp = CAST_CONSTPTR(NFCompleteRep,
829                                                signal->getDataPtr());
830   const NodeId nodeId = nfComp->failedNodeId;
831   assert(nodeId > 0 && nodeId < MAX_NODES);
832 
833   trp_node & node = theNodes[nodeId];
834   if (node.nfCompleteRep == false)
835   {
836     node.nfCompleteRep = true;
837     theFacade.for_each(this, signal, ptr);
838   }
839 }
840 
841 void
reportConnected(NodeId nodeId)842 ClusterMgr::reportConnected(NodeId nodeId)
843 {
844   DBUG_ENTER("ClusterMgr::reportConnected");
845   DBUG_PRINT("info", ("nodeId: %u", nodeId));
846   /**
847    * Ensure that we are sending heartbeat every 100 ms
848    * until we have got the first reply from NDB providing
849    * us with the real time-out period to use.
850    */
851   assert(nodeId > 0 && nodeId < MAX_NODES);
852   if (nodeId == getOwnNodeId())
853   {
854     noOfConnectedNodes--; // Don't count self...
855   }
856 
857   noOfConnectedNodes++;
858 
859   Node & cm_node = theNodes[nodeId];
860   trp_node & theNode = cm_node;
861 
862   cm_node.hbMissed = 0;
863   cm_node.hbCounter = 0;
864   cm_node.hbFrequency = 0;
865 
866   assert(theNode.is_connected() == false);
867 
868   /**
869    * make sure the node itself is marked connected even
870    * if first API_REGCONF has not arrived
871    */
872   theNode.set_connected(true);
873   theNode.m_state.m_connected_nodes.set(nodeId);
874   theNode.m_info.m_version = 0;
875   theNode.compatible = true;
876   theNode.nfCompleteRep = true;
877   theNode.m_node_fail_rep = false;
878   theNode.m_state.startLevel = NodeState::SL_NOTHING;
879   theNode.minDbVersion = 0;
880 
881   /**
882    * We know that we have clusterMgrThreadMutex and trp_client::mutex
883    *   but we don't know if we are polling...and for_each can
884    *   only be used by a poller...
885    *
886    * Send signal to self, so that we can do this when receiving a signal
887    */
888   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
889   signal.theVerId_signalNumber = GSN_CONNECT_REP;
890   signal.theReceiversBlockNumber = API_CLUSTERMGR;
891   signal.theTrace  = 0;
892   signal.theLength = 1;
893   signal.getDataPtrSend()[0] = nodeId;
894   raw_sendSignal(&signal, getOwnNodeId());
895   DBUG_VOID_RETURN;
896 }
897 
898 void
execCONNECT_REP(const NdbApiSignal * sig,const LinearSectionPtr ptr[])899 ClusterMgr::execCONNECT_REP(const NdbApiSignal* sig,
900                             const LinearSectionPtr ptr[])
901 {
902   theFacade.for_each(this, sig, 0);
903 }
904 
905 void
set_node_dead(trp_node & theNode)906 ClusterMgr::set_node_dead(trp_node& theNode)
907 {
908   set_node_alive(theNode, false);
909   theNode.set_confirmed(false);
910   theNode.m_state.m_connected_nodes.clear();
911   theNode.m_state.startLevel = NodeState::SL_NOTHING;
912   theNode.m_info.m_connectCount ++;
913   theNode.nfCompleteRep = false;
914 }
915 
916 void
reportDisconnected(NodeId nodeId)917 ClusterMgr::reportDisconnected(NodeId nodeId)
918 {
919   assert(nodeId > 0 && nodeId < MAX_NODES);
920   assert(noOfConnectedNodes > 0);
921 
922   /**
923    * We know that we have clusterMgrThreadMutex and trp_client::mutex
924    *   but we don't know if we are polling...and for_each can
925    *   only be used by a poller...
926    *
927    * Send signal to self, so that we can do this when receiving a signal
928    */
929   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
930   signal.theVerId_signalNumber = GSN_DISCONNECT_REP;
931   signal.theReceiversBlockNumber = API_CLUSTERMGR;
932   signal.theTrace  = 0;
933   signal.theLength = DisconnectRep::SignalLength;
934 
935   DisconnectRep * rep = CAST_PTR(DisconnectRep, signal.getDataPtrSend());
936   rep->nodeId = nodeId;
937   rep->err = 0;
938   raw_sendSignal(&signal, getOwnNodeId());
939 }
940 
/**
 * Handles the DISCONNECT_REP that reportDisconnected() sends to us.
 *
 * Marks the node dead and not connected and decrements
 * noOfConnectedNodes. When the last connection is gone:
 *  - the global dictionary cache is invalidated (unless
 *    global_flag_skip_invalidate_cache), m_connect_count is bumped and
 *    threadMain() goes back to CS_waiting_for_clean_cache;
 *  - with m_auto_reconnect == 0, theStop is set to 2, which ends the
 *    threadMain() loop.
 * If no NODE_FAILREP has been seen for the node yet, one is synthesised
 * and processed locally through execNODE_FAILREP().
 */
void
ClusterMgr::execDISCONNECT_REP(const NdbApiSignal* sig,
                               const LinearSectionPtr ptr[])
{
  const DisconnectRep * rep = CAST_CONSTPTR(DisconnectRep, sig->getDataPtr());
  Uint32 nodeId = rep->nodeId;

  assert(nodeId > 0 && nodeId < MAX_NODES);
  Node & cm_node = theNodes[nodeId];
  trp_node & theNode = cm_node;

  bool node_failrep = theNode.m_node_fail_rep;
  set_node_dead(theNode);
  theNode.set_connected(false);

  noOfConnectedNodes--;
  if (noOfConnectedNodes == 0)
  {
    if (!global_flag_skip_invalidate_cache &&
        theFacade.m_globalDictCache)
    {
      theFacade.m_globalDictCache->lock();
      theFacade.m_globalDictCache->invalidate_all();
      theFacade.m_globalDictCache->unlock();
      m_connect_count ++;
      m_cluster_state = CS_waiting_for_clean_cache;
    }

    if (m_auto_reconnect == 0)
    {
      theStop = 2;
    }
  }

  if (node_failrep == false)
  {
    /**
     * Inform API
     */
    // NOTE(review): execNODE_FAILREP() calls set_node_dead() again, so on
    // this path m_connectCount is bumped twice - confirm that is harmless.
    NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
    signal.theVerId_signalNumber = GSN_NODE_FAILREP;
    signal.theReceiversBlockNumber = API_CLUSTERMGR;
    signal.theTrace  = 0;
    signal.theLength = NodeFailRep::SignalLengthLong;

    NodeFailRep * rep = CAST_PTR(NodeFailRep, signal.getDataPtrSend());
    rep->failNo = 0;
    rep->masterNodeId = 0;
    rep->noOfNodes = 1;
    NodeBitmask::clear(rep->theNodes);
    NodeBitmask::set(rep->theNodes, nodeId);
    execNODE_FAILREP(&signal, 0);
  }
}
995 
996 void
execNODE_FAILREP(const NdbApiSignal * sig,const LinearSectionPtr ptr[])997 ClusterMgr::execNODE_FAILREP(const NdbApiSignal* sig,
998                              const LinearSectionPtr ptr[])
999 {
1000   const NodeFailRep * rep = CAST_CONSTPTR(NodeFailRep, sig->getDataPtr());
1001 
1002   NdbApiSignal signal(sig->theSendersBlockRef);
1003   signal.theVerId_signalNumber = GSN_NODE_FAILREP;
1004   signal.theReceiversBlockNumber = API_CLUSTERMGR;
1005   signal.theTrace  = 0;
1006   signal.theLength = NodeFailRep::SignalLengthLong;
1007 
1008   NodeFailRep * copy = CAST_PTR(NodeFailRep, signal.getDataPtrSend());
1009   copy->failNo = 0;
1010   copy->masterNodeId = 0;
1011   copy->noOfNodes = 0;
1012   NodeBitmask::clear(copy->theNodes);
1013 
1014   for (Uint32 i = NdbNodeBitmask::find_first(rep->theNodes);
1015        i != NdbNodeBitmask::NotFound;
1016        i = NdbNodeBitmask::find_next(rep->theNodes, i + 1))
1017   {
1018     Node & cm_node = theNodes[i];
1019     trp_node & theNode = cm_node;
1020 
1021     bool node_failrep = theNode.m_node_fail_rep;
1022     bool connected = theNode.is_connected();
1023     set_node_dead(theNode);
1024 
1025     if (node_failrep == false)
1026     {
1027       theNode.m_node_fail_rep = true;
1028       NodeBitmask::set(copy->theNodes, i);
1029       copy->noOfNodes++;
1030     }
1031 
1032     if (connected)
1033     {
1034       theFacade.doDisconnect(i);
1035     }
1036   }
1037 
1038   recalcMinDbVersion();
1039   if (copy->noOfNodes)
1040   {
1041     theFacade.for_each(this, &signal, 0); // report GSN_NODE_FAILREP
1042   }
1043 
1044   if (noOfAliveNodes == 0)
1045   {
1046     NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
1047     signal.theVerId_signalNumber = GSN_NF_COMPLETEREP;
1048     signal.theReceiversBlockNumber = 0;
1049     signal.theTrace  = 0;
1050     signal.theLength = NFCompleteRep::SignalLength;
1051 
1052     NFCompleteRep * rep = CAST_PTR(NFCompleteRep, signal.getDataPtrSend());
1053     rep->blockNo =0;
1054     rep->nodeId = getOwnNodeId();
1055     rep->unused = 0;
1056     rep->from = __LINE__;
1057 
1058     for (Uint32 i = 1; i < MAX_NODES; i++)
1059     {
1060       trp_node& theNode = theNodes[i];
1061       if (theNode.defined && theNode.nfCompleteRep == false)
1062       {
1063         rep->failedNodeId = i;
1064         execNF_COMPLETEREP(&signal, 0);
1065       }
1066     }
1067   }
1068 }
1069 
1070 void
print_nodes(const char * where,NdbOut & out)1071 ClusterMgr::print_nodes(const char* where, NdbOut& out)
1072 {
1073   out << where << " >>" << endl;
1074   for (NodeId n = 1; n < MAX_NODES ; n++)
1075   {
1076     const trp_node node = getNodeInfo(n);
1077     if (!node.defined)
1078       continue;
1079     out << "node: " << n << endl;
1080     out << " -";
1081     out << " connected: " << node.is_connected();
1082     out << ", compatible: " << node.compatible;
1083     out << ", nf_complete_rep: " << node.nfCompleteRep;
1084     out << ", alive: " << node.m_alive;
1085     out << ", confirmed: " << node.is_confirmed();
1086     out << endl;
1087 
1088     out << " - " << node.m_info << endl;
1089     out << " - " << node.m_state << endl;
1090   }
1091   out << "<<" << endl;
1092 }
1093 
1094 
1095 /******************************************************************************
1096  * Arbitrator
1097  ******************************************************************************/
ArbitMgr(ClusterMgr & c)1098 ArbitMgr::ArbitMgr(ClusterMgr & c)
1099   : m_clusterMgr(c)
1100 {
1101   DBUG_ENTER("ArbitMgr::ArbitMgr");
1102 
1103   theThreadMutex = NdbMutex_Create();
1104   theInputCond = NdbCondition_Create();
1105   theInputMutex = NdbMutex_Create();
1106 
1107   theRank = 0;
1108   theDelay = 0;
1109   theThread = 0;
1110 
1111   theInputTimeout = 0;
1112   theInputFull = false;
1113   memset(&theInputBuffer, 0, sizeof(theInputBuffer));
1114   theState = StateInit;
1115 
1116   memset(&theStartReq, 0, sizeof(theStartReq));
1117   memset(&theChooseReq1, 0, sizeof(theChooseReq1));
1118   memset(&theChooseReq2, 0, sizeof(theChooseReq2));
1119   memset(&theStopOrd, 0, sizeof(theStopOrd));
1120 
1121   DBUG_VOID_RETURN;
1122 }
1123 
~ArbitMgr()1124 ArbitMgr::~ArbitMgr()
1125 {
1126   DBUG_ENTER("ArbitMgr::~ArbitMgr");
1127   NdbMutex_Destroy(theThreadMutex);
1128   NdbCondition_Destroy(theInputCond);
1129   NdbMutex_Destroy(theInputMutex);
1130   DBUG_VOID_RETURN;
1131 }
1132 
1133 // Start arbitrator thread.  This is kernel request.
1134 // First stop any previous thread since it is a left-over
1135 // which was never used and which now has wrong ticket.
1136 void
doStart(const Uint32 * theData)1137 ArbitMgr::doStart(const Uint32* theData)
1138 {
1139   ArbitSignal aSignal;
1140   NdbMutex_Lock(theThreadMutex);
1141   if (theThread != NULL) {
1142     aSignal.init(GSN_ARBIT_STOPORD, NULL);
1143     aSignal.data.code = StopRestart;
1144     sendSignalToThread(aSignal);
1145     void* value;
1146     NdbThread_WaitFor(theThread, &value);
1147     NdbThread_Destroy(&theThread);
1148     theState = StateInit;
1149     theInputFull = false;
1150   }
1151   aSignal.init(GSN_ARBIT_STARTREQ, theData);
1152   sendSignalToThread(aSignal);
1153   theThread = NdbThread_Create(
1154     runArbitMgr_C, (void**)this,
1155     0, // default stack size
1156     "ndb_arbitmgr",
1157     NDB_THREAD_PRIO_HIGH);
1158   NdbMutex_Unlock(theThreadMutex);
1159 }
1160 
1161 // The "choose me" signal from a candidate.
1162 void
doChoose(const Uint32 * theData)1163 ArbitMgr::doChoose(const Uint32* theData)
1164 {
1165   ArbitSignal aSignal;
1166   aSignal.init(GSN_ARBIT_CHOOSEREQ, theData);
1167   sendSignalToThread(aSignal);
1168 }
1169 
1170 // Stop arbitrator thread via stop signal from the kernel
1171 // or when exiting API program.
1172 void
doStop(const Uint32 * theData)1173 ArbitMgr::doStop(const Uint32* theData)
1174 {
1175   DBUG_ENTER("ArbitMgr::doStop");
1176   ArbitSignal aSignal;
1177   NdbMutex_Lock(theThreadMutex);
1178   if (theThread != NULL) {
1179     aSignal.init(GSN_ARBIT_STOPORD, theData);
1180     if (theData == 0) {
1181       aSignal.data.code = StopExit;
1182     } else {
1183       aSignal.data.code = StopRequest;
1184     }
1185     sendSignalToThread(aSignal);
1186     void* value;
1187     NdbThread_WaitFor(theThread, &value);
1188     NdbThread_Destroy(&theThread);
1189     theState = StateInit;
1190   }
1191   NdbMutex_Unlock(theThreadMutex);
1192   DBUG_VOID_RETURN;
1193 }
1194 
1195 // private methods
1196 
1197 extern "C"
1198 void*
runArbitMgr_C(void * me)1199 runArbitMgr_C(void* me)
1200 {
1201   ((ArbitMgr*) me)->threadMain();
1202   return NULL;
1203 }
1204 
1205 void
sendSignalToThread(ArbitSignal & aSignal)1206 ArbitMgr::sendSignalToThread(ArbitSignal& aSignal)
1207 {
1208 #ifdef DEBUG_ARBIT
1209   char buf[17] = "";
1210   ndbout << "arbit recv: ";
1211   ndbout << " gsn=" << aSignal.gsn;
1212   ndbout << " send=" << aSignal.data.sender;
1213   ndbout << " code=" << aSignal.data.code;
1214   ndbout << " node=" << aSignal.data.node;
1215   ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
1216   ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
1217   ndbout << endl;
1218 #endif
1219   aSignal.setTimestamp();       // signal arrival time
1220   NdbMutex_Lock(theInputMutex);
1221   while (theInputFull) {
1222     NdbCondition_WaitTimeout(theInputCond, theInputMutex, 1000);
1223   }
1224   theInputBuffer = aSignal;
1225   theInputFull = true;
1226   NdbCondition_Signal(theInputCond);
1227   NdbMutex_Unlock(theInputMutex);
1228 }
1229 
1230 void
threadMain()1231 ArbitMgr::threadMain()
1232 {
1233   ArbitSignal aSignal;
1234   aSignal = theInputBuffer;
1235   threadStart(aSignal);
1236   bool stop = false;
1237   while (! stop) {
1238     NdbMutex_Lock(theInputMutex);
1239     while (! theInputFull) {
1240       NdbCondition_WaitTimeout(theInputCond, theInputMutex, theInputTimeout);
1241       threadTimeout();
1242     }
1243     aSignal = theInputBuffer;
1244     theInputFull = false;
1245     NdbCondition_Signal(theInputCond);
1246     NdbMutex_Unlock(theInputMutex);
1247     switch (aSignal.gsn) {
1248     case GSN_ARBIT_CHOOSEREQ:
1249       threadChoose(aSignal);
1250       break;
1251     case GSN_ARBIT_STOPORD:
1252       stop = true;
1253       break;
1254     }
1255   }
1256   threadStop(aSignal);
1257 }
1258 
1259 // handle events in the thread
1260 
1261 void
threadStart(ArbitSignal & aSignal)1262 ArbitMgr::threadStart(ArbitSignal& aSignal)
1263 {
1264   theStartReq = aSignal;
1265   sendStartConf(theStartReq, ArbitCode::ApiStart);
1266   theState = StateStarted;
1267   theInputTimeout = 1000;
1268 }
1269 
1270 void
threadChoose(ArbitSignal & aSignal)1271 ArbitMgr::threadChoose(ArbitSignal& aSignal)
1272 {
1273   switch (theState) {
1274   case StateStarted:            // first REQ
1275     if (! theStartReq.data.match(aSignal.data)) {
1276       sendChooseRef(aSignal, ArbitCode::ErrTicket);
1277       break;
1278     }
1279     theChooseReq1 = aSignal;
1280     if (theDelay == 0) {
1281       sendChooseConf(aSignal, ArbitCode::WinChoose);
1282       theState = StateFinished;
1283       theInputTimeout = 1000;
1284       break;
1285     }
1286     theState = StateChoose1;
1287     theInputTimeout = 1;
1288     return;
1289   case StateChoose1:            // second REQ within Delay
1290     if (! theStartReq.data.match(aSignal.data)) {
1291       sendChooseRef(aSignal, ArbitCode::ErrTicket);
1292       break;
1293     }
1294     theChooseReq2 = aSignal;
1295     theState = StateChoose2;
1296     theInputTimeout = 1;
1297     return;
1298   case StateChoose2:            // too many REQs - refuse all
1299     if (! theStartReq.data.match(aSignal.data)) {
1300       sendChooseRef(aSignal, ArbitCode::ErrTicket);
1301       break;
1302     }
1303     sendChooseRef(theChooseReq1, ArbitCode::ErrToomany);
1304     sendChooseRef(theChooseReq2, ArbitCode::ErrToomany);
1305     sendChooseRef(aSignal, ArbitCode::ErrToomany);
1306     theState = StateFinished;
1307     theInputTimeout = 1000;
1308     return;
1309   default:
1310     sendChooseRef(aSignal, ArbitCode::ErrState);
1311     break;
1312   }
1313 }
1314 
1315 void
threadTimeout()1316 ArbitMgr::threadTimeout()
1317 {
1318   switch (theState) {
1319   case StateStarted:
1320     break;
1321   case StateChoose1:
1322     if (theChooseReq1.getTimediff() < theDelay)
1323       break;
1324     sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1325     theState = StateFinished;
1326     theInputTimeout = 1000;
1327     break;
1328   case StateChoose2:
1329     sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1330     sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
1331     theState = StateFinished;
1332     theInputTimeout = 1000;
1333     break;
1334   default:
1335     break;
1336   }
1337 }
1338 
1339 void
threadStop(ArbitSignal & aSignal)1340 ArbitMgr::threadStop(ArbitSignal& aSignal)
1341 {
1342   switch (aSignal.data.code) {
1343   case StopExit:
1344     switch (theState) {
1345     case StateStarted:
1346       sendStopRep(theStartReq, 0);
1347       break;
1348     case StateChoose1:                  // just in time
1349       sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1350       break;
1351     case StateChoose2:
1352       sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1353       sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
1354       break;
1355     case StateInit:
1356     case StateFinished:
1357       //??
1358       break;
1359     }
1360     break;
1361   case StopRequest:
1362     break;
1363   case StopRestart:
1364     break;
1365   }
1366 }
1367 
1368 // output routines
1369 
1370 void
sendStartConf(ArbitSignal & aSignal,Uint32 code)1371 ArbitMgr::sendStartConf(ArbitSignal& aSignal, Uint32 code)
1372 {
1373   ArbitSignal copySignal = aSignal;
1374   copySignal.gsn = GSN_ARBIT_STARTCONF;
1375   copySignal.data.code = code;
1376   sendSignalToQmgr(copySignal);
1377 }
1378 
1379 void
sendChooseConf(ArbitSignal & aSignal,Uint32 code)1380 ArbitMgr::sendChooseConf(ArbitSignal& aSignal, Uint32 code)
1381 {
1382   ArbitSignal copySignal = aSignal;
1383   copySignal.gsn = GSN_ARBIT_CHOOSECONF;
1384   copySignal.data.code = code;
1385   sendSignalToQmgr(copySignal);
1386 }
1387 
1388 void
sendChooseRef(ArbitSignal & aSignal,Uint32 code)1389 ArbitMgr::sendChooseRef(ArbitSignal& aSignal, Uint32 code)
1390 {
1391   ArbitSignal copySignal = aSignal;
1392   copySignal.gsn = GSN_ARBIT_CHOOSEREF;
1393   copySignal.data.code = code;
1394   sendSignalToQmgr(copySignal);
1395 }
1396 
1397 void
sendStopRep(ArbitSignal & aSignal,Uint32 code)1398 ArbitMgr::sendStopRep(ArbitSignal& aSignal, Uint32 code)
1399 {
1400   ArbitSignal copySignal = aSignal;
1401   copySignal.gsn = GSN_ARBIT_STOPREP;
1402   copySignal.data.code = code;
1403   sendSignalToQmgr(copySignal);
1404 }
1405 
1406 /**
1407  * Send signal to QMGR.  The input includes signal number and
1408  * signal data.  The signal data is normally a copy of a received
1409  * signal so it contains expected arbitrator node id and ticket.
1410  * The sender in signal data is the QMGR node id.
1411  */
1412 void
sendSignalToQmgr(ArbitSignal & aSignal)1413 ArbitMgr::sendSignalToQmgr(ArbitSignal& aSignal)
1414 {
1415   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, m_clusterMgr.getOwnNodeId()));
1416 
1417   signal.theVerId_signalNumber = aSignal.gsn;
1418   signal.theReceiversBlockNumber = QMGR;
1419   signal.theTrace  = 0;
1420   signal.theLength = ArbitSignalData::SignalLength;
1421 
1422   ArbitSignalData* sd = CAST_PTR(ArbitSignalData, signal.getDataPtrSend());
1423 
1424   sd->sender = numberToRef(API_CLUSTERMGR, m_clusterMgr.getOwnNodeId());
1425   sd->code = aSignal.data.code;
1426   sd->node = aSignal.data.node;
1427   sd->ticket = aSignal.data.ticket;
1428   sd->mask = aSignal.data.mask;
1429 
1430 #ifdef DEBUG_ARBIT
1431   char buf[17] = "";
1432   ndbout << "arbit send: ";
1433   ndbout << " gsn=" << aSignal.gsn;
1434   ndbout << " recv=" << aSignal.data.sender;
1435   ndbout << " code=" << aSignal.data.code;
1436   ndbout << " node=" << aSignal.data.node;
1437   ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
1438   ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
1439   ndbout << endl;
1440 #endif
1441 
1442   {
1443     m_clusterMgr.lock();
1444     m_clusterMgr.raw_sendSignal(&signal, aSignal.data.sender);
1445     m_clusterMgr.unlock();
1446   }
1447 }
1448 
1449