1 /*
2 Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <ndb_global.h>
26 #include <ndb_limits.h>
27 #include <util/version.h>
28
29 #include "TransporterFacade.hpp"
30 #include <kernel/GlobalSignalNumbers.h>
31
32 #include "ClusterMgr.hpp"
33 #include <IPCConfig.hpp>
34 #include "NdbApiSignal.hpp"
35 #include <NdbSleep.h>
36 #include <NdbOut.hpp>
37 #include <NdbTick.h>
38
39
40 #include <signaldata/NodeFailRep.hpp>
41 #include <signaldata/NFCompleteRep.hpp>
42 #include <signaldata/ApiRegSignalData.hpp>
43 #include <signaldata/AlterTable.hpp>
44 #include <signaldata/SumaImpl.hpp>
45
46 #include <mgmapi.h>
47 #include <mgmapi_configuration.hpp>
48 #include <mgmapi_config_parameters.h>
49
/**
 * When non-zero, ClusterMgr::execDISCONNECT_REP() does not invalidate the
 * global dictionary cache after the last connected node has gone away.
 */
int global_flag_skip_invalidate_cache = 0;

/**
 * When non-zero, ClusterMgr::threadMain() leaves the
 * CS_waiting_for_clean_cache state without first checking that the
 * global dictionary cache is empty.
 */
int global_flag_skip_waiting_for_clean_cache = 0;

// Uncomment to trace API_REGREQ / API_REGCONF traffic on ndbout.
//#define DEBUG_REG
53
54 // Just a C wrapper for threadMain
55 extern "C"
56 void*
runClusterMgr_C(void * me)57 runClusterMgr_C(void * me)
58 {
59 ((ClusterMgr*) me)->threadMain();
60
61 return NULL;
62 }
63
ClusterMgr(TransporterFacade & _facade)64 ClusterMgr::ClusterMgr(TransporterFacade & _facade):
65 theStop(0),
66 theFacade(_facade),
67 theArbitMgr(NULL),
68 m_connect_count(0),
69 m_max_api_reg_req_interval(~0),
70 noOfAliveNodes(0),
71 noOfConnectedNodes(0),
72 minDbVersion(0),
73 theClusterMgrThread(NULL),
74 waitingForHB(false),
75 m_cluster_state(CS_waiting_for_clean_cache)
76 {
77 DBUG_ENTER("ClusterMgr::ClusterMgr");
78 clusterMgrThreadMutex = NdbMutex_Create();
79 waitForHBCond= NdbCondition_Create();
80 m_auto_reconnect = -1;
81
82 Uint32 ret = this->open(&theFacade, API_CLUSTERMGR);
83 if (unlikely(ret == 0))
84 {
85 ndbout_c("Failed to register ClusterMgr! ret: %d", ret);
86 abort();
87 }
88 DBUG_VOID_RETURN;
89 }
90
~ClusterMgr()91 ClusterMgr::~ClusterMgr()
92 {
93 DBUG_ENTER("ClusterMgr::~ClusterMgr");
94 doStop();
95 if (theArbitMgr != 0)
96 {
97 delete theArbitMgr;
98 theArbitMgr = 0;
99 }
100 this->close(); // disconnect from TransporterFacade
101 NdbCondition_Destroy(waitForHBCond);
102 NdbMutex_Destroy(clusterMgrThreadMutex);
103 DBUG_VOID_RETURN;
104 }
105
106 void
configure(Uint32 nodeId,const ndb_mgm_configuration * config)107 ClusterMgr::configure(Uint32 nodeId,
108 const ndb_mgm_configuration* config)
109 {
110 ndb_mgm_configuration_iterator iter(* config, CFG_SECTION_NODE);
111 for(iter.first(); iter.valid(); iter.next()){
112 Uint32 nodeId = 0;
113 if(iter.get(CFG_NODE_ID, &nodeId))
114 continue;
115
116 // Check array bounds + don't allow node 0 to be touched
117 assert(nodeId > 0 && nodeId < MAX_NODES);
118 trp_node& theNode = theNodes[nodeId];
119 theNode.defined = true;
120
121 unsigned type;
122 if(iter.get(CFG_TYPE_OF_SECTION, &type))
123 continue;
124
125 switch(type){
126 case NODE_TYPE_DB:
127 theNode.m_info.m_type = NodeInfo::DB;
128 break;
129 case NODE_TYPE_API:
130 theNode.m_info.m_type = NodeInfo::API;
131 break;
132 case NODE_TYPE_MGM:
133 theNode.m_info.m_type = NodeInfo::MGM;
134 break;
135 default:
136 type = type;
137 break;
138 }
139 }
140
141 /* Mark all non existing nodes as not defined */
142 for(Uint32 i = 0; i<MAX_NODES; i++) {
143 if (iter.first())
144 continue;
145
146 if (iter.find(CFG_NODE_ID, i))
147 theNodes[i]= Node();
148 }
149
150 #if 0
151 print_nodes("init");
152 #endif
153
154 // Configure arbitrator
155 Uint32 rank = 0;
156 iter.first();
157 iter.find(CFG_NODE_ID, nodeId); // let not found in config mean rank=0
158 iter.get(CFG_NODE_ARBIT_RANK, &rank);
159
160 if (rank > 0)
161 {
162 // The arbitrator should be active
163 if (!theArbitMgr)
164 theArbitMgr = new ArbitMgr(* this);
165 theArbitMgr->setRank(rank);
166
167 Uint32 delay = 0;
168 iter.get(CFG_NODE_ARBIT_DELAY, &delay);
169 theArbitMgr->setDelay(delay);
170 }
171 else if (theArbitMgr)
172 {
173 // No arbitrator should be started
174 theArbitMgr->doStop(NULL);
175 delete theArbitMgr;
176 theArbitMgr= NULL;
177 }
178 }
179
180 void
startThread()181 ClusterMgr::startThread() {
182 Guard g(clusterMgrThreadMutex);
183
184 theStop = -1;
185 theClusterMgrThread = NdbThread_Create(runClusterMgr_C,
186 (void**)this,
187 0, // default stack size
188 "ndb_clustermgr",
189 NDB_THREAD_PRIO_HIGH);
190 Uint32 cnt = 0;
191 while (theStop == -1 && cnt < 60)
192 {
193 NdbCondition_WaitTimeout(waitForHBCond, clusterMgrThreadMutex, 1000);
194 }
195
196 assert(theStop == 0);
197 }
198
199 void
doStop()200 ClusterMgr::doStop( ){
201 DBUG_ENTER("ClusterMgr::doStop");
202 {
203 Guard g(clusterMgrThreadMutex);
204 if(theStop == 1){
205 DBUG_VOID_RETURN;
206 }
207 }
208
209 void *status;
210 theStop = 1;
211 if (theClusterMgrThread) {
212 NdbThread_WaitFor(theClusterMgrThread, &status);
213 NdbThread_Destroy(&theClusterMgrThread);
214 }
215
216 if (theArbitMgr != NULL)
217 {
218 theArbitMgr->doStop(NULL);
219 }
220
221 DBUG_VOID_RETURN;
222 }
223
224 void
forceHB()225 ClusterMgr::forceHB()
226 {
227 theFacade.lock_mutex();
228
229 if(waitingForHB)
230 {
231 NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
232 theFacade.unlock_mutex();
233 return;
234 }
235
236 waitingForHB= true;
237
238 NodeBitmask ndb_nodes;
239 ndb_nodes.clear();
240 waitForHBFromNodes.clear();
241 for(Uint32 i = 1; i < MAX_NDB_NODES; i++)
242 {
243 const trp_node &node= getNodeInfo(i);
244 if(!node.defined)
245 continue;
246 if(node.m_info.getType() == NodeInfo::DB)
247 {
248 ndb_nodes.set(i);
249 waitForHBFromNodes.bitOR(node.m_state.m_connected_nodes);
250 }
251 }
252 waitForHBFromNodes.bitAND(ndb_nodes);
253 theFacade.unlock_mutex();
254
255 #ifdef DEBUG_REG
256 char buf[128];
257 ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
258 #endif
259 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
260
261 signal.theVerId_signalNumber = GSN_API_REGREQ;
262 signal.theReceiversBlockNumber = QMGR;
263 signal.theTrace = 0;
264 signal.theLength = ApiRegReq::SignalLength;
265
266 ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
267 req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
268 req->version = NDB_VERSION;
269 req->mysql_version = NDB_MYSQL_VERSION_D;
270
271 {
272 lock();
273 int nodeId= 0;
274 for(int i=0;
275 (int) NodeBitmask::NotFound != (nodeId= waitForHBFromNodes.find(i));
276 i= nodeId+1)
277 {
278 #ifdef DEBUG_REG
279 ndbout << "FORCE HB to " << nodeId << endl;
280 #endif
281 raw_sendSignal(&signal, nodeId);
282 }
283 unlock();
284 }
285 /* Wait for nodes to reply - if any heartbeats was sent */
286 theFacade.lock_mutex();
287 if (!waitForHBFromNodes.isclear())
288 NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
289
290 waitingForHB= false;
291 #ifdef DEBUG_REG
292 ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
293 #endif
294 theFacade.unlock_mutex();
295 }
296
297 void
startup()298 ClusterMgr::startup()
299 {
300 assert(theStop == -1);
301 Uint32 nodeId = getOwnNodeId();
302 Node & cm_node = theNodes[nodeId];
303 trp_node & theNode = cm_node;
304 assert(theNode.defined);
305
306 lock();
307 theFacade.doConnect(nodeId);
308 unlock();
309
310 for (Uint32 i = 0; i<3000; i++)
311 {
312 lock();
313 theFacade.theTransporterRegistry->update_connections();
314 unlock();
315 if (theNode.is_connected())
316 break;
317 NdbSleep_MilliSleep(20);
318 }
319
320 assert(theNode.is_connected());
321 Guard g(clusterMgrThreadMutex);
322 theStop = 0;
323 NdbCondition_Broadcast(waitForHBCond);
324 }
325
326 void
threadMain()327 ClusterMgr::threadMain()
328 {
329 startup();
330
331 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
332
333 signal.theVerId_signalNumber = GSN_API_REGREQ;
334 signal.theTrace = 0;
335 signal.theLength = ApiRegReq::SignalLength;
336
337 ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
338 req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
339 req->version = NDB_VERSION;
340 req->mysql_version = NDB_MYSQL_VERSION_D;
341
342 NdbApiSignal nodeFail_signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
343 nodeFail_signal.theVerId_signalNumber = GSN_NODE_FAILREP;
344 nodeFail_signal.theReceiversBlockNumber = API_CLUSTERMGR;
345 nodeFail_signal.theTrace = 0;
346 nodeFail_signal.theLength = NodeFailRep::SignalLengthLong;
347
348 NDB_TICKS timeSlept = 100;
349 NDB_TICKS now = NdbTick_CurrentMillisecond();
350
351 while(!theStop)
352 {
353 /* Sleep at 100ms between each heartbeat check */
354 NDB_TICKS before = now;
355 for (Uint32 i = 0; i<10; i++)
356 {
357 NdbSleep_MilliSleep(10);
358 {
359 Guard g(clusterMgrThreadMutex);
360 /**
361 * Protect from ArbitMgr sending signals while we poll
362 */
363 start_poll();
364 do_poll(0);
365 complete_poll();
366 }
367 }
368 now = NdbTick_CurrentMillisecond();
369 timeSlept = (now - before);
370
371 if (m_cluster_state == CS_waiting_for_clean_cache &&
372 theFacade.m_globalDictCache)
373 {
374 if (!global_flag_skip_waiting_for_clean_cache)
375 {
376 theFacade.m_globalDictCache->lock();
377 unsigned sz= theFacade.m_globalDictCache->get_size();
378 theFacade.m_globalDictCache->unlock();
379 if (sz)
380 continue;
381 }
382 m_cluster_state = CS_waiting_for_first_connect;
383 }
384
385
386 NodeFailRep * nodeFailRep = CAST_PTR(NodeFailRep,
387 nodeFail_signal.getDataPtrSend());
388 nodeFailRep->noOfNodes = 0;
389 NodeBitmask::clear(nodeFailRep->theNodes);
390
391 trp_client::lock();
392 for (int i = 1; i < MAX_NODES; i++){
393 /**
394 * Send register request (heartbeat) to all available nodes
395 * at specified timing intervals
396 */
397 const NodeId nodeId = i;
398 // Check array bounds + don't allow node 0 to be touched
399 assert(nodeId > 0 && nodeId < MAX_NODES);
400 Node & cm_node = theNodes[nodeId];
401 trp_node & theNode = cm_node;
402
403 if (!theNode.defined)
404 continue;
405
406 if (theNode.is_connected() == false){
407 theFacade.doConnect(nodeId);
408 continue;
409 }
410
411 if (!theNode.compatible){
412 continue;
413 }
414
415 if (nodeId == getOwnNodeId() && theNode.is_confirmed())
416 {
417 /**
418 * Don't send HB to self more than once
419 * (once needed to avoid weird special cases in e.g ConfigManager)
420 */
421 continue;
422 }
423
424 cm_node.hbCounter += (Uint32)timeSlept;
425 if (cm_node.hbCounter >= m_max_api_reg_req_interval ||
426 cm_node.hbCounter >= cm_node.hbFrequency)
427 {
428 /**
429 * It is now time to send a new Heartbeat
430 */
431 if (cm_node.hbCounter >= cm_node.hbFrequency)
432 {
433 cm_node.hbMissed++;
434 cm_node.hbCounter = 0;
435 }
436
437 if (theNode.m_info.m_type != NodeInfo::DB)
438 signal.theReceiversBlockNumber = API_CLUSTERMGR;
439 else
440 signal.theReceiversBlockNumber = QMGR;
441
442 #ifdef DEBUG_REG
443 ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
444 #endif
445 raw_sendSignal(&signal, nodeId);
446 }//if
447
448 if (cm_node.hbMissed == 4 && cm_node.hbFrequency > 0)
449 {
450 nodeFailRep->noOfNodes++;
451 NodeBitmask::set(nodeFailRep->theNodes, nodeId);
452 }
453 }
454
455 if (nodeFailRep->noOfNodes)
456 {
457 raw_sendSignal(&nodeFail_signal, getOwnNodeId());
458 }
459 trp_client::unlock();
460 }
461 }
462
463 void
trp_deliver_signal(const NdbApiSignal * sig,const LinearSectionPtr ptr[3])464 ClusterMgr::trp_deliver_signal(const NdbApiSignal* sig,
465 const LinearSectionPtr ptr[3])
466 {
467 const Uint32 gsn = sig->theVerId_signalNumber;
468 const Uint32 * theData = sig->getDataPtr();
469
470 switch (gsn){
471 case GSN_API_REGREQ:
472 execAPI_REGREQ(theData);
473 break;
474
475 case GSN_API_REGCONF:
476 execAPI_REGCONF(sig, ptr);
477 break;
478
479 case GSN_API_REGREF:
480 execAPI_REGREF(theData);
481 break;
482
483 case GSN_NODE_FAILREP:
484 execNODE_FAILREP(sig, ptr);
485 break;
486
487 case GSN_NF_COMPLETEREP:
488 execNF_COMPLETEREP(sig, ptr);
489 break;
490 case GSN_ARBIT_STARTREQ:
491 if (theArbitMgr != NULL)
492 theArbitMgr->doStart(theData);
493 break;
494
495 case GSN_ARBIT_CHOOSEREQ:
496 if (theArbitMgr != NULL)
497 theArbitMgr->doChoose(theData);
498 break;
499
500 case GSN_ARBIT_STOPORD:
501 if(theArbitMgr != NULL)
502 theArbitMgr->doStop(theData);
503 break;
504
505 case GSN_ALTER_TABLE_REP:
506 {
507 if (theFacade.m_globalDictCache == NULL)
508 break;
509 const AlterTableRep* rep = (const AlterTableRep*)theData;
510 theFacade.m_globalDictCache->lock();
511 theFacade.m_globalDictCache->
512 alter_table_rep((const char*)ptr[0].p,
513 rep->tableId,
514 rep->tableVersion,
515 rep->changeType == AlterTableRep::CT_ALTERED);
516 theFacade.m_globalDictCache->unlock();
517 break;
518 }
519 case GSN_SUB_GCP_COMPLETE_REP:
520 {
521 /**
522 * Report
523 */
524 theFacade.for_each(this, sig, ptr);
525
526 /**
527 * Reply
528 */
529 {
530 BlockReference ownRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
531 NdbApiSignal tSignal(* sig);
532 Uint32* send= tSignal.getDataPtrSend();
533 memcpy(send, theData, tSignal.getLength() << 2);
534 CAST_PTR(SubGcpCompleteAck, send)->rep.senderRef = ownRef;
535 Uint32 ref= sig->theSendersBlockRef;
536 Uint32 aNodeId= refToNode(ref);
537 tSignal.theReceiversBlockNumber= refToBlock(ref);
538 tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK;
539 tSignal.theSendersBlockRef = API_CLUSTERMGR;
540 safe_sendSignal(&tSignal, aNodeId);
541 }
542 break;
543 }
544 case GSN_TAKE_OVERTCCONF:
545 {
546 /**
547 * Report
548 */
549 theFacade.for_each(this, sig, ptr);
550 return;
551 }
552 case GSN_CONNECT_REP:
553 {
554 execCONNECT_REP(sig, ptr);
555 return;
556 }
557 case GSN_DISCONNECT_REP:
558 {
559 execDISCONNECT_REP(sig, ptr);
560 return;
561 }
562 default:
563 break;
564
565 }
566 return;
567 }
568
Node()569 ClusterMgr::Node::Node()
570 : hbFrequency(0), hbCounter(0)
571 {
572 }
573
574 /**
575 * recalcMinDbVersion
576 *
577 * This method is called whenever the 'minimum DB node
578 * version' data for the connected DB nodes changes
579 * It calculates the minimum version of all the connected
580 * DB nodes.
581 * This information is cached by Ndb object instances.
582 * This information is useful when implementing API compatibility
583 * with older DB nodes
584 */
585 void
recalcMinDbVersion()586 ClusterMgr::recalcMinDbVersion()
587 {
588 Uint32 newMinDbVersion = ~ (Uint32) 0;
589
590 for (Uint32 i = 0; i < MAX_NODES; i++)
591 {
592 trp_node& node = theNodes[i];
593
594 if (node.is_connected() &&
595 node.is_confirmed() &&
596 node.m_info.getType() == NodeInfo::DB)
597 {
598 /* Include this node in the set of nodes used to
599 * compute the lowest current DB node version
600 */
601 assert(node.m_info.m_version);
602
603 if (node.minDbVersion < newMinDbVersion)
604 {
605 newMinDbVersion = node.minDbVersion;
606 }
607 }
608 }
609
610 /* Now update global min Db version if we have one.
611 * Otherwise set it to 0
612 */
613 newMinDbVersion = (newMinDbVersion == ~ (Uint32) 0) ?
614 0 :
615 newMinDbVersion;
616
617 //#ifdef DEBUG_MINVER
618
619 #ifdef DEBUG_MINVER
620 if (newMinDbVersion != minDbVersion)
621 {
622 ndbout << "Previous min Db node version was "
623 << NdbVersion(minDbVersion)
624 << " new min is "
625 << NdbVersion(newMinDbVersion)
626 << endl;
627 }
628 else
629 {
630 ndbout << "MinDbVersion recalculated, but is same : "
631 << NdbVersion(minDbVersion)
632 << endl;
633 }
634 #endif
635
636 minDbVersion = newMinDbVersion;
637 }
638
639 /******************************************************************************
640 * API_REGREQ and friends
641 ******************************************************************************/
642
643 void
execAPI_REGREQ(const Uint32 * theData)644 ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
645 const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0];
646 const NodeId nodeId = refToNode(apiRegReq->ref);
647
648 #ifdef DEBUG_REG
649 ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);
650 #endif
651
652 assert(nodeId > 0 && nodeId < MAX_NODES);
653
654 Node & cm_node = theNodes[nodeId];
655 trp_node & node = cm_node;
656 assert(node.defined == true);
657 assert(node.is_connected() == true);
658
659 if(node.m_info.m_version != apiRegReq->version){
660 node.m_info.m_version = apiRegReq->version;
661 node.m_info.m_mysql_version = apiRegReq->mysql_version;
662 if (node.m_info.m_version < NDBD_SPLIT_VERSION)
663 node.m_info.m_mysql_version = 0;
664
665 if (getMajor(node.m_info.m_version) < getMajor(NDB_VERSION) ||
666 getMinor(node.m_info.m_version) < getMinor(NDB_VERSION)) {
667 node.compatible = false;
668 } else {
669 node.compatible = true;
670 }
671 }
672
673 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
674 signal.theVerId_signalNumber = GSN_API_REGCONF;
675 signal.theReceiversBlockNumber = API_CLUSTERMGR;
676 signal.theTrace = 0;
677 signal.theLength = ApiRegConf::SignalLength;
678
679 ApiRegConf * const conf = CAST_PTR(ApiRegConf, signal.getDataPtrSend());
680 conf->qmgrRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
681 conf->version = NDB_VERSION;
682 conf->mysql_version = NDB_MYSQL_VERSION_D;
683 conf->apiHeartbeatFrequency = cm_node.hbFrequency;
684
685 conf->minDbVersion= 0;
686 conf->nodeState= node.m_state;
687
688 node.set_confirmed(true);
689 if (safe_sendSignal(&signal, nodeId) != 0)
690 node.set_confirmed(false);
691 }
692
693 void
execAPI_REGCONF(const NdbApiSignal * signal,const LinearSectionPtr ptr[])694 ClusterMgr::execAPI_REGCONF(const NdbApiSignal * signal,
695 const LinearSectionPtr ptr[])
696 {
697 const ApiRegConf * apiRegConf = CAST_CONSTPTR(ApiRegConf,
698 signal->getDataPtr());
699 const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
700
701 #ifdef DEBUG_REG
702 ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
703 #endif
704
705 assert(nodeId > 0 && nodeId < MAX_NODES);
706
707 Node & cm_node = theNodes[nodeId];
708 trp_node & node = cm_node;
709 assert(node.defined == true);
710 assert(node.is_connected() == true);
711
712 if(node.m_info.m_version != apiRegConf->version){
713 node.m_info.m_version = apiRegConf->version;
714 node.m_info.m_mysql_version = apiRegConf->mysql_version;
715 if (node.m_info.m_version < NDBD_SPLIT_VERSION)
716 node.m_info.m_mysql_version = 0;
717
718 if(theNodes[theFacade.ownId()].m_info.m_type == NodeInfo::MGM)
719 node.compatible = ndbCompatible_mgmt_ndb(NDB_VERSION,
720 node.m_info.m_version);
721 else
722 node.compatible = ndbCompatible_api_ndb(NDB_VERSION,
723 node.m_info.m_version);
724 }
725
726 node.set_confirmed(true);
727
728 if (node.minDbVersion != apiRegConf->minDbVersion)
729 {
730 node.minDbVersion = apiRegConf->minDbVersion;
731 recalcMinDbVersion();
732 }
733
734 if (node.m_info.m_version >= NDBD_255_NODES_VERSION)
735 {
736 node.m_state = apiRegConf->nodeState;
737 }
738 else
739 {
740 /**
741 * from 2 to 8 words = 6 words diff, 6*4 = 24
742 */
743 memcpy(&node.m_state, &apiRegConf->nodeState, sizeof(node.m_state) - 24);
744 }
745
746 if (node.m_info.m_type == NodeInfo::DB)
747 {
748 /**
749 * Only set DB nodes to "alive"
750 */
751 if (node.compatible && (node.m_state.startLevel == NodeState::SL_STARTED ||
752 node.m_state.getSingleUserMode()))
753 {
754 set_node_alive(node, true);
755 }
756 else
757 {
758 set_node_alive(node, false);
759 }
760 }
761
762 cm_node.hbMissed = 0;
763 cm_node.hbCounter = 0;
764 cm_node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
765
766 // Distribute signal to all threads/blocks
767 // TODO only if state changed...
768 theFacade.for_each(this, signal, ptr);
769
770 check_wait_for_hb(nodeId);
771 }
772
773 void
check_wait_for_hb(NodeId nodeId)774 ClusterMgr::check_wait_for_hb(NodeId nodeId)
775 {
776 if(waitingForHB)
777 {
778 waitForHBFromNodes.clear(nodeId);
779
780 if(waitForHBFromNodes.isclear())
781 {
782 waitingForHB= false;
783 NdbCondition_Broadcast(waitForHBCond);
784 }
785 }
786 return;
787 }
788
789
790 void
execAPI_REGREF(const Uint32 * theData)791 ClusterMgr::execAPI_REGREF(const Uint32 * theData){
792
793 ApiRegRef * ref = (ApiRegRef*)theData;
794
795 const NodeId nodeId = refToNode(ref->ref);
796
797 assert(nodeId > 0 && nodeId < MAX_NODES);
798
799 Node & cm_node = theNodes[nodeId];
800 trp_node & node = cm_node;
801
802 assert(node.is_connected() == true);
803 assert(node.defined == true);
804 /* Only DB nodes will send API_REGREF */
805 assert(node.m_info.getType() == NodeInfo::DB);
806
807 node.compatible = false;
808 set_node_alive(node, false);
809 node.m_state = NodeState::SL_NOTHING;
810 node.m_info.m_version = ref->version;
811
812 switch(ref->errorCode){
813 case ApiRegRef::WrongType:
814 ndbout_c("Node %d reports that this node should be a NDB node", nodeId);
815 abort();
816 case ApiRegRef::UnsupportedVersion:
817 default:
818 break;
819 }
820
821 check_wait_for_hb(nodeId);
822 }
823
824 void
execNF_COMPLETEREP(const NdbApiSignal * signal,const LinearSectionPtr ptr[3])825 ClusterMgr::execNF_COMPLETEREP(const NdbApiSignal* signal,
826 const LinearSectionPtr ptr[3])
827 {
828 const NFCompleteRep * nfComp = CAST_CONSTPTR(NFCompleteRep,
829 signal->getDataPtr());
830 const NodeId nodeId = nfComp->failedNodeId;
831 assert(nodeId > 0 && nodeId < MAX_NODES);
832
833 trp_node & node = theNodes[nodeId];
834 if (node.nfCompleteRep == false)
835 {
836 node.nfCompleteRep = true;
837 theFacade.for_each(this, signal, ptr);
838 }
839 }
840
841 void
reportConnected(NodeId nodeId)842 ClusterMgr::reportConnected(NodeId nodeId)
843 {
844 DBUG_ENTER("ClusterMgr::reportConnected");
845 DBUG_PRINT("info", ("nodeId: %u", nodeId));
846 /**
847 * Ensure that we are sending heartbeat every 100 ms
848 * until we have got the first reply from NDB providing
849 * us with the real time-out period to use.
850 */
851 assert(nodeId > 0 && nodeId < MAX_NODES);
852 if (nodeId == getOwnNodeId())
853 {
854 noOfConnectedNodes--; // Don't count self...
855 }
856
857 noOfConnectedNodes++;
858
859 Node & cm_node = theNodes[nodeId];
860 trp_node & theNode = cm_node;
861
862 cm_node.hbMissed = 0;
863 cm_node.hbCounter = 0;
864 cm_node.hbFrequency = 0;
865
866 assert(theNode.is_connected() == false);
867
868 /**
869 * make sure the node itself is marked connected even
870 * if first API_REGCONF has not arrived
871 */
872 theNode.set_connected(true);
873 theNode.m_state.m_connected_nodes.set(nodeId);
874 theNode.m_info.m_version = 0;
875 theNode.compatible = true;
876 theNode.nfCompleteRep = true;
877 theNode.m_node_fail_rep = false;
878 theNode.m_state.startLevel = NodeState::SL_NOTHING;
879 theNode.minDbVersion = 0;
880
881 /**
882 * We know that we have clusterMgrThreadMutex and trp_client::mutex
883 * but we don't know if we are polling...and for_each can
884 * only be used by a poller...
885 *
886 * Send signal to self, so that we can do this when receiving a signal
887 */
888 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
889 signal.theVerId_signalNumber = GSN_CONNECT_REP;
890 signal.theReceiversBlockNumber = API_CLUSTERMGR;
891 signal.theTrace = 0;
892 signal.theLength = 1;
893 signal.getDataPtrSend()[0] = nodeId;
894 raw_sendSignal(&signal, getOwnNodeId());
895 DBUG_VOID_RETURN;
896 }
897
898 void
execCONNECT_REP(const NdbApiSignal * sig,const LinearSectionPtr ptr[])899 ClusterMgr::execCONNECT_REP(const NdbApiSignal* sig,
900 const LinearSectionPtr ptr[])
901 {
902 theFacade.for_each(this, sig, 0);
903 }
904
905 void
set_node_dead(trp_node & theNode)906 ClusterMgr::set_node_dead(trp_node& theNode)
907 {
908 set_node_alive(theNode, false);
909 theNode.set_confirmed(false);
910 theNode.m_state.m_connected_nodes.clear();
911 theNode.m_state.startLevel = NodeState::SL_NOTHING;
912 theNode.m_info.m_connectCount ++;
913 theNode.nfCompleteRep = false;
914 }
915
916 void
reportDisconnected(NodeId nodeId)917 ClusterMgr::reportDisconnected(NodeId nodeId)
918 {
919 assert(nodeId > 0 && nodeId < MAX_NODES);
920 assert(noOfConnectedNodes > 0);
921
922 /**
923 * We know that we have clusterMgrThreadMutex and trp_client::mutex
924 * but we don't know if we are polling...and for_each can
925 * only be used by a poller...
926 *
927 * Send signal to self, so that we can do this when receiving a signal
928 */
929 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
930 signal.theVerId_signalNumber = GSN_DISCONNECT_REP;
931 signal.theReceiversBlockNumber = API_CLUSTERMGR;
932 signal.theTrace = 0;
933 signal.theLength = DisconnectRep::SignalLength;
934
935 DisconnectRep * rep = CAST_PTR(DisconnectRep, signal.getDataPtrSend());
936 rep->nodeId = nodeId;
937 rep->err = 0;
938 raw_sendSignal(&signal, getOwnNodeId());
939 }
940
941 void
execDISCONNECT_REP(const NdbApiSignal * sig,const LinearSectionPtr ptr[])942 ClusterMgr::execDISCONNECT_REP(const NdbApiSignal* sig,
943 const LinearSectionPtr ptr[])
944 {
945 const DisconnectRep * rep = CAST_CONSTPTR(DisconnectRep, sig->getDataPtr());
946 Uint32 nodeId = rep->nodeId;
947
948 assert(nodeId > 0 && nodeId < MAX_NODES);
949 Node & cm_node = theNodes[nodeId];
950 trp_node & theNode = cm_node;
951
952 bool node_failrep = theNode.m_node_fail_rep;
953 set_node_dead(theNode);
954 theNode.set_connected(false);
955
956 noOfConnectedNodes--;
957 if (noOfConnectedNodes == 0)
958 {
959 if (!global_flag_skip_invalidate_cache &&
960 theFacade.m_globalDictCache)
961 {
962 theFacade.m_globalDictCache->lock();
963 theFacade.m_globalDictCache->invalidate_all();
964 theFacade.m_globalDictCache->unlock();
965 m_connect_count ++;
966 m_cluster_state = CS_waiting_for_clean_cache;
967 }
968
969 if (m_auto_reconnect == 0)
970 {
971 theStop = 2;
972 }
973 }
974
975 if (node_failrep == false)
976 {
977 /**
978 * Inform API
979 */
980 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
981 signal.theVerId_signalNumber = GSN_NODE_FAILREP;
982 signal.theReceiversBlockNumber = API_CLUSTERMGR;
983 signal.theTrace = 0;
984 signal.theLength = NodeFailRep::SignalLengthLong;
985
986 NodeFailRep * rep = CAST_PTR(NodeFailRep, signal.getDataPtrSend());
987 rep->failNo = 0;
988 rep->masterNodeId = 0;
989 rep->noOfNodes = 1;
990 NodeBitmask::clear(rep->theNodes);
991 NodeBitmask::set(rep->theNodes, nodeId);
992 execNODE_FAILREP(&signal, 0);
993 }
994 }
995
996 void
execNODE_FAILREP(const NdbApiSignal * sig,const LinearSectionPtr ptr[])997 ClusterMgr::execNODE_FAILREP(const NdbApiSignal* sig,
998 const LinearSectionPtr ptr[])
999 {
1000 const NodeFailRep * rep = CAST_CONSTPTR(NodeFailRep, sig->getDataPtr());
1001
1002 NdbApiSignal signal(sig->theSendersBlockRef);
1003 signal.theVerId_signalNumber = GSN_NODE_FAILREP;
1004 signal.theReceiversBlockNumber = API_CLUSTERMGR;
1005 signal.theTrace = 0;
1006 signal.theLength = NodeFailRep::SignalLengthLong;
1007
1008 NodeFailRep * copy = CAST_PTR(NodeFailRep, signal.getDataPtrSend());
1009 copy->failNo = 0;
1010 copy->masterNodeId = 0;
1011 copy->noOfNodes = 0;
1012 NodeBitmask::clear(copy->theNodes);
1013
1014 for (Uint32 i = NdbNodeBitmask::find_first(rep->theNodes);
1015 i != NdbNodeBitmask::NotFound;
1016 i = NdbNodeBitmask::find_next(rep->theNodes, i + 1))
1017 {
1018 Node & cm_node = theNodes[i];
1019 trp_node & theNode = cm_node;
1020
1021 bool node_failrep = theNode.m_node_fail_rep;
1022 bool connected = theNode.is_connected();
1023 set_node_dead(theNode);
1024
1025 if (node_failrep == false)
1026 {
1027 theNode.m_node_fail_rep = true;
1028 NodeBitmask::set(copy->theNodes, i);
1029 copy->noOfNodes++;
1030 }
1031
1032 if (connected)
1033 {
1034 theFacade.doDisconnect(i);
1035 }
1036 }
1037
1038 recalcMinDbVersion();
1039 if (copy->noOfNodes)
1040 {
1041 theFacade.for_each(this, &signal, 0); // report GSN_NODE_FAILREP
1042 }
1043
1044 if (noOfAliveNodes == 0)
1045 {
1046 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
1047 signal.theVerId_signalNumber = GSN_NF_COMPLETEREP;
1048 signal.theReceiversBlockNumber = 0;
1049 signal.theTrace = 0;
1050 signal.theLength = NFCompleteRep::SignalLength;
1051
1052 NFCompleteRep * rep = CAST_PTR(NFCompleteRep, signal.getDataPtrSend());
1053 rep->blockNo =0;
1054 rep->nodeId = getOwnNodeId();
1055 rep->unused = 0;
1056 rep->from = __LINE__;
1057
1058 for (Uint32 i = 1; i < MAX_NODES; i++)
1059 {
1060 trp_node& theNode = theNodes[i];
1061 if (theNode.defined && theNode.nfCompleteRep == false)
1062 {
1063 rep->failedNodeId = i;
1064 execNF_COMPLETEREP(&signal, 0);
1065 }
1066 }
1067 }
1068 }
1069
1070 void
print_nodes(const char * where,NdbOut & out)1071 ClusterMgr::print_nodes(const char* where, NdbOut& out)
1072 {
1073 out << where << " >>" << endl;
1074 for (NodeId n = 1; n < MAX_NODES ; n++)
1075 {
1076 const trp_node node = getNodeInfo(n);
1077 if (!node.defined)
1078 continue;
1079 out << "node: " << n << endl;
1080 out << " -";
1081 out << " connected: " << node.is_connected();
1082 out << ", compatible: " << node.compatible;
1083 out << ", nf_complete_rep: " << node.nfCompleteRep;
1084 out << ", alive: " << node.m_alive;
1085 out << ", confirmed: " << node.is_confirmed();
1086 out << endl;
1087
1088 out << " - " << node.m_info << endl;
1089 out << " - " << node.m_state << endl;
1090 }
1091 out << "<<" << endl;
1092 }
1093
1094
1095 /******************************************************************************
1096 * Arbitrator
1097 ******************************************************************************/
ArbitMgr(ClusterMgr & c)1098 ArbitMgr::ArbitMgr(ClusterMgr & c)
1099 : m_clusterMgr(c)
1100 {
1101 DBUG_ENTER("ArbitMgr::ArbitMgr");
1102
1103 theThreadMutex = NdbMutex_Create();
1104 theInputCond = NdbCondition_Create();
1105 theInputMutex = NdbMutex_Create();
1106
1107 theRank = 0;
1108 theDelay = 0;
1109 theThread = 0;
1110
1111 theInputTimeout = 0;
1112 theInputFull = false;
1113 memset(&theInputBuffer, 0, sizeof(theInputBuffer));
1114 theState = StateInit;
1115
1116 memset(&theStartReq, 0, sizeof(theStartReq));
1117 memset(&theChooseReq1, 0, sizeof(theChooseReq1));
1118 memset(&theChooseReq2, 0, sizeof(theChooseReq2));
1119 memset(&theStopOrd, 0, sizeof(theStopOrd));
1120
1121 DBUG_VOID_RETURN;
1122 }
1123
~ArbitMgr()1124 ArbitMgr::~ArbitMgr()
1125 {
1126 DBUG_ENTER("ArbitMgr::~ArbitMgr");
1127 NdbMutex_Destroy(theThreadMutex);
1128 NdbCondition_Destroy(theInputCond);
1129 NdbMutex_Destroy(theInputMutex);
1130 DBUG_VOID_RETURN;
1131 }
1132
1133 // Start arbitrator thread. This is kernel request.
1134 // First stop any previous thread since it is a left-over
1135 // which was never used and which now has wrong ticket.
1136 void
doStart(const Uint32 * theData)1137 ArbitMgr::doStart(const Uint32* theData)
1138 {
1139 ArbitSignal aSignal;
1140 NdbMutex_Lock(theThreadMutex);
1141 if (theThread != NULL) {
1142 aSignal.init(GSN_ARBIT_STOPORD, NULL);
1143 aSignal.data.code = StopRestart;
1144 sendSignalToThread(aSignal);
1145 void* value;
1146 NdbThread_WaitFor(theThread, &value);
1147 NdbThread_Destroy(&theThread);
1148 theState = StateInit;
1149 theInputFull = false;
1150 }
1151 aSignal.init(GSN_ARBIT_STARTREQ, theData);
1152 sendSignalToThread(aSignal);
1153 theThread = NdbThread_Create(
1154 runArbitMgr_C, (void**)this,
1155 0, // default stack size
1156 "ndb_arbitmgr",
1157 NDB_THREAD_PRIO_HIGH);
1158 NdbMutex_Unlock(theThreadMutex);
1159 }
1160
1161 // The "choose me" signal from a candidate.
1162 void
doChoose(const Uint32 * theData)1163 ArbitMgr::doChoose(const Uint32* theData)
1164 {
1165 ArbitSignal aSignal;
1166 aSignal.init(GSN_ARBIT_CHOOSEREQ, theData);
1167 sendSignalToThread(aSignal);
1168 }
1169
1170 // Stop arbitrator thread via stop signal from the kernel
1171 // or when exiting API program.
1172 void
doStop(const Uint32 * theData)1173 ArbitMgr::doStop(const Uint32* theData)
1174 {
1175 DBUG_ENTER("ArbitMgr::doStop");
1176 ArbitSignal aSignal;
1177 NdbMutex_Lock(theThreadMutex);
1178 if (theThread != NULL) {
1179 aSignal.init(GSN_ARBIT_STOPORD, theData);
1180 if (theData == 0) {
1181 aSignal.data.code = StopExit;
1182 } else {
1183 aSignal.data.code = StopRequest;
1184 }
1185 sendSignalToThread(aSignal);
1186 void* value;
1187 NdbThread_WaitFor(theThread, &value);
1188 NdbThread_Destroy(&theThread);
1189 theState = StateInit;
1190 }
1191 NdbMutex_Unlock(theThreadMutex);
1192 DBUG_VOID_RETURN;
1193 }
1194
1195 // private methods
1196
1197 extern "C"
1198 void*
runArbitMgr_C(void * me)1199 runArbitMgr_C(void* me)
1200 {
1201 ((ArbitMgr*) me)->threadMain();
1202 return NULL;
1203 }
1204
1205 void
sendSignalToThread(ArbitSignal & aSignal)1206 ArbitMgr::sendSignalToThread(ArbitSignal& aSignal)
1207 {
1208 #ifdef DEBUG_ARBIT
1209 char buf[17] = "";
1210 ndbout << "arbit recv: ";
1211 ndbout << " gsn=" << aSignal.gsn;
1212 ndbout << " send=" << aSignal.data.sender;
1213 ndbout << " code=" << aSignal.data.code;
1214 ndbout << " node=" << aSignal.data.node;
1215 ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
1216 ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
1217 ndbout << endl;
1218 #endif
1219 aSignal.setTimestamp(); // signal arrival time
1220 NdbMutex_Lock(theInputMutex);
1221 while (theInputFull) {
1222 NdbCondition_WaitTimeout(theInputCond, theInputMutex, 1000);
1223 }
1224 theInputBuffer = aSignal;
1225 theInputFull = true;
1226 NdbCondition_Signal(theInputCond);
1227 NdbMutex_Unlock(theInputMutex);
1228 }
1229
1230 void
threadMain()1231 ArbitMgr::threadMain()
1232 {
1233 ArbitSignal aSignal;
1234 aSignal = theInputBuffer;
1235 threadStart(aSignal);
1236 bool stop = false;
1237 while (! stop) {
1238 NdbMutex_Lock(theInputMutex);
1239 while (! theInputFull) {
1240 NdbCondition_WaitTimeout(theInputCond, theInputMutex, theInputTimeout);
1241 threadTimeout();
1242 }
1243 aSignal = theInputBuffer;
1244 theInputFull = false;
1245 NdbCondition_Signal(theInputCond);
1246 NdbMutex_Unlock(theInputMutex);
1247 switch (aSignal.gsn) {
1248 case GSN_ARBIT_CHOOSEREQ:
1249 threadChoose(aSignal);
1250 break;
1251 case GSN_ARBIT_STOPORD:
1252 stop = true;
1253 break;
1254 }
1255 }
1256 threadStop(aSignal);
1257 }
1258
1259 // handle events in the thread
1260
1261 void
threadStart(ArbitSignal & aSignal)1262 ArbitMgr::threadStart(ArbitSignal& aSignal)
1263 {
1264 theStartReq = aSignal;
1265 sendStartConf(theStartReq, ArbitCode::ApiStart);
1266 theState = StateStarted;
1267 theInputTimeout = 1000;
1268 }
1269
1270 void
threadChoose(ArbitSignal & aSignal)1271 ArbitMgr::threadChoose(ArbitSignal& aSignal)
1272 {
1273 switch (theState) {
1274 case StateStarted: // first REQ
1275 if (! theStartReq.data.match(aSignal.data)) {
1276 sendChooseRef(aSignal, ArbitCode::ErrTicket);
1277 break;
1278 }
1279 theChooseReq1 = aSignal;
1280 if (theDelay == 0) {
1281 sendChooseConf(aSignal, ArbitCode::WinChoose);
1282 theState = StateFinished;
1283 theInputTimeout = 1000;
1284 break;
1285 }
1286 theState = StateChoose1;
1287 theInputTimeout = 1;
1288 return;
1289 case StateChoose1: // second REQ within Delay
1290 if (! theStartReq.data.match(aSignal.data)) {
1291 sendChooseRef(aSignal, ArbitCode::ErrTicket);
1292 break;
1293 }
1294 theChooseReq2 = aSignal;
1295 theState = StateChoose2;
1296 theInputTimeout = 1;
1297 return;
1298 case StateChoose2: // too many REQs - refuse all
1299 if (! theStartReq.data.match(aSignal.data)) {
1300 sendChooseRef(aSignal, ArbitCode::ErrTicket);
1301 break;
1302 }
1303 sendChooseRef(theChooseReq1, ArbitCode::ErrToomany);
1304 sendChooseRef(theChooseReq2, ArbitCode::ErrToomany);
1305 sendChooseRef(aSignal, ArbitCode::ErrToomany);
1306 theState = StateFinished;
1307 theInputTimeout = 1000;
1308 return;
1309 default:
1310 sendChooseRef(aSignal, ArbitCode::ErrState);
1311 break;
1312 }
1313 }
1314
1315 void
threadTimeout()1316 ArbitMgr::threadTimeout()
1317 {
1318 switch (theState) {
1319 case StateStarted:
1320 break;
1321 case StateChoose1:
1322 if (theChooseReq1.getTimediff() < theDelay)
1323 break;
1324 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1325 theState = StateFinished;
1326 theInputTimeout = 1000;
1327 break;
1328 case StateChoose2:
1329 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1330 sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
1331 theState = StateFinished;
1332 theInputTimeout = 1000;
1333 break;
1334 default:
1335 break;
1336 }
1337 }
1338
1339 void
threadStop(ArbitSignal & aSignal)1340 ArbitMgr::threadStop(ArbitSignal& aSignal)
1341 {
1342 switch (aSignal.data.code) {
1343 case StopExit:
1344 switch (theState) {
1345 case StateStarted:
1346 sendStopRep(theStartReq, 0);
1347 break;
1348 case StateChoose1: // just in time
1349 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1350 break;
1351 case StateChoose2:
1352 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1353 sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
1354 break;
1355 case StateInit:
1356 case StateFinished:
1357 //??
1358 break;
1359 }
1360 break;
1361 case StopRequest:
1362 break;
1363 case StopRestart:
1364 break;
1365 }
1366 }
1367
1368 // output routines
1369
1370 void
sendStartConf(ArbitSignal & aSignal,Uint32 code)1371 ArbitMgr::sendStartConf(ArbitSignal& aSignal, Uint32 code)
1372 {
1373 ArbitSignal copySignal = aSignal;
1374 copySignal.gsn = GSN_ARBIT_STARTCONF;
1375 copySignal.data.code = code;
1376 sendSignalToQmgr(copySignal);
1377 }
1378
1379 void
sendChooseConf(ArbitSignal & aSignal,Uint32 code)1380 ArbitMgr::sendChooseConf(ArbitSignal& aSignal, Uint32 code)
1381 {
1382 ArbitSignal copySignal = aSignal;
1383 copySignal.gsn = GSN_ARBIT_CHOOSECONF;
1384 copySignal.data.code = code;
1385 sendSignalToQmgr(copySignal);
1386 }
1387
1388 void
sendChooseRef(ArbitSignal & aSignal,Uint32 code)1389 ArbitMgr::sendChooseRef(ArbitSignal& aSignal, Uint32 code)
1390 {
1391 ArbitSignal copySignal = aSignal;
1392 copySignal.gsn = GSN_ARBIT_CHOOSEREF;
1393 copySignal.data.code = code;
1394 sendSignalToQmgr(copySignal);
1395 }
1396
1397 void
sendStopRep(ArbitSignal & aSignal,Uint32 code)1398 ArbitMgr::sendStopRep(ArbitSignal& aSignal, Uint32 code)
1399 {
1400 ArbitSignal copySignal = aSignal;
1401 copySignal.gsn = GSN_ARBIT_STOPREP;
1402 copySignal.data.code = code;
1403 sendSignalToQmgr(copySignal);
1404 }
1405
1406 /**
1407 * Send signal to QMGR. The input includes signal number and
1408 * signal data. The signal data is normally a copy of a received
1409 * signal so it contains expected arbitrator node id and ticket.
1410 * The sender in signal data is the QMGR node id.
1411 */
1412 void
sendSignalToQmgr(ArbitSignal & aSignal)1413 ArbitMgr::sendSignalToQmgr(ArbitSignal& aSignal)
1414 {
1415 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, m_clusterMgr.getOwnNodeId()));
1416
1417 signal.theVerId_signalNumber = aSignal.gsn;
1418 signal.theReceiversBlockNumber = QMGR;
1419 signal.theTrace = 0;
1420 signal.theLength = ArbitSignalData::SignalLength;
1421
1422 ArbitSignalData* sd = CAST_PTR(ArbitSignalData, signal.getDataPtrSend());
1423
1424 sd->sender = numberToRef(API_CLUSTERMGR, m_clusterMgr.getOwnNodeId());
1425 sd->code = aSignal.data.code;
1426 sd->node = aSignal.data.node;
1427 sd->ticket = aSignal.data.ticket;
1428 sd->mask = aSignal.data.mask;
1429
1430 #ifdef DEBUG_ARBIT
1431 char buf[17] = "";
1432 ndbout << "arbit send: ";
1433 ndbout << " gsn=" << aSignal.gsn;
1434 ndbout << " recv=" << aSignal.data.sender;
1435 ndbout << " code=" << aSignal.data.code;
1436 ndbout << " node=" << aSignal.data.node;
1437 ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
1438 ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
1439 ndbout << endl;
1440 #endif
1441
1442 {
1443 m_clusterMgr.lock();
1444 m_clusterMgr.raw_sendSignal(&signal, aSignal.data.sender);
1445 m_clusterMgr.unlock();
1446 }
1447 }
1448
1449