1 /* Copyright (c) 2003-2007 MySQL AB
2    Use is subject to license terms
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */
16 
17 #include <ndb_global.h>
18 #include <my_pthread.h>
19 #include <ndb_limits.h>
20 #include <util/version.h>
21 
22 #include "TransporterFacade.hpp"
23 #include "ClusterMgr.hpp"
24 #include <IPCConfig.hpp>
25 #include "NdbApiSignal.hpp"
26 #include "API.hpp"
27 #include <NdbSleep.h>
28 #include <NdbOut.hpp>
29 #include <NdbTick.h>
30 
31 
32 #include <signaldata/NodeFailRep.hpp>
33 #include <signaldata/NFCompleteRep.hpp>
34 #include <signaldata/ApiRegSignalData.hpp>
35 
36 #include <mgmapi.h>
37 #include <mgmapi_configuration.hpp>
38 #include <mgmapi_config_parameters.h>
39 
// When non-zero, ClusterMgr::reportNodeFailed() does not invalidate the
// global dictionary cache (nor bump the connect count / reset the cluster
// state) once the number of alive nodes has dropped to zero.
int global_flag_skip_invalidate_cache = 0;
// Uncomment to print API_REGREQ / API_REGCONF tracing to ndbout.
//#define DEBUG_REG
42 
43 // Just a C wrapper for threadMain
44 extern "C"
45 void*
runClusterMgr_C(void * me)46 runClusterMgr_C(void * me)
47 {
48   ((ClusterMgr*) me)->threadMain();
49 
50   return NULL;
51 }
52 
// Defined elsewhere with C linkage; called once from the ClusterMgr constructor.
extern "C" void ndbSetOwnVersion();
ClusterMgr(TransporterFacade & _facade)56 ClusterMgr::ClusterMgr(TransporterFacade & _facade):
57   theStop(0),
58   theFacade(_facade)
59 {
60   DBUG_ENTER("ClusterMgr::ClusterMgr");
61   ndbSetOwnVersion();
62   clusterMgrThreadMutex = NdbMutex_Create();
63   waitForHBCond= NdbCondition_Create();
64   waitingForHB= false;
65   m_max_api_reg_req_interval= 0xFFFFFFFF; // MAX_INT
66   noOfAliveNodes= 0;
67   noOfConnectedNodes= 0;
68   theClusterMgrThread= 0;
69   m_connect_count = 0;
70   m_cluster_state = CS_waiting_for_clean_cache;
71   DBUG_VOID_RETURN;
72 }
73 
~ClusterMgr()74 ClusterMgr::~ClusterMgr()
75 {
76   DBUG_ENTER("ClusterMgr::~ClusterMgr");
77   doStop();
78   NdbCondition_Destroy(waitForHBCond);
79   NdbMutex_Destroy(clusterMgrThreadMutex);
80   DBUG_VOID_RETURN;
81 }
82 
83 void
init(ndb_mgm_configuration_iterator & iter)84 ClusterMgr::init(ndb_mgm_configuration_iterator & iter){
85   for(iter.first(); iter.valid(); iter.next()){
86     Uint32 tmp = 0;
87     if(iter.get(CFG_NODE_ID, &tmp))
88       continue;
89 
90     theNodes[tmp].defined = true;
91 #if 0
92     ndbout << "--------------------------------------" << endl;
93     ndbout << "--------------------------------------" << endl;
94     ndbout_c("ClusterMgr: Node %d defined as %s", tmp, config.getNodeType(tmp));
95 #endif
96 
97     unsigned type;
98     if(iter.get(CFG_TYPE_OF_SECTION, &type))
99       continue;
100 
101     switch(type){
102     case NODE_TYPE_DB:
103       theNodes[tmp].m_info.m_type = NodeInfo::DB;
104       break;
105     case NODE_TYPE_API:
106       theNodes[tmp].m_info.m_type = NodeInfo::API;
107       break;
108     case NODE_TYPE_MGM:
109       theNodes[tmp].m_info.m_type = NodeInfo::MGM;
110       break;
111     default:
112       type = type;
113 #if 0
114       ndbout_c("ClusterMgr: Unknown node type: %d", type);
115 #endif
116     }
117   }
118 }
119 
120 void
startThread()121 ClusterMgr::startThread() {
122   NdbMutex_Lock(clusterMgrThreadMutex);
123 
124   theStop = 0;
125 
126   theClusterMgrThread = NdbThread_Create(runClusterMgr_C,
127                                          (void**)this,
128                                          32768,
129                                          "ndb_clustermgr",
130                                          NDB_THREAD_PRIO_LOW);
131   NdbMutex_Unlock(clusterMgrThreadMutex);
132 }
133 
134 void
doStop()135 ClusterMgr::doStop( ){
136   DBUG_ENTER("ClusterMgr::doStop");
137   NdbMutex_Lock(clusterMgrThreadMutex);
138   if(theStop){
139     NdbMutex_Unlock(clusterMgrThreadMutex);
140     DBUG_VOID_RETURN;
141   }
142   void *status;
143   theStop = 1;
144   if (theClusterMgrThread) {
145     NdbThread_WaitFor(theClusterMgrThread, &status);
146     NdbThread_Destroy(&theClusterMgrThread);
147   }
148   NdbMutex_Unlock(clusterMgrThreadMutex);
149   DBUG_VOID_RETURN;
150 }
151 
152 void
forceHB()153 ClusterMgr::forceHB()
154 {
155     theFacade.lock_mutex();
156 
157     if(waitingForHB)
158     {
159       NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
160       theFacade.unlock_mutex();
161       return;
162     }
163 
164     waitingForHB= true;
165 
166     NodeBitmask ndb_nodes;
167     ndb_nodes.clear();
168     waitForHBFromNodes.clear();
169     for(Uint32 i = 0; i < MAX_NODES; i++)
170     {
171       if(!theNodes[i].defined)
172         continue;
173       if(theNodes[i].m_info.m_type == NodeInfo::DB)
174       {
175         ndb_nodes.set(i);
176         const ClusterMgr::Node &node= getNodeInfo(i);
177         waitForHBFromNodes.bitOR(node.m_state.m_connected_nodes);
178       }
179     }
180     waitForHBFromNodes.bitAND(ndb_nodes);
181 
182 #ifdef DEBUG_REG
183     char buf[128];
184     ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
185 #endif
186     NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
187 
188     signal.theVerId_signalNumber   = GSN_API_REGREQ;
189     signal.theReceiversBlockNumber = QMGR;
190     signal.theTrace                = 0;
191     signal.theLength               = ApiRegReq::SignalLength;
192 
193     ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
194     req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
195     req->version = NDB_VERSION;
196 
197     int nodeId= 0;
198     for(int i=0;
199         (int) NodeBitmask::NotFound != (nodeId= waitForHBFromNodes.find(i));
200         i= nodeId+1)
201     {
202 #ifdef DEBUG_REG
203       ndbout << "FORCE HB to " << nodeId << endl;
204 #endif
205       theFacade.sendSignalUnCond(&signal, nodeId);
206     }
207 
208     /* Wait for nodes to reply - if any heartbeats was sent */
209     if (!waitForHBFromNodes.isclear())
210       NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
211 
212     waitingForHB= false;
213 #ifdef DEBUG_REG
214     ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
215 #endif
216     theFacade.unlock_mutex();
217 }
218 
219 void
threadMain()220 ClusterMgr::threadMain( ){
221   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
222 
223   signal.theVerId_signalNumber   = GSN_API_REGREQ;
224   signal.theReceiversBlockNumber = QMGR;
225   signal.theTrace                = 0;
226   signal.theLength               = ApiRegReq::SignalLength;
227 
228   ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
229   req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
230   req->version = NDB_VERSION;
231 
232 
233   Uint32 timeSlept = 100;
234   Uint64 now = NdbTick_CurrentMillisecond();
235 
236   while(!theStop){
237     /**
238      * Start of Secure area for use of Transporter
239      */
240     if (m_cluster_state == CS_waiting_for_clean_cache)
241     {
242       theFacade.m_globalDictCache.lock();
243       unsigned sz= theFacade.m_globalDictCache.get_size();
244       theFacade.m_globalDictCache.unlock();
245       if (sz)
246         goto next;
247       m_cluster_state = CS_waiting_for_first_connect;
248     }
249 
250     theFacade.lock_mutex();
251     for (int i = 1; i < MAX_NDB_NODES; i++){
252       /**
253        * Send register request (heartbeat) to all available nodes
254        * at specified timing intervals
255        */
256       const NodeId nodeId = i;
257       Node & theNode = theNodes[nodeId];
258 
259       if (!theNode.defined)
260 	continue;
261 
262       if (theNode.connected == false){
263 	theFacade.doConnect(nodeId);
264 	continue;
265       }
266 
267       if (!theNode.compatible){
268 	continue;
269       }
270 
271       theNode.hbCounter += timeSlept;
272       if (theNode.hbCounter >= m_max_api_reg_req_interval ||
273           theNode.hbCounter >= theNode.hbFrequency) {
274 	/**
275 	 * It is now time to send a new Heartbeat
276 	 */
277 	if (theNode.hbCounter >= theNode.hbFrequency) {
278 	  theNode.m_info.m_heartbeat_cnt++;
279 	  theNode.hbCounter = 0;
280 	}
281 
282 #ifdef DEBUG_REG
283 	ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
284 #endif
285 	theFacade.sendSignalUnCond(&signal, nodeId);
286       }//if
287 
288       if (theNode.m_info.m_heartbeat_cnt == 4 && theNode.hbFrequency > 0){
289 	reportNodeFailed(i);
290       }//if
291     }
292 
293     /**
294      * End of secure area. Let other threads in
295      */
296     theFacade.unlock_mutex();
297 
298 next:
299     // Sleep for 100 ms between each Registration Heartbeat
300     Uint64 before = now;
301     NdbSleep_MilliSleep(100);
302     now = NdbTick_CurrentMillisecond();
303     timeSlept = (now - before);
304   }
305 }
306 
#if 0
// Disabled debug helper: prints per-node bookkeeping for nodeId to ndbout.
// NOTE(review): the arrays it reads (theNodeList, theNodeState, ...) are not
// used anywhere else in this file -- presumably stale; confirm they still
// exist before re-enabling.
void
ClusterMgr::showState(NodeId nodeId){
  ndbout << "-- ClusterMgr - NodeId = " << nodeId << endl;
  ndbout << "theNodeList      = " << theNodeList[nodeId] << endl;
  ndbout << "theNodeState     = " << theNodeState[nodeId] << endl;
  ndbout << "theNodeCount     = " << theNodeCount[nodeId] << endl;
  ndbout << "theNodeStopDelay = " << theNodeStopDelay[nodeId] << endl;
  ndbout << "theNodeSendDelay = " << theNodeSendDelay[nodeId] << endl;
}
#endif
318 
Node()319 ClusterMgr::Node::Node()
320   : m_state(NodeState::SL_NOTHING) {
321   compatible = nfCompleteRep = true;
322   connected = defined = m_alive = m_api_reg_conf = false;
323   m_state.m_connected_nodes.clear();
324 }
325 
326 /******************************************************************************
327  * API_REGREQ and friends
328  ******************************************************************************/
329 
330 void
execAPI_REGREQ(const Uint32 * theData)331 ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
332   const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0];
333   const NodeId nodeId = refToNode(apiRegReq->ref);
334 
335 #ifdef DEBUG_REG
336   ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);
337 #endif
338 
339   assert(nodeId > 0 && nodeId < MAX_NODES);
340 
341   Node & node = theNodes[nodeId];
342   assert(node.defined == true);
343   assert(node.connected == true);
344 
345   if(node.m_info.m_version != apiRegReq->version){
346     node.m_info.m_version = apiRegReq->version;
347 
348     if (getMajor(node.m_info.m_version) < getMajor(NDB_VERSION) ||
349 	getMinor(node.m_info.m_version) < getMinor(NDB_VERSION)) {
350       node.compatible = false;
351     } else {
352       node.compatible = true;
353     }
354   }
355 
356   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
357   signal.theVerId_signalNumber   = GSN_API_REGCONF;
358   signal.theReceiversBlockNumber = API_CLUSTERMGR;
359   signal.theTrace                = 0;
360   signal.theLength               = ApiRegConf::SignalLength;
361 
362   ApiRegConf * const conf = CAST_PTR(ApiRegConf, signal.getDataPtrSend());
363   conf->qmgrRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
364   conf->version = NDB_VERSION;
365   conf->apiHeartbeatFrequency = node.hbFrequency;
366   theFacade.sendSignalUnCond(&signal, nodeId);
367 }
368 
369 void
execAPI_REGCONF(const Uint32 * theData)370 ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
371   const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0];
372   const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
373 
374 #ifdef DEBUG_REG
375   ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
376 #endif
377 
378   assert(nodeId > 0 && nodeId < MAX_NODES);
379 
380   Node & node = theNodes[nodeId];
381   assert(node.defined == true);
382   assert(node.connected == true);
383 
384   if(node.m_info.m_version != apiRegConf->version){
385     node.m_info.m_version = apiRegConf->version;
386     if(theNodes[theFacade.ownId()].m_info.m_type == NodeInfo::MGM)
387       node.compatible = ndbCompatible_mgmt_ndb(NDB_VERSION,
388 					       node.m_info.m_version);
389     else
390       node.compatible = ndbCompatible_api_ndb(NDB_VERSION,
391 					      node.m_info.m_version);
392   }
393 
394   node.m_api_reg_conf = true;
395 
396   node.m_state = apiRegConf->nodeState;
397   if (node.compatible && (node.m_state.startLevel == NodeState::SL_STARTED  ||
398 			  node.m_state.getSingleUserMode())){
399     set_node_alive(node, true);
400   } else {
401     set_node_alive(node, false);
402   }//if
403   node.m_info.m_heartbeat_cnt = 0;
404   node.hbCounter = 0;
405 
406   if(waitingForHB)
407   {
408     waitForHBFromNodes.clear(nodeId);
409 
410     if(waitForHBFromNodes.isclear())
411     {
412       waitingForHB= false;
413       NdbCondition_Broadcast(waitForHBCond);
414     }
415   }
416   node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
417 }
418 
419 void
execAPI_REGREF(const Uint32 * theData)420 ClusterMgr::execAPI_REGREF(const Uint32 * theData){
421 
422   ApiRegRef * ref = (ApiRegRef*)theData;
423 
424   const NodeId nodeId = refToNode(ref->ref);
425 
426   assert(nodeId > 0 && nodeId < MAX_NODES);
427 
428   Node & node = theNodes[nodeId];
429   assert(node.connected == true);
430   assert(node.defined == true);
431 
432   node.compatible = false;
433   set_node_alive(node, false);
434   node.m_state = NodeState::SL_NOTHING;
435   node.m_info.m_version = ref->version;
436 
437   switch(ref->errorCode){
438   case ApiRegRef::WrongType:
439     ndbout_c("Node %d reports that this node should be a NDB node", nodeId);
440     abort();
441   case ApiRegRef::UnsupportedVersion:
442   default:
443     break;
444   }
445 
446   waitForHBFromNodes.clear(nodeId);
447   if(waitForHBFromNodes.isclear())
448     NdbCondition_Signal(waitForHBCond);
449 }
450 
451 void
execNODE_FAILREP(const Uint32 * theData)452 ClusterMgr::execNODE_FAILREP(const Uint32 * theData){
453   NodeFailRep * const nodeFail = (NodeFailRep *)&theData[0];
454   for(int i = 1; i<MAX_NODES; i++){
455     if(NodeBitmask::get(nodeFail->theNodes, i)){
456       reportNodeFailed(i);
457     }
458   }
459 }
460 
461 void
execNF_COMPLETEREP(const Uint32 * theData)462 ClusterMgr::execNF_COMPLETEREP(const Uint32 * theData){
463   NFCompleteRep * const nfComp = (NFCompleteRep *)theData;
464 
465   const NodeId nodeId = nfComp->failedNodeId;
466   assert(nodeId > 0 && nodeId < MAX_NODES);
467 
468   theFacade.ReportNodeFailureComplete(nodeId);
469   theNodes[nodeId].nfCompleteRep = true;
470 }
471 
472 void
reportConnected(NodeId nodeId)473 ClusterMgr::reportConnected(NodeId nodeId){
474   DBUG_ENTER("ClusterMgr::reportConnected");
475   DBUG_PRINT("info", ("nodeId: %u", nodeId));
476   /**
477    * Ensure that we are sending heartbeat every 100 ms
478    * until we have got the first reply from NDB providing
479    * us with the real time-out period to use.
480    */
481   assert(nodeId > 0 && nodeId < MAX_NODES);
482 
483   noOfConnectedNodes++;
484 
485   Node & theNode = theNodes[nodeId];
486   theNode.connected = true;
487   theNode.m_info.m_heartbeat_cnt = 0;
488   theNode.hbCounter = 0;
489 
490   /**
491    * make sure the node itself is marked connected even
492    * if first API_REGCONF has not arrived
493    */
494   theNode.m_state.m_connected_nodes.set(nodeId);
495   theNode.hbFrequency = 0;
496   theNode.m_info.m_version = 0;
497   theNode.compatible = true;
498   theNode.nfCompleteRep = true;
499   theNode.m_state.startLevel = NodeState::SL_NOTHING;
500 
501   theFacade.ReportNodeAlive(nodeId);
502   DBUG_VOID_RETURN;
503 }
504 
505 void
reportDisconnected(NodeId nodeId)506 ClusterMgr::reportDisconnected(NodeId nodeId){
507   assert(nodeId > 0 && nodeId < MAX_NODES);
508   assert(noOfConnectedNodes > 0);
509 
510   noOfConnectedNodes--;
511   theNodes[nodeId].connected = false;
512   theNodes[nodeId].m_api_reg_conf = false;
513   theNodes[nodeId].m_state.m_connected_nodes.clear();
514 
515   reportNodeFailed(nodeId, true);
516 }
517 
518 void
reportNodeFailed(NodeId nodeId,bool disconnect)519 ClusterMgr::reportNodeFailed(NodeId nodeId, bool disconnect){
520 
521   Node & theNode = theNodes[nodeId];
522 
523   set_node_alive(theNode, false);
524   theNode.m_info.m_connectCount ++;
525 
526   if(theNode.connected)
527   {
528     theFacade.doDisconnect(nodeId);
529   }
530 
531   const bool report = (theNode.m_state.startLevel != NodeState::SL_NOTHING);
532   theNode.m_state.startLevel = NodeState::SL_NOTHING;
533 
534   if(disconnect || report)
535   {
536     theFacade.ReportNodeDead(nodeId);
537   }
538 
539   theNode.nfCompleteRep = false;
540   if(noOfAliveNodes == 0)
541   {
542     if (!global_flag_skip_invalidate_cache)
543     {
544       theFacade.m_globalDictCache.lock();
545       theFacade.m_globalDictCache.invalidate_all();
546       theFacade.m_globalDictCache.unlock();
547       m_connect_count ++;
548       m_cluster_state = CS_waiting_for_clean_cache;
549     }
550     NFCompleteRep rep;
551     for(Uint32 i = 1; i<MAX_NODES; i++){
552       if(theNodes[i].defined && theNodes[i].nfCompleteRep == false){
553 	rep.failedNodeId = i;
554 	execNF_COMPLETEREP((Uint32*)&rep);
555       }
556     }
557   }
558 }
559 
560 /******************************************************************************
561  * Arbitrator
562  ******************************************************************************/
ArbitMgr(TransporterFacade & _fac)563 ArbitMgr::ArbitMgr(TransporterFacade & _fac)
564   : theFacade(_fac)
565 {
566   DBUG_ENTER("ArbitMgr::ArbitMgr");
567 
568   theThreadMutex = NdbMutex_Create();
569   theInputCond = NdbCondition_Create();
570   theInputMutex = NdbMutex_Create();
571 
572   theRank = 0;
573   theDelay = 0;
574   theThread = 0;
575 
576   theInputTimeout = 0;
577   theInputFull = false;
578   memset(&theInputFull, 0, sizeof(theInputFull));
579   theState = StateInit;
580 
581   memset(&theStartReq, 0, sizeof(theStartReq));
582   memset(&theChooseReq1, 0, sizeof(theChooseReq1));
583   memset(&theChooseReq2, 0, sizeof(theChooseReq2));
584   memset(&theStopOrd, 0, sizeof(theStopOrd));
585 
586   DBUG_VOID_RETURN;
587 }
588 
~ArbitMgr()589 ArbitMgr::~ArbitMgr()
590 {
591   DBUG_ENTER("ArbitMgr::~ArbitMgr");
592   NdbMutex_Destroy(theThreadMutex);
593   NdbCondition_Destroy(theInputCond);
594   NdbMutex_Destroy(theInputMutex);
595   DBUG_VOID_RETURN;
596 }
597 
598 // Start arbitrator thread.  This is kernel request.
599 // First stop any previous thread since it is a left-over
600 // which was never used and which now has wrong ticket.
601 void
doStart(const Uint32 * theData)602 ArbitMgr::doStart(const Uint32* theData)
603 {
604   ArbitSignal aSignal;
605   NdbMutex_Lock(theThreadMutex);
606   if (theThread != NULL) {
607     aSignal.init(GSN_ARBIT_STOPORD, NULL);
608     aSignal.data.code = StopRestart;
609     sendSignalToThread(aSignal);
610     void* value;
611     NdbThread_WaitFor(theThread, &value);
612     NdbThread_Destroy(&theThread);
613     theState = StateInit;
614     theInputFull = false;
615   }
616   aSignal.init(GSN_ARBIT_STARTREQ, theData);
617   sendSignalToThread(aSignal);
618   theThread = NdbThread_Create(
619     runArbitMgr_C, (void**)this, 32768, "ndb_arbitmgr",
620     NDB_THREAD_PRIO_HIGH);
621   NdbMutex_Unlock(theThreadMutex);
622 }
623 
624 // The "choose me" signal from a candidate.
625 void
doChoose(const Uint32 * theData)626 ArbitMgr::doChoose(const Uint32* theData)
627 {
628   ArbitSignal aSignal;
629   aSignal.init(GSN_ARBIT_CHOOSEREQ, theData);
630   sendSignalToThread(aSignal);
631 }
632 
633 // Stop arbitrator thread via stop signal from the kernel
634 // or when exiting API program.
635 void
doStop(const Uint32 * theData)636 ArbitMgr::doStop(const Uint32* theData)
637 {
638   DBUG_ENTER("ArbitMgr::doStop");
639   ArbitSignal aSignal;
640   NdbMutex_Lock(theThreadMutex);
641   if (theThread != NULL) {
642     aSignal.init(GSN_ARBIT_STOPORD, theData);
643     if (theData == 0) {
644       aSignal.data.code = StopExit;
645     } else {
646       aSignal.data.code = StopRequest;
647     }
648     sendSignalToThread(aSignal);
649     void* value;
650     NdbThread_WaitFor(theThread, &value);
651     NdbThread_Destroy(&theThread);
652     theState = StateInit;
653   }
654   NdbMutex_Unlock(theThreadMutex);
655   DBUG_VOID_RETURN;
656 }
657 
658 // private methods
659 
660 extern "C"
661 void*
runArbitMgr_C(void * me)662 runArbitMgr_C(void* me)
663 {
664   ((ArbitMgr*) me)->threadMain();
665   return NULL;
666 }
667 
668 void
sendSignalToThread(ArbitSignal & aSignal)669 ArbitMgr::sendSignalToThread(ArbitSignal& aSignal)
670 {
671 #ifdef DEBUG_ARBIT
672   char buf[17] = "";
673   ndbout << "arbit recv: ";
674   ndbout << " gsn=" << aSignal.gsn;
675   ndbout << " send=" << aSignal.data.sender;
676   ndbout << " code=" << aSignal.data.code;
677   ndbout << " node=" << aSignal.data.node;
678   ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
679   ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
680   ndbout << endl;
681 #endif
682   aSignal.setTimestamp();       // signal arrival time
683   NdbMutex_Lock(theInputMutex);
684   while (theInputFull) {
685     NdbCondition_WaitTimeout(theInputCond, theInputMutex, 1000);
686   }
687   theInputBuffer = aSignal;
688   theInputFull = true;
689   NdbCondition_Signal(theInputCond);
690   NdbMutex_Unlock(theInputMutex);
691 }
692 
693 void
threadMain()694 ArbitMgr::threadMain()
695 {
696   ArbitSignal aSignal;
697   aSignal = theInputBuffer;
698   threadStart(aSignal);
699   bool stop = false;
700   while (! stop) {
701     NdbMutex_Lock(theInputMutex);
702     while (! theInputFull) {
703       NdbCondition_WaitTimeout(theInputCond, theInputMutex, theInputTimeout);
704       threadTimeout();
705     }
706     aSignal = theInputBuffer;
707     theInputFull = false;
708     NdbCondition_Signal(theInputCond);
709     NdbMutex_Unlock(theInputMutex);
710     switch (aSignal.gsn) {
711     case GSN_ARBIT_CHOOSEREQ:
712       threadChoose(aSignal);
713       break;
714     case GSN_ARBIT_STOPORD:
715       stop = true;
716       break;
717     }
718   }
719   threadStop(aSignal);
720 }
721 
722 // handle events in the thread
723 
724 void
threadStart(ArbitSignal & aSignal)725 ArbitMgr::threadStart(ArbitSignal& aSignal)
726 {
727   theStartReq = aSignal;
728   sendStartConf(theStartReq, ArbitCode::ApiStart);
729   theState = StateStarted;
730   theInputTimeout = 1000;
731 }
732 
733 void
threadChoose(ArbitSignal & aSignal)734 ArbitMgr::threadChoose(ArbitSignal& aSignal)
735 {
736   switch (theState) {
737   case StateStarted:            // first REQ
738     if (! theStartReq.data.match(aSignal.data)) {
739       sendChooseRef(aSignal, ArbitCode::ErrTicket);
740       break;
741     }
742     theChooseReq1 = aSignal;
743     if (theDelay == 0) {
744       sendChooseConf(aSignal, ArbitCode::WinChoose);
745       theState = StateFinished;
746       theInputTimeout = 1000;
747       break;
748     }
749     theState = StateChoose1;
750     theInputTimeout = 1;
751     return;
752   case StateChoose1:            // second REQ within Delay
753     if (! theStartReq.data.match(aSignal.data)) {
754       sendChooseRef(aSignal, ArbitCode::ErrTicket);
755       break;
756     }
757     theChooseReq2 = aSignal;
758     theState = StateChoose2;
759     theInputTimeout = 1;
760     return;
761   case StateChoose2:            // too many REQs - refuse all
762     if (! theStartReq.data.match(aSignal.data)) {
763       sendChooseRef(aSignal, ArbitCode::ErrTicket);
764       break;
765     }
766     sendChooseRef(theChooseReq1, ArbitCode::ErrToomany);
767     sendChooseRef(theChooseReq2, ArbitCode::ErrToomany);
768     sendChooseRef(aSignal, ArbitCode::ErrToomany);
769     theState = StateFinished;
770     theInputTimeout = 1000;
771     return;
772   default:
773     sendChooseRef(aSignal, ArbitCode::ErrState);
774     break;
775   }
776 }
777 
778 void
threadTimeout()779 ArbitMgr::threadTimeout()
780 {
781   switch (theState) {
782   case StateStarted:
783     break;
784   case StateChoose1:
785     if (theChooseReq1.getTimediff() < theDelay)
786       break;
787     sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
788     theState = StateFinished;
789     theInputTimeout = 1000;
790     break;
791   case StateChoose2:
792     sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
793     sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
794     theState = StateFinished;
795     theInputTimeout = 1000;
796     break;
797   default:
798     break;
799   }
800 }
801 
802 void
threadStop(ArbitSignal & aSignal)803 ArbitMgr::threadStop(ArbitSignal& aSignal)
804 {
805   switch (aSignal.data.code) {
806   case StopExit:
807     switch (theState) {
808     case StateStarted:
809       sendStopRep(theStartReq, 0);
810       break;
811     case StateChoose1:                  // just in time
812       sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
813       break;
814     case StateChoose2:
815       sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
816       sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
817       break;
818     case StateInit:
819     case StateFinished:
820       //??
821       break;
822     }
823     break;
824   case StopRequest:
825     break;
826   case StopRestart:
827     break;
828   }
829 }
830 
831 // output routines
832 
833 void
sendStartConf(ArbitSignal & aSignal,Uint32 code)834 ArbitMgr::sendStartConf(ArbitSignal& aSignal, Uint32 code)
835 {
836   ArbitSignal copySignal = aSignal;
837   copySignal.gsn = GSN_ARBIT_STARTCONF;
838   copySignal.data.code = code;
839   sendSignalToQmgr(copySignal);
840 }
841 
842 void
sendChooseConf(ArbitSignal & aSignal,Uint32 code)843 ArbitMgr::sendChooseConf(ArbitSignal& aSignal, Uint32 code)
844 {
845   ArbitSignal copySignal = aSignal;
846   copySignal.gsn = GSN_ARBIT_CHOOSECONF;
847   copySignal.data.code = code;
848   sendSignalToQmgr(copySignal);
849 }
850 
851 void
sendChooseRef(ArbitSignal & aSignal,Uint32 code)852 ArbitMgr::sendChooseRef(ArbitSignal& aSignal, Uint32 code)
853 {
854   ArbitSignal copySignal = aSignal;
855   copySignal.gsn = GSN_ARBIT_CHOOSEREF;
856   copySignal.data.code = code;
857   sendSignalToQmgr(copySignal);
858 }
859 
860 void
sendStopRep(ArbitSignal & aSignal,Uint32 code)861 ArbitMgr::sendStopRep(ArbitSignal& aSignal, Uint32 code)
862 {
863   ArbitSignal copySignal = aSignal;
864   copySignal.gsn = GSN_ARBIT_STOPREP;
865   copySignal.data.code = code;
866   sendSignalToQmgr(copySignal);
867 }
868 
869 /**
870  * Send signal to QMGR.  The input includes signal number and
871  * signal data.  The signal data is normally a copy of a received
872  * signal so it contains expected arbitrator node id and ticket.
873  * The sender in signal data is the QMGR node id.
874  */
875 void
sendSignalToQmgr(ArbitSignal & aSignal)876 ArbitMgr::sendSignalToQmgr(ArbitSignal& aSignal)
877 {
878   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
879 
880   signal.theVerId_signalNumber = aSignal.gsn;
881   signal.theReceiversBlockNumber = QMGR;
882   signal.theTrace  = 0;
883   signal.theLength = ArbitSignalData::SignalLength;
884 
885   ArbitSignalData* sd = CAST_PTR(ArbitSignalData, signal.getDataPtrSend());
886 
887   sd->sender = numberToRef(API_CLUSTERMGR, theFacade.ownId());
888   sd->code = aSignal.data.code;
889   sd->node = aSignal.data.node;
890   sd->ticket = aSignal.data.ticket;
891   sd->mask = aSignal.data.mask;
892 
893 #ifdef DEBUG_ARBIT
894   char buf[17] = "";
895   ndbout << "arbit send: ";
896   ndbout << " gsn=" << aSignal.gsn;
897   ndbout << " recv=" << aSignal.data.sender;
898   ndbout << " code=" << aSignal.data.code;
899   ndbout << " node=" << aSignal.data.node;
900   ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
901   ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
902   ndbout << endl;
903 #endif
904 
905   theFacade.lock_mutex();
906   theFacade.sendSignalUnCond(&signal, aSignal.data.sender);
907   theFacade.unlock_mutex();
908 }
909 
910