1 /* Copyright (c) 2003-2007 MySQL AB
2 Use is subject to license terms
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16
17 #include <ndb_global.h>
18 #include <my_pthread.h>
19 #include <ndb_limits.h>
20 #include <util/version.h>
21
22 #include "TransporterFacade.hpp"
23 #include "ClusterMgr.hpp"
24 #include <IPCConfig.hpp>
25 #include "NdbApiSignal.hpp"
26 #include "API.hpp"
27 #include <NdbSleep.h>
28 #include <NdbOut.hpp>
29 #include <NdbTick.h>
30
31
32 #include <signaldata/NodeFailRep.hpp>
33 #include <signaldata/NFCompleteRep.hpp>
34 #include <signaldata/ApiRegSignalData.hpp>
35
36 #include <mgmapi.h>
37 #include <mgmapi_configuration.hpp>
38 #include <mgmapi_config_parameters.h>
39
// Process-wide switch read by ClusterMgr::reportNodeFailed(): while it is
// non-zero, losing the last alive node does NOT invalidate the global
// dictionary cache and does not reset the cluster state to
// CS_waiting_for_clean_cache.
int global_flag_skip_invalidate_cache(0);
// Uncomment to trace the API_REGREQ / API_REGCONF heartbeat traffic.
//#define DEBUG_REG
42
43 // Just a C wrapper for threadMain
44 extern "C"
45 void*
runClusterMgr_C(void * me)46 runClusterMgr_C(void * me)
47 {
48 ((ClusterMgr*) me)->threadMain();
49
50 return NULL;
51 }
52
// Implemented elsewhere with C linkage; invoked once from the ClusterMgr
// constructor before anything else is set up.
extern "C" void ndbSetOwnVersion();
ClusterMgr(TransporterFacade & _facade)56 ClusterMgr::ClusterMgr(TransporterFacade & _facade):
57 theStop(0),
58 theFacade(_facade)
59 {
60 DBUG_ENTER("ClusterMgr::ClusterMgr");
61 ndbSetOwnVersion();
62 clusterMgrThreadMutex = NdbMutex_Create();
63 waitForHBCond= NdbCondition_Create();
64 waitingForHB= false;
65 m_max_api_reg_req_interval= 0xFFFFFFFF; // MAX_INT
66 noOfAliveNodes= 0;
67 noOfConnectedNodes= 0;
68 theClusterMgrThread= 0;
69 m_connect_count = 0;
70 m_cluster_state = CS_waiting_for_clean_cache;
71 DBUG_VOID_RETURN;
72 }
73
~ClusterMgr()74 ClusterMgr::~ClusterMgr()
75 {
76 DBUG_ENTER("ClusterMgr::~ClusterMgr");
77 doStop();
78 NdbCondition_Destroy(waitForHBCond);
79 NdbMutex_Destroy(clusterMgrThreadMutex);
80 DBUG_VOID_RETURN;
81 }
82
83 void
init(ndb_mgm_configuration_iterator & iter)84 ClusterMgr::init(ndb_mgm_configuration_iterator & iter){
85 for(iter.first(); iter.valid(); iter.next()){
86 Uint32 tmp = 0;
87 if(iter.get(CFG_NODE_ID, &tmp))
88 continue;
89
90 theNodes[tmp].defined = true;
91 #if 0
92 ndbout << "--------------------------------------" << endl;
93 ndbout << "--------------------------------------" << endl;
94 ndbout_c("ClusterMgr: Node %d defined as %s", tmp, config.getNodeType(tmp));
95 #endif
96
97 unsigned type;
98 if(iter.get(CFG_TYPE_OF_SECTION, &type))
99 continue;
100
101 switch(type){
102 case NODE_TYPE_DB:
103 theNodes[tmp].m_info.m_type = NodeInfo::DB;
104 break;
105 case NODE_TYPE_API:
106 theNodes[tmp].m_info.m_type = NodeInfo::API;
107 break;
108 case NODE_TYPE_MGM:
109 theNodes[tmp].m_info.m_type = NodeInfo::MGM;
110 break;
111 default:
112 type = type;
113 #if 0
114 ndbout_c("ClusterMgr: Unknown node type: %d", type);
115 #endif
116 }
117 }
118 }
119
120 void
startThread()121 ClusterMgr::startThread() {
122 NdbMutex_Lock(clusterMgrThreadMutex);
123
124 theStop = 0;
125
126 theClusterMgrThread = NdbThread_Create(runClusterMgr_C,
127 (void**)this,
128 32768,
129 "ndb_clustermgr",
130 NDB_THREAD_PRIO_LOW);
131 NdbMutex_Unlock(clusterMgrThreadMutex);
132 }
133
134 void
doStop()135 ClusterMgr::doStop( ){
136 DBUG_ENTER("ClusterMgr::doStop");
137 NdbMutex_Lock(clusterMgrThreadMutex);
138 if(theStop){
139 NdbMutex_Unlock(clusterMgrThreadMutex);
140 DBUG_VOID_RETURN;
141 }
142 void *status;
143 theStop = 1;
144 if (theClusterMgrThread) {
145 NdbThread_WaitFor(theClusterMgrThread, &status);
146 NdbThread_Destroy(&theClusterMgrThread);
147 }
148 NdbMutex_Unlock(clusterMgrThreadMutex);
149 DBUG_VOID_RETURN;
150 }
151
152 void
forceHB()153 ClusterMgr::forceHB()
154 {
155 theFacade.lock_mutex();
156
157 if(waitingForHB)
158 {
159 NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
160 theFacade.unlock_mutex();
161 return;
162 }
163
164 waitingForHB= true;
165
166 NodeBitmask ndb_nodes;
167 ndb_nodes.clear();
168 waitForHBFromNodes.clear();
169 for(Uint32 i = 0; i < MAX_NODES; i++)
170 {
171 if(!theNodes[i].defined)
172 continue;
173 if(theNodes[i].m_info.m_type == NodeInfo::DB)
174 {
175 ndb_nodes.set(i);
176 const ClusterMgr::Node &node= getNodeInfo(i);
177 waitForHBFromNodes.bitOR(node.m_state.m_connected_nodes);
178 }
179 }
180 waitForHBFromNodes.bitAND(ndb_nodes);
181
182 #ifdef DEBUG_REG
183 char buf[128];
184 ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
185 #endif
186 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
187
188 signal.theVerId_signalNumber = GSN_API_REGREQ;
189 signal.theReceiversBlockNumber = QMGR;
190 signal.theTrace = 0;
191 signal.theLength = ApiRegReq::SignalLength;
192
193 ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
194 req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
195 req->version = NDB_VERSION;
196
197 int nodeId= 0;
198 for(int i=0;
199 (int) NodeBitmask::NotFound != (nodeId= waitForHBFromNodes.find(i));
200 i= nodeId+1)
201 {
202 #ifdef DEBUG_REG
203 ndbout << "FORCE HB to " << nodeId << endl;
204 #endif
205 theFacade.sendSignalUnCond(&signal, nodeId);
206 }
207
208 /* Wait for nodes to reply - if any heartbeats was sent */
209 if (!waitForHBFromNodes.isclear())
210 NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
211
212 waitingForHB= false;
213 #ifdef DEBUG_REG
214 ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
215 #endif
216 theFacade.unlock_mutex();
217 }
218
219 void
threadMain()220 ClusterMgr::threadMain( ){
221 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
222
223 signal.theVerId_signalNumber = GSN_API_REGREQ;
224 signal.theReceiversBlockNumber = QMGR;
225 signal.theTrace = 0;
226 signal.theLength = ApiRegReq::SignalLength;
227
228 ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
229 req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
230 req->version = NDB_VERSION;
231
232
233 Uint32 timeSlept = 100;
234 Uint64 now = NdbTick_CurrentMillisecond();
235
236 while(!theStop){
237 /**
238 * Start of Secure area for use of Transporter
239 */
240 if (m_cluster_state == CS_waiting_for_clean_cache)
241 {
242 theFacade.m_globalDictCache.lock();
243 unsigned sz= theFacade.m_globalDictCache.get_size();
244 theFacade.m_globalDictCache.unlock();
245 if (sz)
246 goto next;
247 m_cluster_state = CS_waiting_for_first_connect;
248 }
249
250 theFacade.lock_mutex();
251 for (int i = 1; i < MAX_NDB_NODES; i++){
252 /**
253 * Send register request (heartbeat) to all available nodes
254 * at specified timing intervals
255 */
256 const NodeId nodeId = i;
257 Node & theNode = theNodes[nodeId];
258
259 if (!theNode.defined)
260 continue;
261
262 if (theNode.connected == false){
263 theFacade.doConnect(nodeId);
264 continue;
265 }
266
267 if (!theNode.compatible){
268 continue;
269 }
270
271 theNode.hbCounter += timeSlept;
272 if (theNode.hbCounter >= m_max_api_reg_req_interval ||
273 theNode.hbCounter >= theNode.hbFrequency) {
274 /**
275 * It is now time to send a new Heartbeat
276 */
277 if (theNode.hbCounter >= theNode.hbFrequency) {
278 theNode.m_info.m_heartbeat_cnt++;
279 theNode.hbCounter = 0;
280 }
281
282 #ifdef DEBUG_REG
283 ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
284 #endif
285 theFacade.sendSignalUnCond(&signal, nodeId);
286 }//if
287
288 if (theNode.m_info.m_heartbeat_cnt == 4 && theNode.hbFrequency > 0){
289 reportNodeFailed(i);
290 }//if
291 }
292
293 /**
294 * End of secure area. Let other threads in
295 */
296 theFacade.unlock_mutex();
297
298 next:
299 // Sleep for 100 ms between each Registration Heartbeat
300 Uint64 before = now;
301 NdbSleep_MilliSleep(100);
302 now = NdbTick_CurrentMillisecond();
303 timeSlept = (now - before);
304 }
305 }
306
#if 0
// Debug dump of per-node bookkeeping for nodeId. Compiled out.
// NOTE(review): the arrays it prints (theNodeList, theNodeState,
// theNodeCount, theNodeStopDelay, theNodeSendDelay) are referenced nowhere
// else in this file and look like left-overs; confirm before enabling.
void
ClusterMgr::showState(NodeId nodeId){
  ndbout << "-- ClusterMgr - NodeId = " << nodeId << endl;
  ndbout << "theNodeList = "      << theNodeList[nodeId]      << endl;
  ndbout << "theNodeState = "     << theNodeState[nodeId]     << endl;
  ndbout << "theNodeCount = "     << theNodeCount[nodeId]     << endl;
  ndbout << "theNodeStopDelay = " << theNodeStopDelay[nodeId] << endl;
  ndbout << "theNodeSendDelay = " << theNodeSendDelay[nodeId] << endl;
}
#endif
318
Node()319 ClusterMgr::Node::Node()
320 : m_state(NodeState::SL_NOTHING) {
321 compatible = nfCompleteRep = true;
322 connected = defined = m_alive = m_api_reg_conf = false;
323 m_state.m_connected_nodes.clear();
324 }
325
326 /******************************************************************************
327 * API_REGREQ and friends
328 ******************************************************************************/
329
330 void
execAPI_REGREQ(const Uint32 * theData)331 ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
332 const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0];
333 const NodeId nodeId = refToNode(apiRegReq->ref);
334
335 #ifdef DEBUG_REG
336 ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);
337 #endif
338
339 assert(nodeId > 0 && nodeId < MAX_NODES);
340
341 Node & node = theNodes[nodeId];
342 assert(node.defined == true);
343 assert(node.connected == true);
344
345 if(node.m_info.m_version != apiRegReq->version){
346 node.m_info.m_version = apiRegReq->version;
347
348 if (getMajor(node.m_info.m_version) < getMajor(NDB_VERSION) ||
349 getMinor(node.m_info.m_version) < getMinor(NDB_VERSION)) {
350 node.compatible = false;
351 } else {
352 node.compatible = true;
353 }
354 }
355
356 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
357 signal.theVerId_signalNumber = GSN_API_REGCONF;
358 signal.theReceiversBlockNumber = API_CLUSTERMGR;
359 signal.theTrace = 0;
360 signal.theLength = ApiRegConf::SignalLength;
361
362 ApiRegConf * const conf = CAST_PTR(ApiRegConf, signal.getDataPtrSend());
363 conf->qmgrRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
364 conf->version = NDB_VERSION;
365 conf->apiHeartbeatFrequency = node.hbFrequency;
366 theFacade.sendSignalUnCond(&signal, nodeId);
367 }
368
369 void
execAPI_REGCONF(const Uint32 * theData)370 ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
371 const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0];
372 const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
373
374 #ifdef DEBUG_REG
375 ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
376 #endif
377
378 assert(nodeId > 0 && nodeId < MAX_NODES);
379
380 Node & node = theNodes[nodeId];
381 assert(node.defined == true);
382 assert(node.connected == true);
383
384 if(node.m_info.m_version != apiRegConf->version){
385 node.m_info.m_version = apiRegConf->version;
386 if(theNodes[theFacade.ownId()].m_info.m_type == NodeInfo::MGM)
387 node.compatible = ndbCompatible_mgmt_ndb(NDB_VERSION,
388 node.m_info.m_version);
389 else
390 node.compatible = ndbCompatible_api_ndb(NDB_VERSION,
391 node.m_info.m_version);
392 }
393
394 node.m_api_reg_conf = true;
395
396 node.m_state = apiRegConf->nodeState;
397 if (node.compatible && (node.m_state.startLevel == NodeState::SL_STARTED ||
398 node.m_state.getSingleUserMode())){
399 set_node_alive(node, true);
400 } else {
401 set_node_alive(node, false);
402 }//if
403 node.m_info.m_heartbeat_cnt = 0;
404 node.hbCounter = 0;
405
406 if(waitingForHB)
407 {
408 waitForHBFromNodes.clear(nodeId);
409
410 if(waitForHBFromNodes.isclear())
411 {
412 waitingForHB= false;
413 NdbCondition_Broadcast(waitForHBCond);
414 }
415 }
416 node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
417 }
418
419 void
execAPI_REGREF(const Uint32 * theData)420 ClusterMgr::execAPI_REGREF(const Uint32 * theData){
421
422 ApiRegRef * ref = (ApiRegRef*)theData;
423
424 const NodeId nodeId = refToNode(ref->ref);
425
426 assert(nodeId > 0 && nodeId < MAX_NODES);
427
428 Node & node = theNodes[nodeId];
429 assert(node.connected == true);
430 assert(node.defined == true);
431
432 node.compatible = false;
433 set_node_alive(node, false);
434 node.m_state = NodeState::SL_NOTHING;
435 node.m_info.m_version = ref->version;
436
437 switch(ref->errorCode){
438 case ApiRegRef::WrongType:
439 ndbout_c("Node %d reports that this node should be a NDB node", nodeId);
440 abort();
441 case ApiRegRef::UnsupportedVersion:
442 default:
443 break;
444 }
445
446 waitForHBFromNodes.clear(nodeId);
447 if(waitForHBFromNodes.isclear())
448 NdbCondition_Signal(waitForHBCond);
449 }
450
451 void
execNODE_FAILREP(const Uint32 * theData)452 ClusterMgr::execNODE_FAILREP(const Uint32 * theData){
453 NodeFailRep * const nodeFail = (NodeFailRep *)&theData[0];
454 for(int i = 1; i<MAX_NODES; i++){
455 if(NodeBitmask::get(nodeFail->theNodes, i)){
456 reportNodeFailed(i);
457 }
458 }
459 }
460
461 void
execNF_COMPLETEREP(const Uint32 * theData)462 ClusterMgr::execNF_COMPLETEREP(const Uint32 * theData){
463 NFCompleteRep * const nfComp = (NFCompleteRep *)theData;
464
465 const NodeId nodeId = nfComp->failedNodeId;
466 assert(nodeId > 0 && nodeId < MAX_NODES);
467
468 theFacade.ReportNodeFailureComplete(nodeId);
469 theNodes[nodeId].nfCompleteRep = true;
470 }
471
472 void
reportConnected(NodeId nodeId)473 ClusterMgr::reportConnected(NodeId nodeId){
474 DBUG_ENTER("ClusterMgr::reportConnected");
475 DBUG_PRINT("info", ("nodeId: %u", nodeId));
476 /**
477 * Ensure that we are sending heartbeat every 100 ms
478 * until we have got the first reply from NDB providing
479 * us with the real time-out period to use.
480 */
481 assert(nodeId > 0 && nodeId < MAX_NODES);
482
483 noOfConnectedNodes++;
484
485 Node & theNode = theNodes[nodeId];
486 theNode.connected = true;
487 theNode.m_info.m_heartbeat_cnt = 0;
488 theNode.hbCounter = 0;
489
490 /**
491 * make sure the node itself is marked connected even
492 * if first API_REGCONF has not arrived
493 */
494 theNode.m_state.m_connected_nodes.set(nodeId);
495 theNode.hbFrequency = 0;
496 theNode.m_info.m_version = 0;
497 theNode.compatible = true;
498 theNode.nfCompleteRep = true;
499 theNode.m_state.startLevel = NodeState::SL_NOTHING;
500
501 theFacade.ReportNodeAlive(nodeId);
502 DBUG_VOID_RETURN;
503 }
504
505 void
reportDisconnected(NodeId nodeId)506 ClusterMgr::reportDisconnected(NodeId nodeId){
507 assert(nodeId > 0 && nodeId < MAX_NODES);
508 assert(noOfConnectedNodes > 0);
509
510 noOfConnectedNodes--;
511 theNodes[nodeId].connected = false;
512 theNodes[nodeId].m_api_reg_conf = false;
513 theNodes[nodeId].m_state.m_connected_nodes.clear();
514
515 reportNodeFailed(nodeId, true);
516 }
517
518 void
reportNodeFailed(NodeId nodeId,bool disconnect)519 ClusterMgr::reportNodeFailed(NodeId nodeId, bool disconnect){
520
521 Node & theNode = theNodes[nodeId];
522
523 set_node_alive(theNode, false);
524 theNode.m_info.m_connectCount ++;
525
526 if(theNode.connected)
527 {
528 theFacade.doDisconnect(nodeId);
529 }
530
531 const bool report = (theNode.m_state.startLevel != NodeState::SL_NOTHING);
532 theNode.m_state.startLevel = NodeState::SL_NOTHING;
533
534 if(disconnect || report)
535 {
536 theFacade.ReportNodeDead(nodeId);
537 }
538
539 theNode.nfCompleteRep = false;
540 if(noOfAliveNodes == 0)
541 {
542 if (!global_flag_skip_invalidate_cache)
543 {
544 theFacade.m_globalDictCache.lock();
545 theFacade.m_globalDictCache.invalidate_all();
546 theFacade.m_globalDictCache.unlock();
547 m_connect_count ++;
548 m_cluster_state = CS_waiting_for_clean_cache;
549 }
550 NFCompleteRep rep;
551 for(Uint32 i = 1; i<MAX_NODES; i++){
552 if(theNodes[i].defined && theNodes[i].nfCompleteRep == false){
553 rep.failedNodeId = i;
554 execNF_COMPLETEREP((Uint32*)&rep);
555 }
556 }
557 }
558 }
559
560 /******************************************************************************
561 * Arbitrator
562 ******************************************************************************/
ArbitMgr(TransporterFacade & _fac)563 ArbitMgr::ArbitMgr(TransporterFacade & _fac)
564 : theFacade(_fac)
565 {
566 DBUG_ENTER("ArbitMgr::ArbitMgr");
567
568 theThreadMutex = NdbMutex_Create();
569 theInputCond = NdbCondition_Create();
570 theInputMutex = NdbMutex_Create();
571
572 theRank = 0;
573 theDelay = 0;
574 theThread = 0;
575
576 theInputTimeout = 0;
577 theInputFull = false;
578 memset(&theInputFull, 0, sizeof(theInputFull));
579 theState = StateInit;
580
581 memset(&theStartReq, 0, sizeof(theStartReq));
582 memset(&theChooseReq1, 0, sizeof(theChooseReq1));
583 memset(&theChooseReq2, 0, sizeof(theChooseReq2));
584 memset(&theStopOrd, 0, sizeof(theStopOrd));
585
586 DBUG_VOID_RETURN;
587 }
588
~ArbitMgr()589 ArbitMgr::~ArbitMgr()
590 {
591 DBUG_ENTER("ArbitMgr::~ArbitMgr");
592 NdbMutex_Destroy(theThreadMutex);
593 NdbCondition_Destroy(theInputCond);
594 NdbMutex_Destroy(theInputMutex);
595 DBUG_VOID_RETURN;
596 }
597
598 // Start arbitrator thread. This is kernel request.
599 // First stop any previous thread since it is a left-over
600 // which was never used and which now has wrong ticket.
601 void
doStart(const Uint32 * theData)602 ArbitMgr::doStart(const Uint32* theData)
603 {
604 ArbitSignal aSignal;
605 NdbMutex_Lock(theThreadMutex);
606 if (theThread != NULL) {
607 aSignal.init(GSN_ARBIT_STOPORD, NULL);
608 aSignal.data.code = StopRestart;
609 sendSignalToThread(aSignal);
610 void* value;
611 NdbThread_WaitFor(theThread, &value);
612 NdbThread_Destroy(&theThread);
613 theState = StateInit;
614 theInputFull = false;
615 }
616 aSignal.init(GSN_ARBIT_STARTREQ, theData);
617 sendSignalToThread(aSignal);
618 theThread = NdbThread_Create(
619 runArbitMgr_C, (void**)this, 32768, "ndb_arbitmgr",
620 NDB_THREAD_PRIO_HIGH);
621 NdbMutex_Unlock(theThreadMutex);
622 }
623
624 // The "choose me" signal from a candidate.
625 void
doChoose(const Uint32 * theData)626 ArbitMgr::doChoose(const Uint32* theData)
627 {
628 ArbitSignal aSignal;
629 aSignal.init(GSN_ARBIT_CHOOSEREQ, theData);
630 sendSignalToThread(aSignal);
631 }
632
633 // Stop arbitrator thread via stop signal from the kernel
634 // or when exiting API program.
635 void
doStop(const Uint32 * theData)636 ArbitMgr::doStop(const Uint32* theData)
637 {
638 DBUG_ENTER("ArbitMgr::doStop");
639 ArbitSignal aSignal;
640 NdbMutex_Lock(theThreadMutex);
641 if (theThread != NULL) {
642 aSignal.init(GSN_ARBIT_STOPORD, theData);
643 if (theData == 0) {
644 aSignal.data.code = StopExit;
645 } else {
646 aSignal.data.code = StopRequest;
647 }
648 sendSignalToThread(aSignal);
649 void* value;
650 NdbThread_WaitFor(theThread, &value);
651 NdbThread_Destroy(&theThread);
652 theState = StateInit;
653 }
654 NdbMutex_Unlock(theThreadMutex);
655 DBUG_VOID_RETURN;
656 }
657
658 // private methods
659
660 extern "C"
661 void*
runArbitMgr_C(void * me)662 runArbitMgr_C(void* me)
663 {
664 ((ArbitMgr*) me)->threadMain();
665 return NULL;
666 }
667
668 void
sendSignalToThread(ArbitSignal & aSignal)669 ArbitMgr::sendSignalToThread(ArbitSignal& aSignal)
670 {
671 #ifdef DEBUG_ARBIT
672 char buf[17] = "";
673 ndbout << "arbit recv: ";
674 ndbout << " gsn=" << aSignal.gsn;
675 ndbout << " send=" << aSignal.data.sender;
676 ndbout << " code=" << aSignal.data.code;
677 ndbout << " node=" << aSignal.data.node;
678 ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
679 ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
680 ndbout << endl;
681 #endif
682 aSignal.setTimestamp(); // signal arrival time
683 NdbMutex_Lock(theInputMutex);
684 while (theInputFull) {
685 NdbCondition_WaitTimeout(theInputCond, theInputMutex, 1000);
686 }
687 theInputBuffer = aSignal;
688 theInputFull = true;
689 NdbCondition_Signal(theInputCond);
690 NdbMutex_Unlock(theInputMutex);
691 }
692
693 void
threadMain()694 ArbitMgr::threadMain()
695 {
696 ArbitSignal aSignal;
697 aSignal = theInputBuffer;
698 threadStart(aSignal);
699 bool stop = false;
700 while (! stop) {
701 NdbMutex_Lock(theInputMutex);
702 while (! theInputFull) {
703 NdbCondition_WaitTimeout(theInputCond, theInputMutex, theInputTimeout);
704 threadTimeout();
705 }
706 aSignal = theInputBuffer;
707 theInputFull = false;
708 NdbCondition_Signal(theInputCond);
709 NdbMutex_Unlock(theInputMutex);
710 switch (aSignal.gsn) {
711 case GSN_ARBIT_CHOOSEREQ:
712 threadChoose(aSignal);
713 break;
714 case GSN_ARBIT_STOPORD:
715 stop = true;
716 break;
717 }
718 }
719 threadStop(aSignal);
720 }
721
722 // handle events in the thread
723
724 void
threadStart(ArbitSignal & aSignal)725 ArbitMgr::threadStart(ArbitSignal& aSignal)
726 {
727 theStartReq = aSignal;
728 sendStartConf(theStartReq, ArbitCode::ApiStart);
729 theState = StateStarted;
730 theInputTimeout = 1000;
731 }
732
733 void
threadChoose(ArbitSignal & aSignal)734 ArbitMgr::threadChoose(ArbitSignal& aSignal)
735 {
736 switch (theState) {
737 case StateStarted: // first REQ
738 if (! theStartReq.data.match(aSignal.data)) {
739 sendChooseRef(aSignal, ArbitCode::ErrTicket);
740 break;
741 }
742 theChooseReq1 = aSignal;
743 if (theDelay == 0) {
744 sendChooseConf(aSignal, ArbitCode::WinChoose);
745 theState = StateFinished;
746 theInputTimeout = 1000;
747 break;
748 }
749 theState = StateChoose1;
750 theInputTimeout = 1;
751 return;
752 case StateChoose1: // second REQ within Delay
753 if (! theStartReq.data.match(aSignal.data)) {
754 sendChooseRef(aSignal, ArbitCode::ErrTicket);
755 break;
756 }
757 theChooseReq2 = aSignal;
758 theState = StateChoose2;
759 theInputTimeout = 1;
760 return;
761 case StateChoose2: // too many REQs - refuse all
762 if (! theStartReq.data.match(aSignal.data)) {
763 sendChooseRef(aSignal, ArbitCode::ErrTicket);
764 break;
765 }
766 sendChooseRef(theChooseReq1, ArbitCode::ErrToomany);
767 sendChooseRef(theChooseReq2, ArbitCode::ErrToomany);
768 sendChooseRef(aSignal, ArbitCode::ErrToomany);
769 theState = StateFinished;
770 theInputTimeout = 1000;
771 return;
772 default:
773 sendChooseRef(aSignal, ArbitCode::ErrState);
774 break;
775 }
776 }
777
778 void
threadTimeout()779 ArbitMgr::threadTimeout()
780 {
781 switch (theState) {
782 case StateStarted:
783 break;
784 case StateChoose1:
785 if (theChooseReq1.getTimediff() < theDelay)
786 break;
787 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
788 theState = StateFinished;
789 theInputTimeout = 1000;
790 break;
791 case StateChoose2:
792 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
793 sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
794 theState = StateFinished;
795 theInputTimeout = 1000;
796 break;
797 default:
798 break;
799 }
800 }
801
802 void
threadStop(ArbitSignal & aSignal)803 ArbitMgr::threadStop(ArbitSignal& aSignal)
804 {
805 switch (aSignal.data.code) {
806 case StopExit:
807 switch (theState) {
808 case StateStarted:
809 sendStopRep(theStartReq, 0);
810 break;
811 case StateChoose1: // just in time
812 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
813 break;
814 case StateChoose2:
815 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
816 sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
817 break;
818 case StateInit:
819 case StateFinished:
820 //??
821 break;
822 }
823 break;
824 case StopRequest:
825 break;
826 case StopRestart:
827 break;
828 }
829 }
830
831 // output routines
832
833 void
sendStartConf(ArbitSignal & aSignal,Uint32 code)834 ArbitMgr::sendStartConf(ArbitSignal& aSignal, Uint32 code)
835 {
836 ArbitSignal copySignal = aSignal;
837 copySignal.gsn = GSN_ARBIT_STARTCONF;
838 copySignal.data.code = code;
839 sendSignalToQmgr(copySignal);
840 }
841
842 void
sendChooseConf(ArbitSignal & aSignal,Uint32 code)843 ArbitMgr::sendChooseConf(ArbitSignal& aSignal, Uint32 code)
844 {
845 ArbitSignal copySignal = aSignal;
846 copySignal.gsn = GSN_ARBIT_CHOOSECONF;
847 copySignal.data.code = code;
848 sendSignalToQmgr(copySignal);
849 }
850
851 void
sendChooseRef(ArbitSignal & aSignal,Uint32 code)852 ArbitMgr::sendChooseRef(ArbitSignal& aSignal, Uint32 code)
853 {
854 ArbitSignal copySignal = aSignal;
855 copySignal.gsn = GSN_ARBIT_CHOOSEREF;
856 copySignal.data.code = code;
857 sendSignalToQmgr(copySignal);
858 }
859
860 void
sendStopRep(ArbitSignal & aSignal,Uint32 code)861 ArbitMgr::sendStopRep(ArbitSignal& aSignal, Uint32 code)
862 {
863 ArbitSignal copySignal = aSignal;
864 copySignal.gsn = GSN_ARBIT_STOPREP;
865 copySignal.data.code = code;
866 sendSignalToQmgr(copySignal);
867 }
868
869 /**
870 * Send signal to QMGR. The input includes signal number and
871 * signal data. The signal data is normally a copy of a received
872 * signal so it contains expected arbitrator node id and ticket.
873 * The sender in signal data is the QMGR node id.
874 */
875 void
sendSignalToQmgr(ArbitSignal & aSignal)876 ArbitMgr::sendSignalToQmgr(ArbitSignal& aSignal)
877 {
878 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
879
880 signal.theVerId_signalNumber = aSignal.gsn;
881 signal.theReceiversBlockNumber = QMGR;
882 signal.theTrace = 0;
883 signal.theLength = ArbitSignalData::SignalLength;
884
885 ArbitSignalData* sd = CAST_PTR(ArbitSignalData, signal.getDataPtrSend());
886
887 sd->sender = numberToRef(API_CLUSTERMGR, theFacade.ownId());
888 sd->code = aSignal.data.code;
889 sd->node = aSignal.data.node;
890 sd->ticket = aSignal.data.ticket;
891 sd->mask = aSignal.data.mask;
892
893 #ifdef DEBUG_ARBIT
894 char buf[17] = "";
895 ndbout << "arbit send: ";
896 ndbout << " gsn=" << aSignal.gsn;
897 ndbout << " recv=" << aSignal.data.sender;
898 ndbout << " code=" << aSignal.data.code;
899 ndbout << " node=" << aSignal.data.node;
900 ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
901 ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
902 ndbout << endl;
903 #endif
904
905 theFacade.lock_mutex();
906 theFacade.sendSignalUnCond(&signal, aSignal.data.sender);
907 theFacade.unlock_mutex();
908 }
909
910