/*
   Copyright (c) 2003, 2021, Oracle and/or its affiliates.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/
24
25 #include <ndb_global.h>
26 #include <ndb_limits.h>
27 #include <util/version.h>
28
29 #include "TransporterFacade.hpp"
30 #include <kernel/GlobalSignalNumbers.h>
31
32 #include "ClusterMgr.hpp"
33 #include <IPCConfig.hpp>
34 #include "NdbApiSignal.hpp"
35 #include <NdbSleep.h>
36 #include <NdbOut.hpp>
37 #include <NdbTick.h>
38
39
40 #include <signaldata/NodeFailRep.hpp>
41 #include <signaldata/NFCompleteRep.hpp>
42 #include <signaldata/ApiRegSignalData.hpp>
43 #include <signaldata/AlterTable.hpp>
44 #include <signaldata/SumaImpl.hpp>
45
46 #include <mgmapi.h>
47 #include <mgmapi_configuration.hpp>
48 #include <mgmapi_config_parameters.h>
49
/* Test hooks: allow test code to skip cache invalidation / cache-clean wait. */
int global_flag_skip_invalidate_cache = 0;
int global_flag_skip_waiting_for_clean_cache = 0;
//#define DEBUG_REG
53
54 // Just a C wrapper for threadMain
55 extern "C"
56 void*
runClusterMgr_C(void * me)57 runClusterMgr_C(void * me)
58 {
59 ((ClusterMgr*) me)->threadMain();
60
61 return NULL;
62 }
63
ClusterMgr(TransporterFacade & _facade)64 ClusterMgr::ClusterMgr(TransporterFacade & _facade):
65 theStop(0),
66 m_sent_API_REGREQ_to_myself(false),
67 theFacade(_facade),
68 theArbitMgr(NULL),
69 m_connect_count(0),
70 m_max_api_reg_req_interval(~0),
71 noOfAliveNodes(0),
72 noOfConnectedNodes(0),
73 noOfConnectedDBNodes(0),
74 minDbVersion(0),
75 theClusterMgrThread(NULL),
76 m_cluster_state(CS_waiting_for_clean_cache),
77 m_hbFrequency(0)
78 {
79 DBUG_ENTER("ClusterMgr::ClusterMgr");
80 clusterMgrThreadMutex = NdbMutex_Create();
81 waitForHBCond= NdbCondition_Create();
82 m_auto_reconnect = -1;
83
84 Uint32 ret = this->open(&theFacade, API_CLUSTERMGR);
85 if (unlikely(ret == 0))
86 {
87 ndbout_c("Failed to register ClusterMgr! ret: %d", ret);
88 abort();
89 }
90 DBUG_VOID_RETURN;
91 }
92
~ClusterMgr()93 ClusterMgr::~ClusterMgr()
94 {
95 DBUG_ENTER("ClusterMgr::~ClusterMgr");
96 assert(theStop == 1);
97 if (theArbitMgr != 0)
98 {
99 delete theArbitMgr;
100 theArbitMgr = 0;
101 }
102 NdbCondition_Destroy(waitForHBCond);
103 NdbMutex_Destroy(clusterMgrThreadMutex);
104 DBUG_VOID_RETURN;
105 }
106
107 /**
108 * This method is called from start of cluster connection instance and
109 * before we have started any socket services and thus it needs no
110 * mutex protection since the ClusterMgr object isn't known by any other
111 * thread at this point in time.
112 */
113 void
configure(Uint32 nodeId,const ndb_mgm_configuration * config)114 ClusterMgr::configure(Uint32 nodeId,
115 const ndb_mgm_configuration* config)
116 {
117 ndb_mgm_configuration_iterator iter(* config, CFG_SECTION_NODE);
118 for(iter.first(); iter.valid(); iter.next()){
119 Uint32 nodeId = 0;
120 if(iter.get(CFG_NODE_ID, &nodeId))
121 continue;
122
123 // Check array bounds + don't allow node 0 to be touched
124 assert(nodeId > 0 && nodeId < MAX_NODES);
125 trp_node& theNode = theNodes[nodeId];
126 theNode.defined = true;
127
128 unsigned type;
129 if(iter.get(CFG_TYPE_OF_SECTION, &type))
130 continue;
131
132 switch(type){
133 case NODE_TYPE_DB:
134 theNode.m_info.m_type = NodeInfo::DB;
135 break;
136 case NODE_TYPE_API:
137 theNode.m_info.m_type = NodeInfo::API;
138 break;
139 case NODE_TYPE_MGM:
140 theNode.m_info.m_type = NodeInfo::MGM;
141 break;
142 default:
143 break;
144 }
145 }
146
147 /* Mark all non existing nodes as not defined */
148 for(Uint32 i = 0; i<MAX_NODES; i++) {
149 if (iter.first())
150 continue;
151
152 if (iter.find(CFG_NODE_ID, i))
153 theNodes[i]= Node();
154 }
155
156 #if 0
157 print_nodes("init");
158 #endif
159
160 // Configure arbitrator
161 Uint32 rank = 0;
162 iter.first();
163 iter.find(CFG_NODE_ID, nodeId); // let not found in config mean rank=0
164 iter.get(CFG_NODE_ARBIT_RANK, &rank);
165
166 if (rank > 0)
167 {
168 // The arbitrator should be active
169 if (!theArbitMgr)
170 theArbitMgr = new ArbitMgr(* this);
171 theArbitMgr->setRank(rank);
172
173 Uint32 delay = 0;
174 iter.get(CFG_NODE_ARBIT_DELAY, &delay);
175 theArbitMgr->setDelay(delay);
176 }
177 else if (theArbitMgr)
178 {
179 // No arbitrator should be started
180 theArbitMgr->doStop(NULL);
181 delete theArbitMgr;
182 theArbitMgr= NULL;
183 }
184
185 // Configure heartbeats.
186 unsigned hbFrequency = 0;
187 iter.get(CFG_MGMD_MGMD_HEARTBEAT_INTERVAL, &hbFrequency);
188 m_hbFrequency = static_cast<Uint32>(hbFrequency);
189
190 // Configure max backoff time for connection attempts to first
191 // data node.
192 Uint32 backoff_max_time = 0;
193 iter.get(CFG_START_CONNECT_BACKOFF_MAX_TIME,
194 &backoff_max_time);
195 start_connect_backoff_max_time = backoff_max_time;
196
197 // Configure max backoff time for connection attempts to data
198 // nodes.
199 backoff_max_time = 0;
200 iter.get(CFG_CONNECT_BACKOFF_MAX_TIME, &backoff_max_time);
201 connect_backoff_max_time = backoff_max_time;
202
203 theFacade.get_registry()->set_connect_backoff_max_time_in_ms(
204 start_connect_backoff_max_time);
205 }
206
207 void
startThread()208 ClusterMgr::startThread()
209 {
210 /**
211 * We use the clusterMgrThreadMutex as a signalling object between this
212 * thread and the main thread of the ClusterMgr.
213 * The clusterMgrThreadMutex also protects the theStop-variable.
214 */
215 Guard g(clusterMgrThreadMutex);
216
217 theStop = -1;
218 theClusterMgrThread = NdbThread_Create(runClusterMgr_C,
219 (void**)this,
220 0, // default stack size
221 "ndb_clustermgr",
222 NDB_THREAD_PRIO_HIGH);
223 Uint32 cnt = 0;
224 while (theStop == -1 && cnt < 60)
225 {
226 NdbCondition_WaitTimeout(waitForHBCond, clusterMgrThreadMutex, 1000);
227 }
228
229 assert(theStop == 0);
230 }
231
232 void
doStop()233 ClusterMgr::doStop( ){
234 DBUG_ENTER("ClusterMgr::doStop");
235 {
236 /* Ensure stop is only executed once */
237 Guard g(clusterMgrThreadMutex);
238 if(theStop == 1){
239 DBUG_VOID_RETURN;
240 }
241 theStop = 1;
242 }
243
244 void *status;
245 if (theClusterMgrThread) {
246 NdbThread_WaitFor(theClusterMgrThread, &status);
247 NdbThread_Destroy(&theClusterMgrThread);
248 }
249
250 if (theArbitMgr != NULL)
251 {
252 theArbitMgr->doStop(NULL);
253 }
254 {
255 /**
256 * Need protection against concurrent execution of do_poll in main
257 * thread. We cannot rely only on the trp_client lock since it is
258 * not supposed to be locked when calling close (it is locked as
259 * part of the close logic.
260 */
261 Guard g(clusterMgrThreadMutex);
262 this->close(); // disconnect from TransporterFacade
263 }
264
265 DBUG_VOID_RETURN;
266 }
267
268 void
startup()269 ClusterMgr::startup()
270 {
271 assert(theStop == -1);
272 Uint32 nodeId = getOwnNodeId();
273 Node & cm_node = theNodes[nodeId];
274 trp_node & theNode = cm_node;
275 assert(theNode.defined);
276
277 lock();
278 theFacade.doConnect(nodeId);
279 flush_send_buffers();
280 unlock();
281
282 for (Uint32 i = 0; i<3000; i++)
283 {
284 theFacade.request_connection_check();
285 start_poll();
286 do_poll(0);
287 complete_poll();
288
289 if (theNode.is_connected())
290 break;
291 NdbSleep_MilliSleep(20);
292 }
293
294 assert(theNode.is_connected());
295 Guard g(clusterMgrThreadMutex);
296 /* Signalling to creating thread that we are done with thread startup */
297 theStop = 0;
298 NdbCondition_Broadcast(waitForHBCond);
299 }
300
301 void
threadMain()302 ClusterMgr::threadMain()
303 {
304 startup();
305
306 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
307
308 signal.theVerId_signalNumber = GSN_API_REGREQ;
309 signal.theTrace = 0;
310 signal.theLength = ApiRegReq::SignalLength;
311
312 ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
313 req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
314 req->version = NDB_VERSION;
315 req->mysql_version = NDB_MYSQL_VERSION_D;
316
317 NdbApiSignal nodeFail_signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
318 nodeFail_signal.theVerId_signalNumber = GSN_NODE_FAILREP;
319 nodeFail_signal.theReceiversBlockNumber = API_CLUSTERMGR;
320 nodeFail_signal.theTrace = 0;
321 nodeFail_signal.theLength = NodeFailRep::SignalLengthLong;
322
323 NDB_TICKS now = NdbTick_getCurrentTicks();
324
325 while(!theStop)
326 {
327 /* Sleep 1/5 of minHeartBeatInterval between each check */
328 const NDB_TICKS before = now;
329 for (Uint32 i = 0; i<5; i++)
330 {
331 NdbSleep_MilliSleep(minHeartBeatInterval/5);
332 {
333 /**
334 * start_poll does lock the trp_client and complete_poll
335 * releases this lock. This means that this protects
336 * against concurrent calls to send signals in ArbitMgr.
337 * We do however need to protect also against concurrent
338 * close in doStop, so to avoid this problem we need to
339 * also lock clusterMgrThreadMutex before we start the
340 * poll.
341 */
342 Guard g(clusterMgrThreadMutex);
343 start_poll();
344 do_poll(0);
345 complete_poll();
346 }
347 }
348 now = NdbTick_getCurrentTicks();
349 const Uint32 timeSlept = (Uint32)NdbTick_Elapsed(before, now).milliSec();
350
351 lock();
352 if (m_cluster_state == CS_waiting_for_clean_cache &&
353 theFacade.m_globalDictCache)
354 {
355 if (!global_flag_skip_waiting_for_clean_cache)
356 {
357 theFacade.m_globalDictCache->lock();
358 unsigned sz= theFacade.m_globalDictCache->get_size();
359 theFacade.m_globalDictCache->unlock();
360 if (sz)
361 {
362 unlock();
363 continue;
364 }
365 }
366 m_cluster_state = CS_waiting_for_first_connect;
367 }
368
369 NodeFailRep * nodeFailRep = CAST_PTR(NodeFailRep,
370 nodeFail_signal.getDataPtrSend());
371 nodeFailRep->noOfNodes = 0;
372 NodeBitmask::clear(nodeFailRep->theAllNodes);
373
374 for (int i = 1; i < MAX_NODES; i++)
375 {
376 /**
377 * Send register request (heartbeat) to all available nodes
378 * at specified timing intervals
379 */
380 const NodeId nodeId = i;
381 // Check array bounds + don't allow node 0 to be touched
382 assert(nodeId > 0 && nodeId < MAX_NODES);
383 Node & cm_node = theNodes[nodeId];
384 trp_node & theNode = cm_node;
385
386 if (!theNode.defined)
387 continue;
388
389 if (theNode.is_connected() == false){
390 theFacade.doConnect(nodeId);
391 continue;
392 }
393
394 if (!theNode.compatible){
395 continue;
396 }
397
398 if (nodeId == getOwnNodeId())
399 {
400 /**
401 * Don't send HB to self more than once
402 * (once needed to avoid weird special cases in e.g ConfigManager)
403 */
404 if (m_sent_API_REGREQ_to_myself)
405 {
406 continue;
407 }
408 }
409
410 cm_node.hbCounter += timeSlept;
411 if (cm_node.hbCounter >= m_max_api_reg_req_interval ||
412 cm_node.hbCounter >= cm_node.hbFrequency)
413 {
414 /**
415 * It is now time to send a new Heartbeat
416 */
417 if (cm_node.hbCounter >= cm_node.hbFrequency)
418 {
419 cm_node.hbMissed++;
420 cm_node.hbCounter = 0;
421 }
422
423 if (theNode.m_info.m_type != NodeInfo::DB)
424 signal.theReceiversBlockNumber = API_CLUSTERMGR;
425 else
426 signal.theReceiversBlockNumber = QMGR;
427
428 #ifdef DEBUG_REG
429 ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
430 #endif
431 if (nodeId == getOwnNodeId())
432 {
433 /* Set flag to ensure we only send once to ourself */
434 m_sent_API_REGREQ_to_myself = true;
435 }
436 raw_sendSignal(&signal, nodeId);
437 }//if
438
439 if (cm_node.hbMissed == 4 && cm_node.hbFrequency > 0)
440 {
441 nodeFailRep->noOfNodes++;
442 NodeBitmask::set(nodeFailRep->theAllNodes, nodeId);
443 }
444 }
445 flush_send_buffers();
446 unlock();
447
448 if (nodeFailRep->noOfNodes)
449 {
450 lock();
451 raw_sendSignal(&nodeFail_signal, getOwnNodeId());
452 flush_send_buffers();
453 unlock();
454 }
455 }
456 }
457
458 /**
459 * We're holding the trp_client lock while performing poll from
460 * ClusterMgr. So we always execute all the execSIGNAL-methods in
461 * ClusterMgr with protection other methods that use the trp_client
462 * lock (reportDisconnect, reportConnect, is_cluster_completely_unavailable,
463 * ArbitMgr (sendSignalToQmgr)).
464 */
465 void
trp_deliver_signal(const NdbApiSignal * sig,const LinearSectionPtr ptr[3])466 ClusterMgr::trp_deliver_signal(const NdbApiSignal* sig,
467 const LinearSectionPtr ptr[3])
468 {
469 const Uint32 gsn = sig->theVerId_signalNumber;
470 const Uint32 * theData = sig->getDataPtr();
471
472 switch (gsn){
473 case GSN_API_REGREQ:
474 execAPI_REGREQ(theData);
475 break;
476
477 case GSN_API_REGCONF:
478 execAPI_REGCONF(sig, ptr);
479 break;
480
481 case GSN_API_REGREF:
482 execAPI_REGREF(theData);
483 break;
484
485 case GSN_NODE_FAILREP:
486 execNODE_FAILREP(sig, ptr);
487 break;
488
489 case GSN_NF_COMPLETEREP:
490 execNF_COMPLETEREP(sig, ptr);
491 break;
492 case GSN_ARBIT_STARTREQ:
493 if (theArbitMgr != NULL)
494 theArbitMgr->doStart(theData);
495 break;
496
497 case GSN_ARBIT_CHOOSEREQ:
498 if (theArbitMgr != NULL)
499 theArbitMgr->doChoose(theData);
500 break;
501
502 case GSN_ARBIT_STOPORD:
503 if(theArbitMgr != NULL)
504 theArbitMgr->doStop(theData);
505 break;
506
507 case GSN_ALTER_TABLE_REP:
508 {
509 if (theFacade.m_globalDictCache == NULL)
510 break;
511 const AlterTableRep* rep = (const AlterTableRep*)theData;
512 theFacade.m_globalDictCache->lock();
513 theFacade.m_globalDictCache->
514 alter_table_rep((const char*)ptr[0].p,
515 rep->tableId,
516 rep->tableVersion,
517 rep->changeType == AlterTableRep::CT_ALTERED);
518 theFacade.m_globalDictCache->unlock();
519 break;
520 }
521 case GSN_SUB_GCP_COMPLETE_REP:
522 {
523 /**
524 * Report
525 */
526 theFacade.for_each(this, sig, ptr);
527
528 /**
529 * Reply
530 */
531 {
532 BlockReference ownRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
533 NdbApiSignal tSignal(* sig);
534 Uint32* send= tSignal.getDataPtrSend();
535 memcpy(send, theData, tSignal.getLength() << 2);
536 CAST_PTR(SubGcpCompleteAck, send)->rep.senderRef = ownRef;
537 Uint32 ref= sig->theSendersBlockRef;
538 Uint32 aNodeId= refToNode(ref);
539 tSignal.theReceiversBlockNumber= refToBlock(ref);
540 tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK;
541 tSignal.theSendersBlockRef = API_CLUSTERMGR;
542 safe_noflush_sendSignal(&tSignal, aNodeId);
543 }
544 break;
545 }
546 case GSN_TAKE_OVERTCCONF:
547 {
548 /**
549 * Report
550 */
551 theFacade.for_each(this, sig, ptr);
552 return;
553 }
554 case GSN_CONNECT_REP:
555 {
556 execCONNECT_REP(sig, ptr);
557 return;
558 }
559 case GSN_DISCONNECT_REP:
560 {
561 execDISCONNECT_REP(sig, ptr);
562 return;
563 }
564 case GSN_CLOSE_COMREQ:
565 {
566 theFacade.perform_close_clnt(this);
567 return;
568 }
569 case GSN_EXPAND_CLNT:
570 {
571 theFacade.expand_clnt();
572 return;
573 }
574 default:
575 break;
576
577 }
578 return;
579 }
580
Node()581 ClusterMgr::Node::Node()
582 : hbFrequency(0), hbCounter(0)
583 {
584 }
585
586 /**
587 * recalcMinDbVersion
588 *
589 * This method is called whenever the 'minimum DB node
590 * version' data for the connected DB nodes changes
591 * It calculates the minimum version of all the connected
592 * DB nodes.
593 * This information is cached by Ndb object instances.
594 * This information is useful when implementing API compatibility
595 * with older DB nodes
596 */
597 void
recalcMinDbVersion()598 ClusterMgr::recalcMinDbVersion()
599 {
600 Uint32 newMinDbVersion = ~ (Uint32) 0;
601
602 for (Uint32 i = 0; i < MAX_NODES; i++)
603 {
604 trp_node& node = theNodes[i];
605
606 if (node.is_connected() &&
607 node.is_confirmed() &&
608 node.m_info.getType() == NodeInfo::DB)
609 {
610 /* Include this node in the set of nodes used to
611 * compute the lowest current DB node version
612 */
613 assert(node.m_info.m_version);
614
615 if (node.minDbVersion < newMinDbVersion)
616 {
617 newMinDbVersion = node.minDbVersion;
618 }
619 }
620 }
621
622 /* Now update global min Db version if we have one.
623 * Otherwise set it to 0
624 */
625 newMinDbVersion = (newMinDbVersion == ~ (Uint32) 0) ?
626 0 :
627 newMinDbVersion;
628
629 //#ifdef DEBUG_MINVER
630
631 #ifdef DEBUG_MINVER
632 if (newMinDbVersion != minDbVersion)
633 {
634 ndbout << "Previous min Db node version was "
635 << NdbVersion(minDbVersion)
636 << " new min is "
637 << NdbVersion(newMinDbVersion)
638 << endl;
639 }
640 else
641 {
642 ndbout << "MinDbVersion recalculated, but is same : "
643 << NdbVersion(minDbVersion)
644 << endl;
645 }
646 #endif
647
648 minDbVersion = newMinDbVersion;
649 }
650
651 /******************************************************************************
652 * API_REGREQ and friends
653 ******************************************************************************/
654
655 void
execAPI_REGREQ(const Uint32 * theData)656 ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
657 const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0];
658 const NodeId nodeId = refToNode(apiRegReq->ref);
659
660 #ifdef DEBUG_REG
661 ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);
662 #endif
663
664 assert(nodeId > 0 && nodeId < MAX_NODES);
665
666 Node & cm_node = theNodes[nodeId];
667 trp_node & node = cm_node;
668 assert(node.defined == true);
669 assert(node.is_connected() == true);
670
671 /*
672 API nodes send API_REGREQ once to themselves. Other than that, there are
673 no API-API heart beats.
674 */
675 assert(cm_node.m_info.m_type != NodeInfo::API ||
676 (nodeId == getOwnNodeId() &&
677 !cm_node.is_confirmed()));
678
679 if(node.m_info.m_version != apiRegReq->version){
680 node.m_info.m_version = apiRegReq->version;
681 node.m_info.m_mysql_version = apiRegReq->mysql_version;
682 if (node.m_info.m_version < NDBD_SPLIT_VERSION)
683 node.m_info.m_mysql_version = 0;
684
685 if (getMajor(node.m_info.m_version) < getMajor(NDB_VERSION) ||
686 getMinor(node.m_info.m_version) < getMinor(NDB_VERSION)) {
687 node.compatible = false;
688 } else {
689 node.compatible = true;
690 }
691 }
692
693 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
694 signal.theVerId_signalNumber = GSN_API_REGCONF;
695 signal.theReceiversBlockNumber = API_CLUSTERMGR;
696 signal.theTrace = 0;
697 signal.theLength = ApiRegConf::SignalLength;
698
699 ApiRegConf * const conf = CAST_PTR(ApiRegConf, signal.getDataPtrSend());
700 conf->qmgrRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
701 conf->version = NDB_VERSION;
702 conf->mysql_version = NDB_MYSQL_VERSION_D;
703
704 /*
705 This is the frequency (in centiseonds) at which we want the other node
706 to send API_REGREQ messages.
707 */
708 conf->apiHeartbeatFrequency = m_hbFrequency/10;
709
710 conf->minDbVersion= 0;
711 conf->nodeState= node.m_state;
712
713 node.set_confirmed(true);
714 if (safe_sendSignal(&signal, nodeId) != 0)
715 node.set_confirmed(false);
716 }
717
718 void
execAPI_REGCONF(const NdbApiSignal * signal,const LinearSectionPtr ptr[])719 ClusterMgr::execAPI_REGCONF(const NdbApiSignal * signal,
720 const LinearSectionPtr ptr[])
721 {
722 const ApiRegConf * apiRegConf = CAST_CONSTPTR(ApiRegConf,
723 signal->getDataPtr());
724 const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
725
726 #ifdef DEBUG_REG
727 ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
728 #endif
729
730 assert(nodeId > 0 && nodeId < MAX_NODES);
731
732 Node & cm_node = theNodes[nodeId];
733 trp_node & node = cm_node;
734 assert(node.defined == true);
735 assert(node.is_connected() == true);
736
737 if(node.m_info.m_version != apiRegConf->version){
738 node.m_info.m_version = apiRegConf->version;
739 node.m_info.m_mysql_version = apiRegConf->mysql_version;
740 if (node.m_info.m_version < NDBD_SPLIT_VERSION)
741 node.m_info.m_mysql_version = 0;
742
743 if(theNodes[theFacade.ownId()].m_info.m_type == NodeInfo::MGM)
744 node.compatible = ndbCompatible_mgmt_ndb(NDB_VERSION,
745 node.m_info.m_version);
746 else
747 node.compatible = ndbCompatible_api_ndb(NDB_VERSION,
748 node.m_info.m_version);
749 }
750
751 node.set_confirmed(true);
752
753 if (node.minDbVersion != apiRegConf->minDbVersion)
754 {
755 node.minDbVersion = apiRegConf->minDbVersion;
756 recalcMinDbVersion();
757 }
758
759 if (node.m_info.m_version >= NDBD_255_NODES_VERSION)
760 {
761 node.m_state = apiRegConf->nodeState;
762 }
763 else
764 {
765 /**
766 * from 2 to 8 words = 6 words diff, 6*4 = 24
767 */
768 memcpy(&node.m_state, &apiRegConf->nodeState, sizeof(node.m_state) - 24);
769 }
770
771 if (node.m_info.m_type == NodeInfo::DB)
772 {
773 /**
774 * Only set DB nodes to "alive"
775 */
776 if (node.compatible && (node.m_state.startLevel == NodeState::SL_STARTED ||
777 node.m_state.getSingleUserMode()))
778 {
779 set_node_alive(node, true);
780 }
781 else
782 {
783 set_node_alive(node, false);
784 }
785 }
786
787 cm_node.hbMissed = 0;
788 cm_node.hbCounter = 0;
789 /*
790 By convention, conf->apiHeartbeatFrequency is in centiseconds rather than
791 milliseconds. See also Qmgr::sendApiRegConf().
792 */
793 const Int64 freq =
794 (static_cast<Int64>(apiRegConf->apiHeartbeatFrequency) * 10) - 50;
795
796 if (freq > UINT_MAX32)
797 {
798 // In case of overflow.
799 assert(false); /* Note this assert fails on some upgrades... */
800 cm_node.hbFrequency = UINT_MAX32;
801 }
802 else if (freq < minHeartBeatInterval)
803 {
804 /**
805 * We use minHeartBeatInterval as a lower limit. This also prevents
806 * against underflow.
807 */
808 cm_node.hbFrequency = minHeartBeatInterval;
809 }
810 else
811 {
812 cm_node.hbFrequency = static_cast<Uint32>(freq);
813 }
814
815 // If responding nodes indicates that it is connected to other
816 // nodes, that makes it probable that those nodes are alive and
817 // available also for this node.
818 for (int db_node_id = 1; db_node_id <= MAX_DATA_NODE_ID; db_node_id ++)
819 {
820 if (node.m_state.m_connected_nodes.get(db_node_id))
821 {
822 // Tell this nodes start clients thread that db_node_id
823 // is up and probable connectable.
824 theFacade.theTransporterRegistry->indicate_node_up(db_node_id);
825 }
826 }
827
828 // Distribute signal to all threads/blocks
829 // TODO only if state changed...
830 theFacade.for_each(this, signal, ptr);
831 }
832
833 void
execAPI_REGREF(const Uint32 * theData)834 ClusterMgr::execAPI_REGREF(const Uint32 * theData){
835
836 ApiRegRef * ref = (ApiRegRef*)theData;
837
838 const NodeId nodeId = refToNode(ref->ref);
839
840 assert(nodeId > 0 && nodeId < MAX_NODES);
841
842 Node & cm_node = theNodes[nodeId];
843 trp_node & node = cm_node;
844
845 assert(node.is_connected() == true);
846 assert(node.defined == true);
847 /* Only DB nodes will send API_REGREF */
848 assert(node.m_info.getType() == NodeInfo::DB);
849
850 node.compatible = false;
851 set_node_alive(node, false);
852 node.m_state = NodeState::SL_NOTHING;
853 node.m_info.m_version = ref->version;
854
855 switch(ref->errorCode){
856 case ApiRegRef::WrongType:
857 ndbout_c("Node %d reports that this node should be a NDB node", nodeId);
858 abort();
859 case ApiRegRef::UnsupportedVersion:
860 default:
861 break;
862 }
863 }
864
865 void
execNF_COMPLETEREP(const NdbApiSignal * signal,const LinearSectionPtr ptr[3])866 ClusterMgr::execNF_COMPLETEREP(const NdbApiSignal* signal,
867 const LinearSectionPtr ptr[3])
868 {
869 const NFCompleteRep * nfComp = CAST_CONSTPTR(NFCompleteRep,
870 signal->getDataPtr());
871 const NodeId nodeId = nfComp->failedNodeId;
872 assert(nodeId > 0 && nodeId < MAX_NODES);
873
874 trp_node & node = theNodes[nodeId];
875 if (node.nfCompleteRep == false)
876 {
877 node.nfCompleteRep = true;
878 theFacade.for_each(this, signal, ptr);
879 }
880 }
881
882 /**
883 * This is called as a callback when executing update_connections which
884 * is always called with ownership of trp_client lock.
885 */
886 void
reportConnected(NodeId nodeId)887 ClusterMgr::reportConnected(NodeId nodeId)
888 {
889 DBUG_ENTER("ClusterMgr::reportConnected");
890 DBUG_PRINT("info", ("nodeId: %u", nodeId));
891 /**
892 * Ensure that we are sending heartbeat every 100 ms
893 * until we have got the first reply from NDB providing
894 * us with the real time-out period to use.
895 */
896 assert(nodeId > 0 && nodeId < MAX_NODES);
897 if (nodeId != getOwnNodeId())
898 {
899 noOfConnectedNodes++;
900 }
901
902 Node & cm_node = theNodes[nodeId];
903 trp_node & theNode = cm_node;
904
905 if (theNode.m_info.m_type == NodeInfo::DB)
906 {
907 noOfConnectedDBNodes++;
908 if (noOfConnectedDBNodes == 1)
909 {
910 // Data node connected, use ConnectBackoffMaxTime
911 theFacade.get_registry()->set_connect_backoff_max_time_in_ms(connect_backoff_max_time);
912 }
913 }
914
915 cm_node.hbMissed = 0;
916 cm_node.hbCounter = 0;
917 cm_node.hbFrequency = 0;
918
919 assert(theNode.is_connected() == false);
920
921 /**
922 * make sure the node itself is marked connected even
923 * if first API_REGCONF has not arrived
924 */
925 theNode.set_connected(true);
926 theNode.m_state.m_connected_nodes.set(nodeId);
927 theNode.m_info.m_version = 0;
928 theNode.compatible = true;
929 theNode.nfCompleteRep = true;
930 theNode.m_node_fail_rep = false;
931 theNode.m_state.startLevel = NodeState::SL_NOTHING;
932 theNode.minDbVersion = 0;
933
934 /**
935 * We know that we have clusterMgrThreadMutex and trp_client::mutex
936 * but we don't know if we are polling...and for_each can
937 * only be used by a poller...
938 *
939 * Send signal to self, so that we can do this when receiving a signal
940 */
941 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
942 signal.theVerId_signalNumber = GSN_CONNECT_REP;
943 signal.theReceiversBlockNumber = API_CLUSTERMGR;
944 signal.theTrace = 0;
945 signal.theLength = 1;
946 signal.getDataPtrSend()[0] = nodeId;
947 raw_sendSignal(&signal, getOwnNodeId());
948 DBUG_VOID_RETURN;
949 }
950
951 void
execCONNECT_REP(const NdbApiSignal * sig,const LinearSectionPtr ptr[])952 ClusterMgr::execCONNECT_REP(const NdbApiSignal* sig,
953 const LinearSectionPtr ptr[])
954 {
955 theFacade.for_each(this, sig, 0);
956 }
957
958 void
set_node_dead(trp_node & theNode)959 ClusterMgr::set_node_dead(trp_node& theNode)
960 {
961 set_node_alive(theNode, false);
962 theNode.set_confirmed(false);
963 theNode.m_state.m_connected_nodes.clear();
964 theNode.m_state.startLevel = NodeState::SL_NOTHING;
965 theNode.m_info.m_connectCount ++;
966 theNode.nfCompleteRep = false;
967 }
968
969 void
reportDisconnected(NodeId nodeId)970 ClusterMgr::reportDisconnected(NodeId nodeId)
971 {
972 assert(nodeId > 0 && nodeId < MAX_NODES);
973 assert(noOfConnectedNodes > 0);
974
975 /**
976 * We know that we have trp_client lock
977 * but we don't know if we are polling...and for_each can
978 * only be used by a poller...
979 *
980 * Send signal to self, so that we can do this when receiving a signal
981 */
982 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
983 signal.theVerId_signalNumber = GSN_DISCONNECT_REP;
984 signal.theReceiversBlockNumber = API_CLUSTERMGR;
985 signal.theTrace = 0;
986 signal.theLength = DisconnectRep::SignalLength;
987
988 DisconnectRep * rep = CAST_PTR(DisconnectRep, signal.getDataPtrSend());
989 rep->nodeId = nodeId;
990 rep->err = 0;
991 raw_sendSignal(&signal, getOwnNodeId());
992 }
993
994 void
execDISCONNECT_REP(const NdbApiSignal * sig,const LinearSectionPtr ptr[])995 ClusterMgr::execDISCONNECT_REP(const NdbApiSignal* sig,
996 const LinearSectionPtr ptr[])
997 {
998 const DisconnectRep * rep = CAST_CONSTPTR(DisconnectRep, sig->getDataPtr());
999 Uint32 nodeId = rep->nodeId;
1000
1001 assert(nodeId > 0 && nodeId < MAX_NODES);
1002 Node & cm_node = theNodes[nodeId];
1003 trp_node & theNode = cm_node;
1004
1005 bool node_failrep = theNode.m_node_fail_rep;
1006 set_node_dead(theNode);
1007 theNode.set_connected(false);
1008
1009 noOfConnectedNodes--;
1010 if (noOfConnectedNodes == 0)
1011 {
1012 if (!global_flag_skip_invalidate_cache &&
1013 theFacade.m_globalDictCache)
1014 {
1015 theFacade.m_globalDictCache->lock();
1016 theFacade.m_globalDictCache->invalidate_all();
1017 theFacade.m_globalDictCache->unlock();
1018 m_connect_count ++;
1019 m_cluster_state = CS_waiting_for_clean_cache;
1020 }
1021
1022 if (m_auto_reconnect == 0)
1023 {
1024 theStop = 2;
1025 }
1026 }
1027
1028 if (theNode.m_info.m_type == NodeInfo::DB)
1029 {
1030 assert(noOfConnectedDBNodes > 0);
1031 noOfConnectedDBNodes--;
1032 if (noOfConnectedDBNodes == 0)
1033 {
1034 // No data nodes connected, use StartConnectBackoffMaxTime
1035 theFacade.get_registry()->set_connect_backoff_max_time_in_ms(start_connect_backoff_max_time);
1036 }
1037 }
1038
1039 if (node_failrep == false)
1040 {
1041 /**
1042 * Inform API
1043 */
1044 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
1045 signal.theVerId_signalNumber = GSN_NODE_FAILREP;
1046 signal.theReceiversBlockNumber = API_CLUSTERMGR;
1047 signal.theTrace = 0;
1048 signal.theLength = NodeFailRep::SignalLengthLong;
1049
1050 NodeFailRep * rep = CAST_PTR(NodeFailRep, signal.getDataPtrSend());
1051 rep->failNo = 0;
1052 rep->masterNodeId = 0;
1053 rep->noOfNodes = 1;
1054 NodeBitmask::clear(rep->theAllNodes);
1055 NodeBitmask::set(rep->theAllNodes, nodeId);
1056 execNODE_FAILREP(&signal, 0);
1057 }
1058 }
1059
1060 void
execNODE_FAILREP(const NdbApiSignal * sig,const LinearSectionPtr ptr[])1061 ClusterMgr::execNODE_FAILREP(const NdbApiSignal* sig,
1062 const LinearSectionPtr ptr[])
1063 {
1064 const NodeFailRep * rep = CAST_CONSTPTR(NodeFailRep, sig->getDataPtr());
1065 NodeBitmask mask;
1066 if (sig->getLength() == NodeFailRep::SignalLengthLong)
1067 {
1068 mask.assign(NodeBitmask::Size, rep->theAllNodes);
1069 }
1070 else
1071 {
1072 mask.assign(NdbNodeBitmask::Size, rep->theNodes);
1073 }
1074
1075 NdbApiSignal signal(sig->theSendersBlockRef);
1076 signal.theVerId_signalNumber = GSN_NODE_FAILREP;
1077 signal.theReceiversBlockNumber = API_CLUSTERMGR;
1078 signal.theTrace = 0;
1079 signal.theLength = NodeFailRep::SignalLengthLong;
1080
1081 NodeFailRep * copy = CAST_PTR(NodeFailRep, signal.getDataPtrSend());
1082 copy->failNo = 0;
1083 copy->masterNodeId = 0;
1084 copy->noOfNodes = 0;
1085 NodeBitmask::clear(copy->theAllNodes);
1086
1087 for (Uint32 i = mask.find_first(); i != NodeBitmask::NotFound;
1088 i = mask.find_next(i + 1))
1089 {
1090 Node & cm_node = theNodes[i];
1091 trp_node & theNode = cm_node;
1092
1093 bool node_failrep = theNode.m_node_fail_rep;
1094 bool connected = theNode.is_connected();
1095 set_node_dead(theNode);
1096
1097 if (node_failrep == false)
1098 {
1099 theNode.m_node_fail_rep = true;
1100 NodeBitmask::set(copy->theAllNodes, i);
1101 copy->noOfNodes++;
1102 }
1103
1104 if (connected)
1105 {
1106 theFacade.doDisconnect(i);
1107 }
1108 }
1109
1110 recalcMinDbVersion();
1111 if (copy->noOfNodes)
1112 {
1113 theFacade.for_each(this, &signal, 0); // report GSN_NODE_FAILREP
1114 }
1115
1116 if (noOfAliveNodes == 0)
1117 {
1118 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
1119 signal.theVerId_signalNumber = GSN_NF_COMPLETEREP;
1120 signal.theReceiversBlockNumber = 0;
1121 signal.theTrace = 0;
1122 signal.theLength = NFCompleteRep::SignalLength;
1123
1124 NFCompleteRep * rep = CAST_PTR(NFCompleteRep, signal.getDataPtrSend());
1125 rep->blockNo =0;
1126 rep->nodeId = getOwnNodeId();
1127 rep->unused = 0;
1128 rep->from = __LINE__;
1129
1130 for (Uint32 i = 1; i < MAX_NODES; i++)
1131 {
1132 trp_node& theNode = theNodes[i];
1133 if (theNode.defined && theNode.nfCompleteRep == false)
1134 {
1135 rep->failedNodeId = i;
1136 execNF_COMPLETEREP(&signal, 0);
1137 }
1138 }
1139 }
1140 }
1141
1142 bool
is_cluster_completely_unavailable()1143 ClusterMgr::is_cluster_completely_unavailable()
1144 {
1145 bool ret_code = true;
1146
1147 /**
1148 * This method (and several other 'node state getters') allow
1149 * reading of theNodes[] from multiple block threads while
1150 * ClusterMgr concurrently updates them. Thus, a mutex should
1151 * have been expected here. See bug#20391191, and addendum patches
1152 * to bug#19524096, to understand what prevents us from locking (yet)
1153 */
1154 for (NodeId n = 1; n < MAX_NDB_NODES ; n++)
1155 {
1156 const trp_node& node = theNodes[n];
1157 if (!node.defined)
1158 {
1159 /**
1160 * Node isn't even part of configuration.
1161 */
1162 continue;
1163 }
1164 if (node.m_state.startLevel > NodeState::SL_STARTED)
1165 {
1166 /**
1167 * Node is stopping, so isn't available for any transactions,
1168 * so not available for us to use.
1169 */
1170 continue;
1171 }
1172 if (!node.compatible)
1173 {
1174 /**
1175 * The node isn't compatible with ours, so we can't use it
1176 */
1177 continue;
1178 }
1179 if (node.m_alive ||
1180 node.m_state.startLevel == NodeState::SL_STARTING ||
1181 node.m_state.startLevel == NodeState::SL_STARTED)
1182 {
1183 /**
1184 * We found a node that is either alive (less likely since we call this
1185 * method), or it is in state SL_STARTING which means that we were
1186 * allowed to connect, this means that we will very shortly be able to
1187 * use this connection. So this means that we know that the current
1188 * connection problem is a temporary issue and we can report a temporary
1189 * error instead of reporting 4009.
1190 *
1191 * We can deduce that the cluster isn't ready to be declared down
1192 * yet, we have a link to a starting node. We either very soon have
1193 * a working cluster, or we already have a working cluster but we
1194 * haven't yet the most up-to-date information about the cluster state.
1195 * So the cluster will soon be available again very likely, so
1196 * we can report a temporary error rather than an unknown error.
1197 */
1198 ret_code = false;
1199 break;
1200 }
1201 }
1202 return ret_code;
1203 }
1204
1205 void
print_nodes(const char * where,NdbOut & out)1206 ClusterMgr::print_nodes(const char* where, NdbOut& out)
1207 {
1208 out << where << " >>" << endl;
1209 for (NodeId n = 1; n < MAX_NODES ; n++)
1210 {
1211 const trp_node node = getNodeInfo(n);
1212 if (!node.defined)
1213 continue;
1214 out << "node: " << n << endl;
1215 out << " -";
1216 out << " connected: " << node.is_connected();
1217 out << ", compatible: " << node.compatible;
1218 out << ", nf_complete_rep: " << node.nfCompleteRep;
1219 out << ", alive: " << node.m_alive;
1220 out << ", confirmed: " << node.is_confirmed();
1221 out << endl;
1222
1223 out << " - " << node.m_info << endl;
1224 out << " - " << node.m_state << endl;
1225 }
1226 out << "<<" << endl;
1227 }
1228
1229
1230 /******************************************************************************
1231 * Arbitrator
1232 ******************************************************************************/
ArbitMgr(ClusterMgr & c)1233 ArbitMgr::ArbitMgr(ClusterMgr & c)
1234 : m_clusterMgr(c)
1235 {
1236 DBUG_ENTER("ArbitMgr::ArbitMgr");
1237
1238 theThreadMutex = NdbMutex_Create();
1239 theInputCond = NdbCondition_Create();
1240 theInputMutex = NdbMutex_Create();
1241
1242 theRank = 0;
1243 theDelay = 0;
1244 theThread = 0;
1245
1246 theInputTimeout = 0;
1247 theInputFull = false;
1248 memset(&theInputBuffer, 0, sizeof(theInputBuffer));
1249 theState = StateInit;
1250
1251 memset(&theStartReq, 0, sizeof(theStartReq));
1252 memset(&theChooseReq1, 0, sizeof(theChooseReq1));
1253 memset(&theChooseReq2, 0, sizeof(theChooseReq2));
1254 memset(&theStopOrd, 0, sizeof(theStopOrd));
1255
1256 DBUG_VOID_RETURN;
1257 }
1258
ArbitMgr::~ArbitMgr()
{
  DBUG_ENTER("ArbitMgr::~ArbitMgr");
  // Release the mutexes and condition created in the constructor.
  // NOTE(review): destroys them unconditionally — assumes the arbitrator
  // thread has already been reaped (doStop/doStart wait for it); confirm
  // callers guarantee this before destruction.
  NdbMutex_Destroy(theThreadMutex);
  NdbCondition_Destroy(theInputCond);
  NdbMutex_Destroy(theInputMutex);
  DBUG_VOID_RETURN;
}
1267
1268 // Start arbitrator thread. This is kernel request.
1269 // First stop any previous thread since it is a left-over
1270 // which was never used and which now has wrong ticket.
1271 void
doStart(const Uint32 * theData)1272 ArbitMgr::doStart(const Uint32* theData)
1273 {
1274 ArbitSignal aSignal;
1275 NdbMutex_Lock(theThreadMutex);
1276 if (theThread != NULL) {
1277 aSignal.init(GSN_ARBIT_STOPORD, NULL);
1278 aSignal.data.code = StopRestart;
1279 sendSignalToThread(aSignal);
1280 void* value;
1281 NdbThread_WaitFor(theThread, &value);
1282 NdbThread_Destroy(&theThread);
1283 theState = StateInit;
1284 theInputFull = false;
1285 }
1286 aSignal.init(GSN_ARBIT_STARTREQ, theData);
1287 sendSignalToThread(aSignal);
1288 theThread = NdbThread_Create(
1289 runArbitMgr_C, (void**)this,
1290 0, // default stack size
1291 "ndb_arbitmgr",
1292 NDB_THREAD_PRIO_HIGH);
1293 NdbMutex_Unlock(theThreadMutex);
1294 }
1295
1296 // The "choose me" signal from a candidate.
1297 void
doChoose(const Uint32 * theData)1298 ArbitMgr::doChoose(const Uint32* theData)
1299 {
1300 ArbitSignal aSignal;
1301 aSignal.init(GSN_ARBIT_CHOOSEREQ, theData);
1302 sendSignalToThread(aSignal);
1303 }
1304
1305 // Stop arbitrator thread via stop signal from the kernel
1306 // or when exiting API program.
1307 void
doStop(const Uint32 * theData)1308 ArbitMgr::doStop(const Uint32* theData)
1309 {
1310 DBUG_ENTER("ArbitMgr::doStop");
1311 ArbitSignal aSignal;
1312 NdbMutex_Lock(theThreadMutex);
1313 if (theThread != NULL) {
1314 aSignal.init(GSN_ARBIT_STOPORD, theData);
1315 if (theData == 0) {
1316 aSignal.data.code = StopExit;
1317 } else {
1318 aSignal.data.code = StopRequest;
1319 }
1320 sendSignalToThread(aSignal);
1321 void* value;
1322 NdbThread_WaitFor(theThread, &value);
1323 NdbThread_Destroy(&theThread);
1324 theState = StateInit;
1325 }
1326 NdbMutex_Unlock(theThreadMutex);
1327 DBUG_VOID_RETURN;
1328 }
1329
1330 // private methods
1331
1332 extern "C"
1333 void*
runArbitMgr_C(void * me)1334 runArbitMgr_C(void* me)
1335 {
1336 ((ArbitMgr*) me)->threadMain();
1337 return NULL;
1338 }
1339
void
ArbitMgr::sendSignalToThread(ArbitSignal& aSignal)
{
  // Deliver a signal to the arbitrator thread via the single-slot
  // input buffer. Blocks the caller until the slot is free, then
  // stores the signal and wakes the thread.
#ifdef DEBUG_ARBIT
  char buf[17] = "";
  ndbout << "arbit recv: ";
  ndbout << " gsn=" << aSignal.gsn;
  ndbout << " send=" << aSignal.data.sender;
  ndbout << " code=" << aSignal.data.code;
  ndbout << " node=" << aSignal.data.node;
  ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
  ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
  ndbout << endl;
#endif
  aSignal.setTimestamp(); // signal arrival time
  NdbMutex_Lock(theInputMutex);
  // Wait (re-checking every 1000 ms) until threadMain has consumed the
  // previous signal; it signals theInputCond after emptying the slot.
  while (theInputFull) {
    NdbCondition_WaitTimeout(theInputCond, theInputMutex, 1000);
  }
  theInputBuffer = aSignal;
  theInputFull = true;
  NdbCondition_Signal(theInputCond);
  NdbMutex_Unlock(theInputMutex);
}
1364
void
ArbitMgr::threadMain()
{
  // Arbitrator thread entry point: consume the initial start request,
  // then loop on the one-slot mailbox until a stop order arrives.
  ArbitSignal aSignal;
  // The STARTREQ was placed in theInputBuffer by doStart() before this
  // thread was created, so it is read here without taking the mutex.
  aSignal = theInputBuffer;
  threadStart(aSignal);
  bool stop = false;
  while (! stop) {
    NdbMutex_Lock(theInputMutex);
    // Poll for the next signal; each timeout runs the state-machine
    // timeout handler (threadTimeout settles pending choose rounds).
    while (! theInputFull) {
      NdbCondition_WaitTimeout(theInputCond, theInputMutex, theInputTimeout);
      threadTimeout();
    }
    // NOTE(review): on the first iteration theInputFull is still true
    // from the STARTREQ, so it is read once more here; its gsn matches
    // no case below and is silently ignored.
    aSignal = theInputBuffer;
    theInputFull = false;
    // Wake any producer blocked in sendSignalToThread().
    NdbCondition_Signal(theInputCond);
    NdbMutex_Unlock(theInputMutex);
    switch (aSignal.gsn) {
    case GSN_ARBIT_CHOOSEREQ:
      threadChoose(aSignal);
      break;
    case GSN_ARBIT_STOPORD:
      stop = true;
      break;
    }
  }
  // aSignal now holds the stop order; wind up accordingly.
  threadStop(aSignal);
}
1393
1394 // handle events in the thread
1395
1396 void
threadStart(ArbitSignal & aSignal)1397 ArbitMgr::threadStart(ArbitSignal& aSignal)
1398 {
1399 theStartReq = aSignal;
1400 sendStartConf(theStartReq, ArbitCode::ApiStart);
1401 theState = StateStarted;
1402 theInputTimeout = 1000;
1403 }
1404
void
ArbitMgr::threadChoose(ArbitSignal& aSignal)
{
  // Handle an ARBIT_CHOOSEREQ from a candidate, according to the
  // current state. Any request whose data does not match() the saved
  // start request is refused with ErrTicket.
  switch (theState) {
  case StateStarted: // first REQ
    if (! theStartReq.data.match(aSignal.data)) {
      sendChooseRef(aSignal, ArbitCode::ErrTicket);
      break;
    }
    theChooseReq1 = aSignal;
    // With no configured delay the first candidate wins immediately.
    if (theDelay == 0) {
      sendChooseConf(aSignal, ArbitCode::WinChoose);
      theState = StateFinished;
      theInputTimeout = 1000;
      break;
    }
    // Otherwise wait (short poll: theInputTimeout = 1) for a possible
    // second candidate; threadTimeout() settles the round once
    // theDelay has elapsed.
    theState = StateChoose1;
    theInputTimeout = 1;
    return;
  case StateChoose1: // second REQ within Delay
    if (! theStartReq.data.match(aSignal.data)) {
      sendChooseRef(aSignal, ArbitCode::ErrTicket);
      break;
    }
    theChooseReq2 = aSignal;
    theState = StateChoose2;
    theInputTimeout = 1;
    return;
  case StateChoose2: // too many REQs - refuse all
    if (! theStartReq.data.match(aSignal.data)) {
      sendChooseRef(aSignal, ArbitCode::ErrTicket);
      break;
    }
    // A third concurrent candidate: refuse everybody and finish.
    sendChooseRef(theChooseReq1, ArbitCode::ErrToomany);
    sendChooseRef(theChooseReq2, ArbitCode::ErrToomany);
    sendChooseRef(aSignal, ArbitCode::ErrToomany);
    theState = StateFinished;
    theInputTimeout = 1000;
    return;
  default:
    // A CHOOSEREQ is not valid in StateInit/StateFinished.
    sendChooseRef(aSignal, ArbitCode::ErrState);
    break;
  }
}
1449
1450 void
threadTimeout()1451 ArbitMgr::threadTimeout()
1452 {
1453 switch (theState) {
1454 case StateStarted:
1455 break;
1456 case StateChoose1:
1457 if (theChooseReq1.getTimediff() < theDelay)
1458 break;
1459 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1460 theState = StateFinished;
1461 theInputTimeout = 1000;
1462 break;
1463 case StateChoose2:
1464 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1465 sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
1466 theState = StateFinished;
1467 theInputTimeout = 1000;
1468 break;
1469 default:
1470 break;
1471 }
1472 }
1473
1474 void
threadStop(ArbitSignal & aSignal)1475 ArbitMgr::threadStop(ArbitSignal& aSignal)
1476 {
1477 switch (aSignal.data.code) {
1478 case StopExit:
1479 switch (theState) {
1480 case StateStarted:
1481 sendStopRep(theStartReq, 0);
1482 break;
1483 case StateChoose1: // just in time
1484 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1485 break;
1486 case StateChoose2:
1487 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1488 sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
1489 break;
1490 case StateInit:
1491 case StateFinished:
1492 //??
1493 break;
1494 }
1495 break;
1496 case StopRequest:
1497 break;
1498 case StopRestart:
1499 break;
1500 }
1501 }
1502
1503 // output routines
1504
1505 void
sendStartConf(ArbitSignal & aSignal,Uint32 code)1506 ArbitMgr::sendStartConf(ArbitSignal& aSignal, Uint32 code)
1507 {
1508 ArbitSignal copySignal = aSignal;
1509 copySignal.gsn = GSN_ARBIT_STARTCONF;
1510 copySignal.data.code = code;
1511 sendSignalToQmgr(copySignal);
1512 }
1513
1514 void
sendChooseConf(ArbitSignal & aSignal,Uint32 code)1515 ArbitMgr::sendChooseConf(ArbitSignal& aSignal, Uint32 code)
1516 {
1517 ArbitSignal copySignal = aSignal;
1518 copySignal.gsn = GSN_ARBIT_CHOOSECONF;
1519 copySignal.data.code = code;
1520 sendSignalToQmgr(copySignal);
1521 }
1522
1523 void
sendChooseRef(ArbitSignal & aSignal,Uint32 code)1524 ArbitMgr::sendChooseRef(ArbitSignal& aSignal, Uint32 code)
1525 {
1526 ArbitSignal copySignal = aSignal;
1527 copySignal.gsn = GSN_ARBIT_CHOOSEREF;
1528 copySignal.data.code = code;
1529 sendSignalToQmgr(copySignal);
1530 }
1531
1532 void
sendStopRep(ArbitSignal & aSignal,Uint32 code)1533 ArbitMgr::sendStopRep(ArbitSignal& aSignal, Uint32 code)
1534 {
1535 ArbitSignal copySignal = aSignal;
1536 copySignal.gsn = GSN_ARBIT_STOPREP;
1537 copySignal.data.code = code;
1538 sendSignalToQmgr(copySignal);
1539 }
1540
1541 /**
1542 * Send signal to QMGR. The input includes signal number and
1543 * signal data. The signal data is normally a copy of a received
1544 * signal so it contains expected arbitrator node id and ticket.
1545 * The sender in signal data is the QMGR node id.
1546 */
1547 void
sendSignalToQmgr(ArbitSignal & aSignal)1548 ArbitMgr::sendSignalToQmgr(ArbitSignal& aSignal)
1549 {
1550 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, m_clusterMgr.getOwnNodeId()));
1551
1552 signal.theVerId_signalNumber = aSignal.gsn;
1553 signal.theReceiversBlockNumber = QMGR;
1554 signal.theTrace = 0;
1555 signal.theLength = ArbitSignalData::SignalLength;
1556
1557 ArbitSignalData* sd = CAST_PTR(ArbitSignalData, signal.getDataPtrSend());
1558
1559 sd->sender = numberToRef(API_CLUSTERMGR, m_clusterMgr.getOwnNodeId());
1560 sd->code = aSignal.data.code;
1561 sd->node = aSignal.data.node;
1562 sd->ticket = aSignal.data.ticket;
1563 sd->mask = aSignal.data.mask;
1564
1565 #ifdef DEBUG_ARBIT
1566 char buf[17] = "";
1567 ndbout << "arbit send: ";
1568 ndbout << " gsn=" << aSignal.gsn;
1569 ndbout << " recv=" << aSignal.data.sender;
1570 ndbout << " code=" << aSignal.data.code;
1571 ndbout << " node=" << aSignal.data.node;
1572 ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
1573 ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
1574 ndbout << endl;
1575 #endif
1576
1577 {
1578 m_clusterMgr.lock();
1579 m_clusterMgr.raw_sendSignal(&signal, aSignal.data.sender);
1580 m_clusterMgr.flush_send_buffers();
1581 m_clusterMgr.unlock();
1582 }
1583 }
1584