1 /*
2 Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <ndb_global.h>
26 #include <ndb_limits.h>
27 #include "TransporterFacade.hpp"
28 #include "trp_client.hpp"
29 #include "ClusterMgr.hpp"
30 #include <IPCConfig.hpp>
31 #include <TransporterCallback.hpp>
32 #include <TransporterRegistry.hpp>
33 #include "NdbApiSignal.hpp"
34 #include "NdbWaiter.hpp"
35 #include <NdbOut.hpp>
36 #include <NdbEnv.h>
37 #include <NdbSleep.h>
38
39 #include <kernel/GlobalSignalNumbers.h>
40 #include <mgmapi_config_parameters.h>
41 #include <mgmapi_configuration.hpp>
42 #include <NdbConfig.h>
43 #include <ndb_version.h>
44 #include <SignalLoggerManager.hpp>
45 #include <kernel/ndb_limits.h>
46 #include <signaldata/AlterTable.hpp>
47 #include <signaldata/SumaImpl.hpp>
48 #include <signaldata/AllocNodeId.hpp>
49
50 //#define REPORT_TRANSPORTER
51 //#define API_TRACE
52
numberToIndex(int number)53 static int numberToIndex(int number)
54 {
55 return number - MIN_API_BLOCK_NO;
56 }
57
indexToNumber(int index)58 static int indexToNumber(int index)
59 {
60 return index + MIN_API_BLOCK_NO;
61 }
62
/*
 * TRP_DEBUG(t): when DEBUG_TRANSPORTER is defined, print the current file,
 * line and 't' to ndbout; otherwise expand to nothing.
 */
#ifdef DEBUG_TRANSPORTER
#define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
#else
#define TRP_DEBUG(t)
#endif
68
69 /*****************************************************************************
70 * Call back functions
71 *****************************************************************************/
72 void
reportError(NodeId nodeId,TransporterError errorCode,const char * info)73 TransporterFacade::reportError(NodeId nodeId,
74 TransporterError errorCode, const char *info)
75 {
76 #ifdef REPORT_TRANSPORTER
77 ndbout_c("REPORT_TRANSP: reportError (nodeId=%d, errorCode=%d) %s",
78 (int)nodeId, (int)errorCode, info ? info : "");
79 #endif
80 if(errorCode & TE_DO_DISCONNECT) {
81 ndbout_c("reportError (%d, %d) %s", (int)nodeId, (int)errorCode,
82 info ? info : "");
83 doDisconnect(nodeId);
84 }
85 }
86
87 /**
88 * Report average send length in bytes (4096 last sends)
89 */
90 void
reportSendLen(NodeId nodeId,Uint32 count,Uint64 bytes)91 TransporterFacade::reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes)
92 {
93 #ifdef REPORT_TRANSPORTER
94 ndbout_c("REPORT_TRANSP: reportSendLen (nodeId=%d, bytes/count=%d)",
95 (int)nodeId, (Uint32)(bytes/count));
96 #endif
97 (void)nodeId;
98 (void)count;
99 (void)bytes;
100 }
101
102 /**
103 * Report average receive length in bytes (4096 last receives)
104 */
105 void
reportReceiveLen(NodeId nodeId,Uint32 count,Uint64 bytes)106 TransporterFacade::reportReceiveLen(NodeId nodeId, Uint32 count, Uint64 bytes)
107 {
108 #ifdef REPORT_TRANSPORTER
109 ndbout_c("REPORT_TRANSP: reportReceiveLen (nodeId=%d, bytes/count=%d)",
110 (int)nodeId, (Uint32)(bytes/count));
111 #endif
112 (void)nodeId;
113 (void)count;
114 (void)bytes;
115 }
116
117 /**
118 * Report connection established
119 */
120 void
reportConnect(NodeId nodeId)121 TransporterFacade::reportConnect(NodeId nodeId)
122 {
123 #ifdef REPORT_TRANSPORTER
124 ndbout_c("REPORT_TRANSP: API reportConnect (nodeId=%d)", (int)nodeId);
125 #endif
126 reportConnected(nodeId);
127 }
128
129 /**
130 * Report connection broken
131 */
132 void
reportDisconnect(NodeId nodeId,Uint32 error)133 TransporterFacade::reportDisconnect(NodeId nodeId, Uint32 error){
134 #ifdef REPORT_TRANSPORTER
135 ndbout_c("REPORT_TRANSP: API reportDisconnect (nodeId=%d)", (int)nodeId);
136 #endif
137 reportDisconnected(nodeId);
138 }
139
140 void
transporter_recv_from(NodeId nodeId)141 TransporterFacade::transporter_recv_from(NodeId nodeId)
142 {
143 hb_received(nodeId);
144 }
145
146 /****************************************************************************
147 *
148 *****************************************************************************/
149
150 /**
151 * Report connection broken
152 */
153 int
checkJobBuffer()154 TransporterFacade::checkJobBuffer()
155 {
156 return 0;
157 }
158
159 #ifdef API_TRACE
160 static const char * API_SIGNAL_LOG = "API_SIGNAL_LOG";
161 static const char * apiSignalLog = 0;
162 static SignalLoggerManager signalLogger;
163 static
164 inline
165 bool
setSignalLog()166 setSignalLog(){
167 signalLogger.flushSignalLog();
168
169 const char * tmp = NdbEnv_GetEnv(API_SIGNAL_LOG, (char *)0, 0);
170 if(tmp != 0 && apiSignalLog != 0 && strcmp(tmp,apiSignalLog) == 0){
171 return true;
172 } else if(tmp == 0 && apiSignalLog == 0){
173 return false;
174 } else if(tmp == 0 && apiSignalLog != 0){
175 signalLogger.setOutputStream(0);
176 apiSignalLog = tmp;
177 return false;
178 } else if(tmp !=0){
179 if (strcmp(tmp, "-") == 0)
180 signalLogger.setOutputStream(stdout);
181 #ifndef DBUG_OFF
182 else if (strcmp(tmp, "+") == 0)
183 signalLogger.setOutputStream(DBUG_FILE);
184 #endif
185 else
186 signalLogger.setOutputStream(fopen(tmp, "w"));
187 apiSignalLog = tmp;
188 return true;
189 }
190 return false;
191 }
192 inline
193 bool
TRACE_GSN(Uint32 gsn)194 TRACE_GSN(Uint32 gsn)
195 {
196 switch(gsn){
197 #ifndef TRACE_APIREGREQ
198 case GSN_API_REGREQ:
199 case GSN_API_REGCONF:
200 return false;
201 #endif
202 #if 1
203 case GSN_SUB_GCP_COMPLETE_REP:
204 case GSN_SUB_GCP_COMPLETE_ACK:
205 return false;
206 #endif
207 default:
208 return true;
209 }
210 }
211 #endif
212
213 /**
214 * The execute function : Handle received signal
215 */
216 void
deliver_signal(SignalHeader * const header,Uint8 prio,Uint32 * const theData,LinearSectionPtr ptr[3])217 TransporterFacade::deliver_signal(SignalHeader * const header,
218 Uint8 prio, Uint32 * const theData,
219 LinearSectionPtr ptr[3])
220 {
221 Uint32 tRecBlockNo = header->theReceiversBlockNumber;
222
223 #ifdef API_TRACE
224 if(setSignalLog() && TRACE_GSN(header->theVerId_signalNumber)){
225 signalLogger.executeSignal(* header,
226 prio,
227 theData,
228 ownId(),
229 ptr, header->m_noOfSections);
230 signalLogger.flushSignalLog();
231 }
232 #endif
233
234 if (tRecBlockNo >= MIN_API_BLOCK_NO)
235 {
236 trp_client * clnt = m_threads.get(tRecBlockNo);
237 if (clnt != 0)
238 {
239 /**
240 * Handle received signal immediately to avoid any unnecessary
241 * copying of data, allocation of memory and other things. Copying
242 * of data could be interesting to support several priority levels
243 * and to support a special memory structure when executing the
244 * signals. Neither of those are interesting when receiving data
245 * in the NDBAPI. The NDBAPI will thus read signal data directly as
246 * it was written by the sender (SCI sender is other node, Shared
247 * memory sender is other process and TCP/IP sender is the OS that
248 * writes the TCP/IP message into a message buffer).
249 */
250 NdbApiSignal tmpSignal(*header);
251 NdbApiSignal * tSignal = &tmpSignal;
252 tSignal->setDataPtr(theData);
253 clnt->trp_deliver_signal(tSignal, ptr);
254 }//if
255 }
256 else if (tRecBlockNo == API_PACKED)
257 {
258 /**
259 * Block number == 2047 is used to signal a signal that consists of
260 * multiple instances of the same signal. This is an effort to
261 * package the signals so as to avoid unnecessary communication
262 * overhead since TCP/IP has a great performance impact.
263 */
264 Uint32 Tlength = header->theLength;
265 Uint32 Tsent = 0;
266 /**
267 * Since it contains at least two data packets we will first
268 * copy the signal data to safe place.
269 */
270 while (Tsent < Tlength) {
271 Uint32 Theader = theData[Tsent];
272 Tsent++;
273 Uint32 TpacketLen = (Theader & 0x1F) + 3;
274 tRecBlockNo = Theader >> 16;
275 if (TpacketLen <= 25)
276 {
277 if ((TpacketLen + Tsent) <= Tlength)
278 {
279 /**
280 * Set the data length of the signal and the receivers block
281 * reference and then call the API.
282 */
283 header->theLength = TpacketLen;
284 header->theReceiversBlockNumber = tRecBlockNo;
285 Uint32* tDataPtr = &theData[Tsent];
286 Tsent += TpacketLen;
287 if (tRecBlockNo >= MIN_API_BLOCK_NO)
288 {
289 trp_client * clnt = m_threads.get(tRecBlockNo);
290 if(clnt != 0)
291 {
292 NdbApiSignal tmpSignal(*header);
293 NdbApiSignal * tSignal = &tmpSignal;
294 tSignal->setDataPtr(tDataPtr);
295 clnt->trp_deliver_signal(tSignal, 0);
296 }
297 }
298 }
299 }
300 }
301 return;
302 }
303 else if (tRecBlockNo >= MIN_API_FIXED_BLOCK_NO &&
304 tRecBlockNo <= MAX_API_FIXED_BLOCK_NO)
305 {
306 Uint32 dynamic= m_fixed2dynamic[tRecBlockNo - MIN_API_FIXED_BLOCK_NO];
307 trp_client * clnt = m_threads.get(dynamic);
308 if (clnt != 0)
309 {
310 NdbApiSignal tmpSignal(*header);
311 NdbApiSignal * tSignal = &tmpSignal;
312 tSignal->setDataPtr(theData);
313 clnt->trp_deliver_signal(tSignal, ptr);
314 }//if
315 }
316 else
317 {
318 // Ignore all other block numbers.
319 if(header->theVerId_signalNumber != GSN_API_REGREQ)
320 {
321 TRP_DEBUG( "TransporterFacade received signal to unknown block no." );
322 ndbout << "BLOCK NO: " << tRecBlockNo << " sig "
323 << header->theVerId_signalNumber << endl;
324 abort();
325 }
326 }
327 }
328
329 // These symbols are needed, but not used in the API
330 void
printSegmentedSection(FILE *,const SignalHeader &,const SegmentedSectionPtr ptr[3],unsigned i)331 SignalLoggerManager::printSegmentedSection(FILE *, const SignalHeader &,
332 const SegmentedSectionPtr ptr[3],
333 unsigned i){
334 abort();
335 }
336
337 void
copy(Uint32 * & insertPtr,class SectionSegmentPool & thePool,const SegmentedSectionPtr & _ptr)338 copy(Uint32 * & insertPtr,
339 class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
340 abort();
341 }
342
343 /**
344 * Note that this function needs no locking since it is
345 * only called from the constructor of Ndb (the NdbObject)
346 *
347 * Which is protected by a mutex
348 */
349
350 int
start_instance(NodeId nodeId,const ndb_mgm_configuration * conf)351 TransporterFacade::start_instance(NodeId nodeId,
352 const ndb_mgm_configuration* conf)
353 {
354 assert(theOwnId == 0);
355 theOwnId = nodeId;
356
357 #if defined SIGPIPE && !defined _WIN32
358 (void)signal(SIGPIPE, SIG_IGN);
359 #endif
360
361 theTransporterRegistry = new TransporterRegistry(this);
362 if (theTransporterRegistry == NULL)
363 return -1;
364
365 if (!theTransporterRegistry->init(nodeId))
366 return -1;
367
368 if (theClusterMgr == NULL)
369 theClusterMgr = new ClusterMgr(*this);
370
371 if (theClusterMgr == NULL)
372 return -1;
373
374 if (!configure(nodeId, conf))
375 return -1;
376
377 if (!theTransporterRegistry->start_service(m_socket_server))
378 return -1;
379
380 theReceiveThread = NdbThread_Create(runReceiveResponse_C,
381 (void**)this,
382 0, // Use default stack size
383 "ndb_receive",
384 NDB_THREAD_PRIO_LOW);
385
386 theSendThread = NdbThread_Create(runSendRequest_C,
387 (void**)this,
388 0, // Use default stack size
389 "ndb_send",
390 NDB_THREAD_PRIO_LOW);
391
392 theClusterMgr->startThread();
393
394 return 0;
395 }
396
397 /**
398 * Note that this function need no locking since its
399 * only called from the destructor of Ndb (the NdbObject)
400 *
401 * Which is protected by a mutex
402 */
403 void
stop_instance()404 TransporterFacade::stop_instance(){
405 DBUG_ENTER("TransporterFacade::stop_instance");
406 doStop();
407 DBUG_VOID_RETURN;
408 }
409
410 void
doStop()411 TransporterFacade::doStop(){
412 DBUG_ENTER("TransporterFacade::doStop");
413 /**
414 * First stop the ClusterMgr because it needs to send one more signal
415 * and also uses theFacadeInstance to lock/unlock theMutexPtr
416 */
417 if (theClusterMgr != NULL) theClusterMgr->doStop();
418
419 /**
420 * Now stop the send and receive threads
421 */
422 void *status;
423 theStopReceive = 1;
424 if (theReceiveThread) {
425 NdbThread_WaitFor(theReceiveThread, &status);
426 NdbThread_Destroy(&theReceiveThread);
427 }
428 if (theSendThread) {
429 NdbThread_WaitFor(theSendThread, &status);
430 NdbThread_Destroy(&theSendThread);
431 }
432 DBUG_VOID_RETURN;
433 }
434
435 extern "C"
436 void*
runSendRequest_C(void * me)437 runSendRequest_C(void * me)
438 {
439 ((TransporterFacade*) me)->threadMainSend();
440 return 0;
441 }
442
threadMainSend(void)443 void TransporterFacade::threadMainSend(void)
444 {
445 theTransporterRegistry->startSending();
446 if (theTransporterRegistry->start_clients() == 0){
447 ndbout_c("Unable to start theTransporterRegistry->start_clients");
448 exit(0);
449 }
450
451 m_socket_server.startServer();
452
453 while(!theStopReceive) {
454 NdbSleep_MilliSleep(10);
455 NdbMutex_Lock(theMutexPtr);
456 if (sendPerformedLastInterval == 0) {
457 theTransporterRegistry->performSend();
458 }
459 sendPerformedLastInterval = 0;
460 NdbMutex_Unlock(theMutexPtr);
461 }
462 theTransporterRegistry->stopSending();
463
464 m_socket_server.stopServer();
465 m_socket_server.stopSessions(true);
466
467 theTransporterRegistry->stop_clients();
468 }
469
470 extern "C"
471 void*
runReceiveResponse_C(void * me)472 runReceiveResponse_C(void * me)
473 {
474 ((TransporterFacade*) me)->threadMainReceive();
475 return 0;
476 }
477
478 /*
479 The receiver thread is changed to only wake up once every 10 milliseconds
480 to poll. It will first check that nobody owns the poll "right" before
481 polling. This means that methods using the receiveResponse and
482 sendRecSignal will have a slightly longer response time if they are
483 executed without any parallel key lookups. Currently also scans are
484 affected but this is to be fixed.
485 */
threadMainReceive(void)486 void TransporterFacade::threadMainReceive(void)
487 {
488 theTransporterRegistry->startReceiving();
489 #ifdef NDB_SHM_TRANSPORTER
490 NdbThread_set_shm_sigmask(TRUE);
491 #endif
492 while(!theStopReceive)
493 {
494 theClusterMgr->lock();
495 theTransporterRegistry->update_connections();
496 theClusterMgr->unlock();
497 NdbSleep_MilliSleep(100);
498 }//while
499 theTransporterRegistry->stopReceiving();
500 }
501 /*
502 This method is called by worker thread that owns the poll "rights".
503 It waits for events and if something arrives it takes care of it
504 and returns to caller. It will quickly come back here if not all
505 data was received for the worker thread.
506 */
external_poll(Uint32 wait_time)507 void TransporterFacade::external_poll(Uint32 wait_time)
508 {
509 NdbMutex_Unlock(theMutexPtr);
510
511 #ifdef NDB_SHM_TRANSPORTER
512 /*
513 If shared memory transporters are used we need to set our sigmask
514 such that we wake up also on interrupts on the shared memory
515 interrupt signal.
516 */
517 NdbThread_set_shm_sigmask(FALSE);
518 #endif
519
520 const int res = theTransporterRegistry->pollReceive(wait_time);
521
522 #ifdef NDB_SHM_TRANSPORTER
523 NdbThread_set_shm_sigmask(TRUE);
524 #endif
525
526 NdbMutex_Lock(theMutexPtr);
527 if (res > 0)
528 {
529 theTransporterRegistry->performReceive();
530 }
531 }
532
TransporterFacade(GlobalDictCache * cache)533 TransporterFacade::TransporterFacade(GlobalDictCache *cache) :
534 m_poll_owner(NULL),
535 m_poll_queue_head(NULL),
536 m_poll_queue_tail(NULL),
537 theTransporterRegistry(0),
538 theOwnId(0),
539 theStartNodeId(1),
540 theClusterMgr(NULL),
541 checkCounter(4),
542 currentSendLimit(1),
543 theStopReceive(0),
544 theSendThread(NULL),
545 theReceiveThread(NULL),
546 m_fragmented_signal_id(0),
547 m_globalDictCache(cache)
548 {
549 DBUG_ENTER("TransporterFacade::TransporterFacade");
550 theMutexPtr = NdbMutex_CreateWithName("TTFM");
551 sendPerformedLastInterval = 0;
552
553 for (int i = 0; i < NO_API_FIXED_BLOCKS; i++)
554 m_fixed2dynamic[i]= RNIL;
555
556 #ifdef API_TRACE
557 apiSignalLog = 0;
558 #endif
559
560 theClusterMgr = new ClusterMgr(*this);
561
562 DBUG_VOID_RETURN;
563 }
564
565
566 /* Return true if node with "nodeId" is a MGM node */
is_mgmd(Uint32 nodeId,const ndb_mgm_configuration * conf)567 static bool is_mgmd(Uint32 nodeId,
568 const ndb_mgm_configuration * conf)
569 {
570 ndb_mgm_configuration_iterator iter(*conf, CFG_SECTION_NODE);
571 if (iter.find(CFG_NODE_ID, nodeId))
572 abort();
573 Uint32 type;
574 if(iter.get(CFG_TYPE_OF_SECTION, &type))
575 abort();
576
577 return (type == NODE_TYPE_MGM);
578 }
579
580
581 bool
do_connect_mgm(NodeId nodeId,const ndb_mgm_configuration * conf)582 TransporterFacade::do_connect_mgm(NodeId nodeId,
583 const ndb_mgm_configuration* conf)
584 {
585 // Allow other MGM nodes to connect
586 DBUG_ENTER("TransporterFacade::do_connect_mgm");
587 ndb_mgm_configuration_iterator iter(*conf, CFG_SECTION_CONNECTION);
588 for(iter.first(); iter.valid(); iter.next())
589 {
590 Uint32 nodeId1, nodeId2;
591 if (iter.get(CFG_CONNECTION_NODE_1, &nodeId1) ||
592 iter.get(CFG_CONNECTION_NODE_2, &nodeId2))
593 DBUG_RETURN(false);
594
595 // Skip connections where this node is not involved
596 if (nodeId1 != nodeId && nodeId2 != nodeId)
597 continue;
598
599 // If both sides are MGM, open connection
600 if(is_mgmd(nodeId1, conf) && is_mgmd(nodeId2, conf))
601 {
602 Uint32 remoteNodeId = (nodeId == nodeId1 ? nodeId2 : nodeId1);
603 DBUG_PRINT("info", ("opening connection to node %d", remoteNodeId));
604 doConnect(remoteNodeId);
605 }
606 }
607
608 DBUG_RETURN(true);
609 }
610
611 bool
configure(NodeId nodeId,const ndb_mgm_configuration * conf)612 TransporterFacade::configure(NodeId nodeId,
613 const ndb_mgm_configuration* conf)
614 {
615 DBUG_ENTER("TransporterFacade::configure");
616
617 assert(theOwnId == nodeId);
618 assert(theTransporterRegistry);
619 assert(theClusterMgr);
620
621 // Configure transporters
622 if (!IPCConfig::configureTransporters(nodeId,
623 * conf,
624 * theTransporterRegistry,
625 true))
626 DBUG_RETURN(false);
627
628 // Configure cluster manager
629 theClusterMgr->configure(nodeId, conf);
630
631 ndb_mgm_configuration_iterator iter(* conf, CFG_SECTION_NODE);
632 if(iter.find(CFG_NODE_ID, nodeId))
633 DBUG_RETURN(false);
634
635 // Configure send buffers
636 Uint32 total_send_buffer = 0;
637 iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
638 theTransporterRegistry->allocate_send_buffers(total_send_buffer);
639
640 Uint32 auto_reconnect=1;
641 iter.get(CFG_AUTO_RECONNECT, &auto_reconnect);
642
643 const char * priospec = 0;
644 if (iter.get(CFG_HB_THREAD_PRIO, &priospec) == 0)
645 {
646 NdbThread_SetHighPrioProperties(priospec);
647 }
648
649 /**
650 * Keep value it set before connect (overriding config)
651 */
652 if (theClusterMgr->m_auto_reconnect == -1)
653 {
654 theClusterMgr->m_auto_reconnect = auto_reconnect;
655 }
656
657 #ifdef API_TRACE
658 signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
659 #endif
660
661 // Open connection between MGM servers
662 if (!do_connect_mgm(nodeId, conf))
663 DBUG_RETURN(false);
664
665 /**
666 * Also setup Loopback Transporter
667 */
668 doConnect(nodeId);
669
670 DBUG_RETURN(true);
671 }
672
673 void
for_each(trp_client * sender,const NdbApiSignal * aSignal,const LinearSectionPtr ptr[3])674 TransporterFacade::for_each(trp_client* sender,
675 const NdbApiSignal* aSignal,
676 const LinearSectionPtr ptr[3])
677 {
678 Uint32 sz = m_threads.m_statusNext.size();
679 for (Uint32 i = 0; i < sz ; i ++)
680 {
681 trp_client * clnt = m_threads.m_objectExecute[i];
682 if (clnt != 0 && clnt != sender)
683 {
684 clnt->trp_deliver_signal(aSignal, ptr);
685 }
686 }
687 }
688
689 void
connected()690 TransporterFacade::connected()
691 {
692 DBUG_ENTER("TransporterFacade::connected");
693 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theOwnId));
694 signal.theVerId_signalNumber = GSN_ALLOC_NODEID_CONF;
695 signal.theReceiversBlockNumber = 0;
696 signal.theTrace = 0;
697 signal.theLength = AllocNodeIdConf::SignalLength;
698
699 AllocNodeIdConf * rep = CAST_PTR(AllocNodeIdConf, signal.getDataPtrSend());
700 rep->senderRef = 0;
701 rep->senderData = 0;
702 rep->nodeId = theOwnId;
703 rep->secret_lo = 0;
704 rep->secret_hi = 0;
705
706 Uint32 sz = m_threads.m_statusNext.size();
707 for (Uint32 i = 0; i < sz ; i ++)
708 {
709 trp_client * clnt = m_threads.m_objectExecute[i];
710 if (clnt != 0)
711 {
712 clnt->trp_deliver_signal(&signal, 0);
713 }
714 }
715 DBUG_VOID_RETURN;
716 }
717
718 int
close_clnt(trp_client * clnt)719 TransporterFacade::close_clnt(trp_client* clnt)
720 {
721 int ret = -1;
722 if (clnt)
723 {
724 NdbMutex_Lock(theMutexPtr);
725 if (m_threads.get(clnt->m_blockNo) == clnt)
726 {
727 m_threads.close(clnt->m_blockNo);
728 ret = 0;
729 }
730 else
731 {
732 assert(0);
733 }
734 NdbMutex_Unlock(theMutexPtr);
735 }
736 return ret;
737 }
738
739 Uint32
open_clnt(trp_client * clnt,int blockNo)740 TransporterFacade::open_clnt(trp_client * clnt, int blockNo)
741 {
742 DBUG_ENTER("TransporterFacade::open");
743 Guard g(theMutexPtr);
744 int r= m_threads.open(clnt);
745 if (r < 0)
746 {
747 DBUG_RETURN(0);
748 }
749
750 if (unlikely(blockNo != -1))
751 {
752 // Using fixed block number, add fixed->dymamic mapping
753 Uint32 fixed_index = blockNo - MIN_API_FIXED_BLOCK_NO;
754
755 assert(blockNo >= MIN_API_FIXED_BLOCK_NO &&
756 fixed_index <= NO_API_FIXED_BLOCKS);
757
758 m_fixed2dynamic[fixed_index]= r;
759 }
760
761 if (theOwnId > 0)
762 {
763 r = numberToRef(r, theOwnId);
764 }
765 else
766 {
767 r = numberToRef(r, 0);
768 }
769 DBUG_RETURN(r);
770 }
771
~TransporterFacade()772 TransporterFacade::~TransporterFacade()
773 {
774 DBUG_ENTER("TransporterFacade::~TransporterFacade");
775
776 delete theClusterMgr;
777 NdbMutex_Lock(theMutexPtr);
778 delete theTransporterRegistry;
779 NdbMutex_Unlock(theMutexPtr);
780 NdbMutex_Destroy(theMutexPtr);
781 #ifdef API_TRACE
782 signalLogger.setOutputStream(0);
783 #endif
784 DBUG_VOID_RETURN;
785 }
786
787 void
calculateSendLimit()788 TransporterFacade::calculateSendLimit()
789 {
790 Uint32 Ti;
791 Uint32 TthreadCount = 0;
792
793 Uint32 sz = m_threads.m_statusNext.size();
794 for (Ti = 0; Ti < sz; Ti++) {
795 if (m_threads.m_statusNext[Ti] == (ThreadData::ACTIVE)){
796 TthreadCount++;
797 m_threads.m_statusNext[Ti] = ThreadData::INACTIVE;
798 }
799 }
800 currentSendLimit = TthreadCount;
801 if (currentSendLimit == 0) {
802 currentSendLimit = 1;
803 }
804 checkCounter = currentSendLimit << 2;
805 }
806
807
808 //-------------------------------------------------
809 // Force sending but still report the sending to the
810 // adaptive algorithm.
811 //-------------------------------------------------
forceSend(Uint32 block_number)812 void TransporterFacade::forceSend(Uint32 block_number) {
813 checkCounter--;
814 m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
815 sendPerformedLastInterval = 1;
816 if (checkCounter < 0) {
817 calculateSendLimit();
818 }
819 theTransporterRegistry->forceSendCheck(0);
820 }
821
822 //-------------------------------------------------
823 // Improving API performance
824 //-------------------------------------------------
825 void
checkForceSend(Uint32 block_number)826 TransporterFacade::checkForceSend(Uint32 block_number) {
827 m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
828 //-------------------------------------------------
829 // This code is an adaptive algorithm to discover when
830 // the API should actually send its buffers. The reason
831 // is that the performance is highly dependent on the
832 // size of the writes over the communication network.
833 // Thus we try to ensure that the send size is as big
834 // as possible. At the same time we don't want response
835 // time to increase so therefore we have to keep track of
836 // how the users are performing adaptively.
837 //-------------------------------------------------
838
839 if (theTransporterRegistry->forceSendCheck(currentSendLimit) == 1) {
840 sendPerformedLastInterval = 1;
841 }
842 checkCounter--;
843 if (checkCounter < 0) {
844 calculateSendLimit();
845 }
846 }
847
848
849 /******************************************************************************
850 * SEND SIGNAL METHODS
851 *****************************************************************************/
852 int
sendSignal(const NdbApiSignal * aSignal,NodeId aNode)853 TransporterFacade::sendSignal(const NdbApiSignal * aSignal, NodeId aNode)
854 {
855 const Uint32* tDataPtr = aSignal->getConstDataPtrSend();
856 Uint32 Tlen = aSignal->theLength;
857 Uint32 TBno = aSignal->theReceiversBlockNumber;
858 #ifdef API_TRACE
859 if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
860 SignalHeader tmp = * aSignal;
861 tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
862 LinearSectionPtr ptr[3];
863 signalLogger.sendSignal(tmp,
864 1,
865 tDataPtr,
866 aNode, ptr, 0);
867 signalLogger.flushSignalLog();
868 }
869 #endif
870 if ((Tlen != 0) && (Tlen <= 25) && (TBno != 0)) {
871 SendStatus ss = theTransporterRegistry->prepareSend(aSignal,
872 1, // JBB
873 tDataPtr,
874 aNode,
875 (LinearSectionPtr*)0);
876 //if (ss != SEND_OK) ndbout << ss << endl;
877 if (ss == SEND_OK)
878 {
879 assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
880 aSignal->readSignalNumber() == GSN_API_REGREQ ||
881 (aSignal->readSignalNumber() == GSN_CONNECT_REP &&
882 aNode == ownId()));
883 }
884 return (ss == SEND_OK ? 0 : -1);
885 }
886 else
887 {
888 ndbout << "ERR: SigLen = " << Tlen << " BlockRec = " << TBno;
889 ndbout << " SignalNo = " << aSignal->theVerId_signalNumber << endl;
890 assert(0);
891 }//if
892 return -1; // Node Dead
893 }
894
895 /**
896 * FragmentedSectionIterator
897 * -------------------------
898 * This class acts as an adapter to a GenericSectionIterator
899 * instance, providing a sub-range iterator interface.
900 * It is used when long sections of a signal are fragmented
901 * across multiple actual signals - the user-supplied
902 * GenericSectionIterator is then adapted into a
903 * GenericSectionIterator that only returns a subset of
904 * the contained words for each signal fragment.
905 */
906 class FragmentedSectionIterator: public GenericSectionIterator
907 {
908 private :
909 GenericSectionIterator* realIterator; /* Real underlying iterator */
910 Uint32 realIterWords; /* Total size of underlying */
911 Uint32 realCurrPos; /* Current pos in underlying */
912 Uint32 rangeStart; /* Sub range start in underlying */
913 Uint32 rangeLen; /* Sub range len in underlying */
914 Uint32 rangeRemain; /* Remaining words in underlying */
915 const Uint32* lastReadPtr; /* Ptr to last chunk obtained from
916 * underlying */
917 Uint32 lastReadPtrLen; /* Remaining words in last chunk
918 * obtained from underlying */
919 public:
920 /* Constructor
921 * The instance is constructed with the sub-range set to be the
922 * full range of the underlying iterator
923 */
FragmentedSectionIterator(GenericSectionPtr ptr)924 FragmentedSectionIterator(GenericSectionPtr ptr)
925 {
926 realIterator= ptr.sectionIter;
927 realIterWords= ptr.sz;
928 realCurrPos= 0;
929 rangeStart= 0;
930 rangeLen= rangeRemain= realIterWords;
931 lastReadPtr= NULL;
932 lastReadPtrLen= 0;
933 moveToPos(0);
934
935 assert(checkInvariants());
936 }
937
938 private:
939 /**
940 * checkInvariants
941 * These class invariants must hold true at all stable states
942 * of the iterator
943 */
checkInvariants()944 bool checkInvariants()
945 {
946 assert( (realIterator != NULL) || (realIterWords == 0) );
947 assert( realCurrPos <= realIterWords );
948 assert( rangeStart <= realIterWords );
949 assert( (rangeStart+rangeLen) <= realIterWords);
950 assert( rangeRemain <= rangeLen );
951
952 /* Can only have a null readptr if nothing is left */
953 assert( (lastReadPtr != NULL) || (rangeRemain == 0));
954
955 /* If we have a non-null readptr and some remaining
956 * words the readptr must have some words
957 */
958 assert( (lastReadPtr == NULL) ||
959 ((rangeRemain == 0) || (lastReadPtrLen != 0)));
960 return true;
961 }
962
963 /**
964 * moveToPos
965 * This method is used when the iterator is reset(), to move
966 * to the start of the current sub-range.
967 * If the iterator is already in-position then this is efficient
968 * Otherwise, it has to reset() the underling iterator and
969 * advance it until the start position is reached.
970 */
moveToPos(Uint32 pos)971 void moveToPos(Uint32 pos)
972 {
973 assert(pos <= realIterWords);
974
975 if (pos < realCurrPos)
976 {
977 /* Need to reset, and advance from the start */
978 realIterator->reset();
979 realCurrPos= 0;
980 lastReadPtr= NULL;
981 lastReadPtrLen= 0;
982 }
983
984 if ((lastReadPtr == NULL) &&
985 (realIterWords != 0) &&
986 (pos != realIterWords))
987 lastReadPtr= realIterator->getNextWords(lastReadPtrLen);
988
989 if (pos == realCurrPos)
990 return;
991
992 /* Advance until we get a chunk which contains the pos */
993 while (pos >= realCurrPos + lastReadPtrLen)
994 {
995 realCurrPos+= lastReadPtrLen;
996 lastReadPtr= realIterator->getNextWords(lastReadPtrLen);
997 assert(lastReadPtr != NULL);
998 }
999
1000 const Uint32 chunkOffset= pos - realCurrPos;
1001 lastReadPtr+= chunkOffset;
1002 lastReadPtrLen-= chunkOffset;
1003 realCurrPos= pos;
1004 }
1005
1006 public:
1007 /**
1008 * setRange
1009 * Set the sub-range of the iterator. Must be within the
1010 * bounds of the underlying iterator
1011 * After the range is set, the iterator is reset() to the
1012 * start of the supplied subrange
1013 */
setRange(Uint32 start,Uint32 len)1014 bool setRange(Uint32 start, Uint32 len)
1015 {
1016 assert(checkInvariants());
1017 if (start+len > realIterWords)
1018 return false;
1019 moveToPos(start);
1020
1021 rangeStart= start;
1022 rangeLen= rangeRemain= len;
1023
1024 assert(checkInvariants());
1025 return true;
1026 }
1027
1028 /**
1029 * reset
1030 * (GenericSectionIterator)
1031 * Reset the iterator to the start of the current sub-range
1032 * Avoid calling as it could be expensive.
1033 */
reset()1034 void reset()
1035 {
1036 /* Reset iterator to last specified range */
1037 assert(checkInvariants());
1038 moveToPos(rangeStart);
1039 rangeRemain= rangeLen;
1040 assert(checkInvariants());
1041 }
1042
1043 /**
1044 * getNextWords
1045 * (GenericSectionIterator)
1046 * Get ptr and size of next contiguous words in subrange
1047 */
getNextWords(Uint32 & sz)1048 const Uint32* getNextWords(Uint32& sz)
1049 {
1050 assert(checkInvariants());
1051 const Uint32* currPtr= NULL;
1052
1053 if (rangeRemain)
1054 {
1055 assert(lastReadPtr != NULL);
1056 assert(lastReadPtrLen != 0);
1057 currPtr= lastReadPtr;
1058
1059 sz= MIN(rangeRemain, lastReadPtrLen);
1060
1061 if (sz == lastReadPtrLen)
1062 /* Will return everything in this chunk, move iterator to
1063 * next
1064 */
1065 lastReadPtr= realIterator->getNextWords(lastReadPtrLen);
1066 else
1067 {
1068 /* Not returning all of this chunk, just advance within it */
1069 lastReadPtr+= sz;
1070 lastReadPtrLen-= sz;
1071 }
1072 realCurrPos+= sz;
1073 rangeRemain-= sz;
1074 }
1075 else
1076 {
1077 sz= 0;
1078 }
1079
1080 assert(checkInvariants());
1081 return currPtr;
1082 }
1083 };
1084
/* CHUNK_SZ: maximum number of section words carried by one fragment
 * signal. It is the largest whole number of NDB_SECTION_SEGMENT_SZ word
 * segments that fits in MAX_SEND_MESSAGE_BYTESIZE bytes, less two segments
 * of slack left for the 'main' part of the signal etc.
 */
#define CHUNK_SZ \
  ((((MAX_SEND_MESSAGE_BYTESIZE >> 2) / NDB_SECTION_SEGMENT_SZ) - 2 ) \
   * NDB_SECTION_SEGMENT_SZ)
1091
1092 /**
1093 * sendFragmentedSignal (GenericSectionPtr variant)
1094 * ------------------------------------------------
1095 * This method will send a signal with attached long sections. If
1096 * the signal is longer than CHUNK_SZ, the signal will be split into
1097 * multiple CHUNK_SZ fragments.
1098 *
1099 * This is done by sending two or more long signals(fragments), with the
1100 * original GSN, but different signal data and with as much of the long
1101 * sections as will fit in each.
1102 *
1103 * Non-final fragment signals contain a fraginfo value in the header
1104 * (1= first fragment, 2= intermediate fragment, 3= final fragment)
1105 *
1106 * Fragment signals contain additional words in their signals :
1107 * 1..n words Mapping section numbers in fragment signal to original
1108 * signal section numbers
1109 * 1 word Fragmented signal unique id.
1110 *
1111 * Non final fragments (fraginfo=1/2) only have this data in them. Final
1112 * fragments have this data in addition to the normal signal data.
1113 *
1114 * Each fragment signal can transport one or more long sections, starting
1115 * with section 0. Sections are always split on NDB_SECTION_SEGMENT_SZ word
1116 * boundaries to simplify reassembly in the kernel.
1117 */
1118 int
sendFragmentedSignal(const NdbApiSignal * inputSignal,NodeId aNode,const GenericSectionPtr ptr[3],Uint32 secs)1119 TransporterFacade::sendFragmentedSignal(const NdbApiSignal* inputSignal,
1120 NodeId aNode,
1121 const GenericSectionPtr ptr[3],
1122 Uint32 secs)
1123 {
1124 NdbApiSignal copySignal(* inputSignal);
1125 NdbApiSignal* aSignal = ©Signal;
1126
1127 unsigned i;
1128 Uint32 totalSectionLength= 0;
1129 for (i= 0; i < secs; i++)
1130 totalSectionLength+= ptr[i].sz;
1131
1132 /* If there's no need to fragment, send normally */
1133 if (totalSectionLength <= CHUNK_SZ)
1134 return sendSignal(aSignal, aNode, ptr, secs);
1135
1136 // TODO : Consider tracing fragment signals?
1137 #ifdef API_TRACE
1138 if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
1139 SignalHeader tmp = * aSignal;
1140 tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
1141 signalLogger.sendSignal(tmp,
1142 1,
1143 aSignal->getConstDataPtrSend(),
1144 aNode, ptr, 0);
1145 signalLogger.flushSignalLog();
1146 for (Uint32 i = 0; i<secs; i++)
1147 ptr[i].sectionIter->reset();
1148 }
1149 #endif
1150
1151 NdbApiSignal tmp_signal(*(SignalHeader*)aSignal);
1152 GenericSectionPtr tmp_ptr[3];
1153 GenericSectionPtr empty= {0, NULL};
1154 Uint32 unique_id= m_fragmented_signal_id++; // next unique id
1155
1156 /* Init tmp_ptr array from ptr[] array, make sure we have
1157 * 0 length for missing sections
1158 */
1159 for (i= 0; i < 3; i++)
1160 tmp_ptr[i]= (i < secs)? ptr[i] : empty;
1161
1162 /* Create our section iterator adapters */
1163 FragmentedSectionIterator sec0(tmp_ptr[0]);
1164 FragmentedSectionIterator sec1(tmp_ptr[1]);
1165 FragmentedSectionIterator sec2(tmp_ptr[2]);
1166
1167 /* Replace caller's iterators with ours */
1168 tmp_ptr[0].sectionIter= &sec0;
1169 tmp_ptr[1].sectionIter= &sec1;
1170 tmp_ptr[2].sectionIter= &sec2;
1171
1172 unsigned start_i= 0;
1173 unsigned this_chunk_sz= 0;
1174 unsigned fragment_info= 0;
1175 Uint32 *tmp_signal_data= tmp_signal.getDataPtrSend();
1176 for (i= 0; i < secs;) {
1177 unsigned remaining_sec_sz= tmp_ptr[i].sz;
1178 tmp_signal_data[i-start_i]= i;
1179 if (this_chunk_sz + remaining_sec_sz <= CHUNK_SZ)
1180 {
1181 /* This section fits whole, move onto next */
1182 this_chunk_sz+= remaining_sec_sz;
1183 i++;
1184 }
1185 else
1186 {
1187 /* This section doesn't fit, truncate it */
1188 unsigned send_sz= CHUNK_SZ - this_chunk_sz;
1189 if (i != start_i)
1190 {
1191 /* We ensure that the first piece of a new section which is
1192 * being truncated is a multiple of NDB_SECTION_SEGMENT_SZ
1193 * (to simplify reassembly). Subsequent non-truncated pieces
1194 * will be CHUNK_SZ which is a multiple of NDB_SECTION_SEGMENT_SZ
1195 * The final piece does not need to be a multiple of
1196 * NDB_SECTION_SEGMENT_SZ
1197 *
1198 * Note that this can push this_chunk_sz above CHUNK_SZ
1199 * Should probably round-down, but need to be careful of
1200 * 'can't fit any' cases. Instead, CHUNK_SZ is defined
1201 * with some slack below MAX_SENT_MESSAGE_BYTESIZE
1202 */
1203 send_sz=
1204 NDB_SECTION_SEGMENT_SZ
1205 *((send_sz+NDB_SECTION_SEGMENT_SZ-1)
1206 /NDB_SECTION_SEGMENT_SZ);
1207 if (send_sz > remaining_sec_sz)
1208 send_sz= remaining_sec_sz;
1209 }
1210
1211 /* Modify tmp generic section ptr to describe truncated
1212 * section
1213 */
1214 tmp_ptr[i].sz= send_sz;
1215 FragmentedSectionIterator* fragIter=
1216 (FragmentedSectionIterator*) tmp_ptr[i].sectionIter;
1217 const Uint32 total_sec_sz= ptr[i].sz;
1218 const Uint32 start= (total_sec_sz - remaining_sec_sz);
1219 bool ok= fragIter->setRange(start, send_sz);
1220 assert(ok);
1221 if (!ok)
1222 return -1;
1223
1224 if (fragment_info < 2) // 1 = first fragment signal
1225 // 2 = middle fragments
1226 fragment_info++;
1227
1228 // send tmp_signal
1229 tmp_signal_data[i-start_i+1]= unique_id;
1230 tmp_signal.setLength(i-start_i+2);
1231 tmp_signal.m_fragmentInfo= fragment_info;
1232 tmp_signal.m_noOfSections= i-start_i+1;
1233 // do prepare send
1234 {
1235 SendStatus ss = theTransporterRegistry->prepareSend
1236 (&tmp_signal,
1237 1, /*JBB*/
1238 tmp_signal_data,
1239 aNode,
1240 &tmp_ptr[start_i]);
1241 assert(ss != SEND_MESSAGE_TOO_BIG);
1242 if (ss != SEND_OK) return -1;
1243 if (ss == SEND_OK)
1244 {
1245 assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
1246 tmp_signal.readSignalNumber() == GSN_API_REGREQ);
1247 }
1248 }
1249 // setup variables for next signal
1250 start_i= i;
1251 this_chunk_sz= 0;
1252 assert(remaining_sec_sz >= send_sz);
1253 Uint32 remaining= remaining_sec_sz - send_sz;
1254 tmp_ptr[i].sz= remaining;
1255 /* Set sub-range iterator to cover remaining words */
1256 ok= fragIter->setRange(start+send_sz, remaining);
1257 assert(ok);
1258 if (!ok)
1259 return -1;
1260
1261 if (remaining == 0)
1262 /* This section's done, move onto the next */
1263 i++;
1264 }
1265 }
1266
1267 unsigned a_sz= aSignal->getLength();
1268
1269 if (fragment_info > 0) {
1270 // update the original signal to include section info
1271 Uint32 *a_data= aSignal->getDataPtrSend();
1272 unsigned tmp_sz= i-start_i;
1273 memcpy(a_data+a_sz,
1274 tmp_signal_data,
1275 tmp_sz*sizeof(Uint32));
1276 a_data[a_sz+tmp_sz]= unique_id;
1277 aSignal->setLength(a_sz+tmp_sz+1);
1278
1279 // send last fragment
1280 aSignal->m_fragmentInfo= 3; // 3 = last fragment
1281 aSignal->m_noOfSections= i-start_i;
1282 } else {
1283 aSignal->m_noOfSections= secs;
1284 }
1285
1286 // send aSignal
1287 int ret;
1288 {
1289 SendStatus ss = theTransporterRegistry->prepareSend
1290 (aSignal,
1291 1/*JBB*/,
1292 aSignal->getConstDataPtrSend(),
1293 aNode,
1294 &tmp_ptr[start_i]);
1295 assert(ss != SEND_MESSAGE_TOO_BIG);
1296 if (ss == SEND_OK)
1297 {
1298 assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
1299 aSignal->readSignalNumber() == GSN_API_REGREQ);
1300 }
1301 ret = (ss == SEND_OK ? 0 : -1);
1302 }
1303 aSignal->m_noOfSections = 0;
1304 aSignal->m_fragmentInfo = 0;
1305 aSignal->setLength(a_sz);
1306 return ret;
1307 }
1308
1309 int
sendFragmentedSignal(const NdbApiSignal * aSignal,NodeId aNode,const LinearSectionPtr ptr[3],Uint32 secs)1310 TransporterFacade::sendFragmentedSignal(const NdbApiSignal* aSignal,
1311 NodeId aNode,
1312 const LinearSectionPtr ptr[3],
1313 Uint32 secs)
1314 {
1315 /* Use the GenericSection variant of sendFragmentedSignal */
1316 GenericSectionPtr tmpPtr[3];
1317 LinearSectionPtr linCopy[3];
1318 const LinearSectionPtr empty= {0, NULL};
1319
1320 /* Make sure all of linCopy is initialised */
1321 for (Uint32 j=0; j<3; j++)
1322 linCopy[j]= (j < secs)? ptr[j] : empty;
1323
1324 LinearSectionIterator zero (linCopy[0].p, linCopy[0].sz);
1325 LinearSectionIterator one (linCopy[1].p, linCopy[1].sz);
1326 LinearSectionIterator two (linCopy[2].p, linCopy[2].sz);
1327
1328 /* Build GenericSectionPtr array using iterators */
1329 tmpPtr[0].sz= linCopy[0].sz;
1330 tmpPtr[0].sectionIter= &zero;
1331 tmpPtr[1].sz= linCopy[1].sz;
1332 tmpPtr[1].sectionIter= &one;
1333 tmpPtr[2].sz= linCopy[2].sz;
1334 tmpPtr[2].sectionIter= &two;
1335
1336 return sendFragmentedSignal(aSignal, aNode, tmpPtr, secs);
1337 }
1338
1339
1340 int
sendSignal(const NdbApiSignal * aSignal,NodeId aNode,const LinearSectionPtr ptr[3],Uint32 secs)1341 TransporterFacade::sendSignal(const NdbApiSignal* aSignal, NodeId aNode,
1342 const LinearSectionPtr ptr[3], Uint32 secs)
1343 {
1344 Uint32 save = aSignal->m_noOfSections;
1345 const_cast<NdbApiSignal*>(aSignal)->m_noOfSections = secs;
1346 #ifdef API_TRACE
1347 if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
1348 SignalHeader tmp = * aSignal;
1349 tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
1350 signalLogger.sendSignal(tmp,
1351 1,
1352 aSignal->getConstDataPtrSend(),
1353 aNode, ptr, secs);
1354 signalLogger.flushSignalLog();
1355 }
1356 #endif
1357 SendStatus ss = theTransporterRegistry->prepareSend
1358 (aSignal,
1359 1, // JBB
1360 aSignal->getConstDataPtrSend(),
1361 aNode,
1362 ptr);
1363 assert(ss != SEND_MESSAGE_TOO_BIG);
1364 const_cast<NdbApiSignal*>(aSignal)->m_noOfSections = save;
1365 if (ss == SEND_OK)
1366 {
1367 assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
1368 aSignal->readSignalNumber() == GSN_API_REGREQ);
1369 }
1370 return (ss == SEND_OK ? 0 : -1);
1371 }
1372
1373 int
sendSignal(const NdbApiSignal * aSignal,NodeId aNode,const GenericSectionPtr ptr[3],Uint32 secs)1374 TransporterFacade::sendSignal(const NdbApiSignal* aSignal, NodeId aNode,
1375 const GenericSectionPtr ptr[3], Uint32 secs)
1376 {
1377 Uint32 save = aSignal->m_noOfSections;
1378 const_cast<NdbApiSignal*>(aSignal)->m_noOfSections = secs;
1379 #ifdef API_TRACE
1380 if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
1381 SignalHeader tmp = * aSignal;
1382 tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
1383 signalLogger.sendSignal(tmp,
1384 1,
1385 aSignal->getConstDataPtrSend(),
1386 aNode, ptr, secs);
1387 signalLogger.flushSignalLog();
1388 for (Uint32 i = 0; i<secs; i++)
1389 ptr[i].sectionIter->reset();
1390 }
1391 #endif
1392 SendStatus ss = theTransporterRegistry->prepareSend
1393 (aSignal,
1394 1, // JBB
1395 aSignal->getConstDataPtrSend(),
1396 aNode,
1397 ptr);
1398 assert(ss != SEND_MESSAGE_TOO_BIG);
1399 const_cast<NdbApiSignal*>(aSignal)->m_noOfSections = save;
1400 if (ss == SEND_OK)
1401 {
1402 assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
1403 aSignal->readSignalNumber() == GSN_API_REGREQ);
1404 }
1405 return (ss == SEND_OK ? 0 : -1);
1406 }
1407
1408 /******************************************************************************
1409 * CONNECTION METHODS Etc
1410 ******************************************************************************/
1411 void
doConnect(int aNodeId)1412 TransporterFacade::doConnect(int aNodeId){
1413 theTransporterRegistry->setIOState(aNodeId, NoHalt);
1414 theTransporterRegistry->do_connect(aNodeId);
1415 }
1416
1417 void
doDisconnect(int aNodeId)1418 TransporterFacade::doDisconnect(int aNodeId)
1419 {
1420 theTransporterRegistry->do_disconnect(aNodeId);
1421 }
1422
1423 void
reportConnected(int aNodeId)1424 TransporterFacade::reportConnected(int aNodeId)
1425 {
1426 theClusterMgr->reportConnected(aNodeId);
1427 return;
1428 }
1429
1430 void
reportDisconnected(int aNodeId)1431 TransporterFacade::reportDisconnected(int aNodeId)
1432 {
1433 theClusterMgr->reportDisconnected(aNodeId);
1434 return;
1435 }
1436
1437 NodeId
ownId() const1438 TransporterFacade::ownId() const
1439 {
1440 return theOwnId;
1441 }
1442
1443 bool
isConnected(NodeId aNodeId)1444 TransporterFacade::isConnected(NodeId aNodeId){
1445 return theTransporterRegistry->is_connected(aNodeId);
1446 }
1447
1448 NodeId
get_an_alive_node()1449 TransporterFacade::get_an_alive_node()
1450 {
1451 DBUG_ENTER("TransporterFacade::get_an_alive_node");
1452 DBUG_PRINT("enter", ("theStartNodeId: %d", theStartNodeId));
1453 #ifdef VM_TRACE
1454 const char* p = NdbEnv_GetEnv("NDB_ALIVE_NODE_ID", (char*)0, 0);
1455 if (p != 0 && *p != 0)
1456 return atoi(p);
1457 #endif
1458 NodeId i;
1459 for (i = theStartNodeId; i < MAX_NDB_NODES; i++) {
1460 if (get_node_alive(i)){
1461 DBUG_PRINT("info", ("Node %d is alive", i));
1462 theStartNodeId = ((i + 1) % MAX_NDB_NODES);
1463 DBUG_RETURN(i);
1464 }
1465 }
1466 for (i = 1; i < theStartNodeId; i++) {
1467 if (get_node_alive(i)){
1468 DBUG_PRINT("info", ("Node %d is alive", i));
1469 theStartNodeId = ((i + 1) % MAX_NDB_NODES);
1470 DBUG_RETURN(i);
1471 }
1472 }
1473 DBUG_RETURN((NodeId)0);
1474 }
1475
ThreadData(Uint32 size)1476 TransporterFacade::ThreadData::ThreadData(Uint32 size){
1477 m_use_cnt = 0;
1478 m_firstFree = END_OF_LIST;
1479 expand(size);
1480 }
1481
1482 void
expand(Uint32 size)1483 TransporterFacade::ThreadData::expand(Uint32 size){
1484 trp_client * oe = 0;
1485
1486 const Uint32 sz = m_statusNext.size();
1487 m_objectExecute.fill(sz + size, oe);
1488 for(Uint32 i = 0; i<size; i++){
1489 m_statusNext.push_back(sz + i + 1);
1490 }
1491
1492 m_statusNext.back() = m_firstFree;
1493 m_firstFree = m_statusNext.size() - size;
1494 }
1495
1496
1497 int
open(trp_client * clnt)1498 TransporterFacade::ThreadData::open(trp_client * clnt)
1499 {
1500 Uint32 nextFree = m_firstFree;
1501
1502 if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){
1503 return -1;
1504 }
1505
1506 if(nextFree == END_OF_LIST){
1507 expand(10);
1508 nextFree = m_firstFree;
1509 }
1510
1511 m_use_cnt++;
1512 m_firstFree = m_statusNext[nextFree];
1513
1514 m_statusNext[nextFree] = INACTIVE;
1515 m_objectExecute[nextFree] = clnt;
1516
1517 return indexToNumber(nextFree);
1518 }
1519
1520 int
close(int number)1521 TransporterFacade::ThreadData::close(int number){
1522 number= numberToIndex(number);
1523 assert(m_objectExecute[number] != 0);
1524 m_statusNext[number] = m_firstFree;
1525 assert(m_use_cnt);
1526 m_use_cnt--;
1527 m_firstFree = number;
1528 m_objectExecute[number] = 0;
1529 return 0;
1530 }
1531
1532 Uint32
get_active_ndb_objects() const1533 TransporterFacade::get_active_ndb_objects() const
1534 {
1535 return m_threads.m_use_cnt;
1536 }
1537
1538
1539 void
start_poll(trp_client * clnt)1540 TransporterFacade::start_poll(trp_client* clnt)
1541 {
1542 lock_mutex();
1543 clnt->m_poll.m_locked = true;
1544 }
1545
1546 void
do_poll(trp_client * clnt,Uint32 wait_time)1547 TransporterFacade::do_poll(trp_client* clnt, Uint32 wait_time)
1548 {
1549 clnt->m_poll.m_waiting = true;
1550 assert(clnt->m_poll.m_locked == true);
1551 trp_client* owner = m_poll_owner;
1552 if (owner != NULL && owner != clnt)
1553 {
1554 /*
1555 We didn't get hold of the poll "right". We will sleep on a
1556 conditional mutex until the thread owning the poll "right"
1557 will wake us up after all data is received. If no data arrives
1558 we will wake up eventually due to the timeout.
1559 After receiving all data we take the object out of the cond wait
1560 queue if it hasn't happened already. It is usually already out of the
1561 queue but at time-out it could be that the object is still there.
1562 */
1563 assert(clnt->m_poll.m_poll_owner == false);
1564 add_to_poll_queue(clnt);
1565 NdbCondition_WaitTimeout(clnt->m_poll.m_condition, theMutexPtr,
1566 wait_time);
1567 if (clnt != m_poll_owner && clnt->m_poll.m_waiting)
1568 {
1569 remove_from_poll_queue(clnt);
1570 }
1571 }
1572 else
1573 {
1574 /*
1575 We got the poll "right" and we poll until data is received. After
1576 receiving data we will check if all data is received, if not we
1577 poll again.
1578 */
1579 assert(owner == clnt || clnt->m_poll.m_poll_owner == false);
1580 m_poll_owner = clnt;
1581 clnt->m_poll.m_poll_owner = true;
1582 external_poll(wait_time);
1583 }
1584 }
1585
1586 void
wakeup(trp_client * clnt)1587 TransporterFacade::wakeup(trp_client* clnt)
1588 {
1589 if (clnt->m_poll.m_waiting)
1590 {
1591 clnt->m_poll.m_waiting = false;
1592 if (m_poll_owner != clnt)
1593 {
1594 remove_from_poll_queue(clnt);
1595 NdbCondition_Signal(clnt->m_poll.m_condition);
1596 }
1597 }
1598 }
1599
1600 void
complete_poll(trp_client * clnt)1601 TransporterFacade::complete_poll(trp_client* clnt)
1602 {
1603 clnt->m_poll.m_waiting = false;
1604 if (!clnt->m_poll.m_locked)
1605 {
1606 assert(clnt->m_poll.m_poll_owner == false);
1607 return;
1608 }
1609
1610 /*
1611 When completing the poll for this thread we must return the poll
1612 ownership if we own it. We will give it to the last thread that
1613 came here (the most recent) which is likely to be the one also
1614 last to complete. We will remove that thread from the conditional
1615 wait queue and set him as the new owner of the poll "right".
1616 We will wait however with the signal until we have unlocked the
1617 mutex for performance reasons.
1618 See Stevens book on Unix NetworkProgramming: The Sockets Networking
1619 API Volume 1 Third Edition on page 703-704 for a discussion on this
1620 subject.
1621 */
1622 trp_client* new_owner = 0;
1623 if (m_poll_owner == clnt)
1624 {
1625 assert(clnt->m_poll.m_poll_owner == true);
1626 m_poll_owner = new_owner = remove_last_from_poll_queue();
1627 }
1628 if (new_owner)
1629 {
1630 assert(new_owner->m_poll.m_poll_owner == false);
1631 assert(new_owner->m_poll.m_locked == true);
1632 assert(new_owner->m_poll.m_waiting == true);
1633 NdbCondition_Signal(new_owner->m_poll.m_condition);
1634 new_owner->m_poll.m_poll_owner = true;
1635 }
1636 clnt->m_poll.m_locked = false;
1637 clnt->m_poll.m_poll_owner = false;
1638 unlock_mutex();
1639 }
1640
1641 void
add_to_poll_queue(trp_client * clnt)1642 TransporterFacade::add_to_poll_queue(trp_client* clnt)
1643 {
1644 assert(clnt != 0);
1645 assert(clnt->m_poll.m_prev == 0);
1646 assert(clnt->m_poll.m_next == 0);
1647 assert(clnt->m_poll.m_locked == true);
1648 assert(clnt->m_poll.m_poll_owner == false);
1649
1650 if (m_poll_queue_head == 0)
1651 {
1652 assert(m_poll_queue_tail == 0);
1653 m_poll_queue_head = clnt;
1654 m_poll_queue_tail = clnt;
1655 }
1656 else
1657 {
1658 assert(m_poll_queue_tail->m_poll.m_next == 0);
1659 m_poll_queue_tail->m_poll.m_next = clnt;
1660 clnt->m_poll.m_prev = m_poll_queue_tail;
1661 m_poll_queue_tail = clnt;
1662 }
1663 }
1664
1665 void
remove_from_poll_queue(trp_client * clnt)1666 TransporterFacade::remove_from_poll_queue(trp_client* clnt)
1667 {
1668 assert(clnt != 0);
1669 assert(clnt->m_poll.m_locked == true);
1670 assert(clnt->m_poll.m_poll_owner == false);
1671
1672 if (clnt->m_poll.m_prev != 0)
1673 {
1674 clnt->m_poll.m_prev->m_poll.m_next = clnt->m_poll.m_next;
1675 }
1676 else
1677 {
1678 assert(m_poll_queue_head == clnt);
1679 m_poll_queue_head = clnt->m_poll.m_next;
1680 }
1681
1682 if (clnt->m_poll.m_next != 0)
1683 {
1684 clnt->m_poll.m_next->m_poll.m_prev = clnt->m_poll.m_prev;
1685 }
1686 else
1687 {
1688 assert(m_poll_queue_tail == clnt);
1689 m_poll_queue_tail = clnt->m_poll.m_prev;
1690 }
1691
1692 if (m_poll_queue_head == 0)
1693 assert(m_poll_queue_tail == 0);
1694 else if (m_poll_queue_tail == 0)
1695 assert(m_poll_queue_head == 0);
1696
1697 clnt->m_poll.m_prev = 0;
1698 clnt->m_poll.m_next = 0;
1699 }
1700
1701 trp_client*
remove_last_from_poll_queue()1702 TransporterFacade::remove_last_from_poll_queue()
1703 {
1704 trp_client * clnt = m_poll_queue_tail;
1705 if (clnt == 0)
1706 return 0;
1707
1708 remove_from_poll_queue(clnt);
1709 return clnt;
1710 }
1711
1712 template class Vector<trp_client*>;
1713
1714 #include "SignalSender.hpp"
1715
1716 const Uint32*
getNextWords(Uint32 & sz)1717 SignalSectionIterator::getNextWords(Uint32& sz)
1718 {
1719 if (likely(currentSignal != NULL))
1720 {
1721 NdbApiSignal* signal= currentSignal;
1722 currentSignal= currentSignal->next();
1723 sz= signal->getLength();
1724 return signal->getDataPtrSend();
1725 }
1726 sz= 0;
1727 return NULL;
1728 }
1729
1730 #ifdef UNIT_TEST
1731
1732 // Unit test code starts
1733 #include <random.h>
1734
1735 #define VERIFY(x) if ((x) == 0) { printf("VERIFY failed at Line %u : %s\n",__LINE__, #x); return -1; }
1736
1737 /* Verify that word[n] == bias + n */
1738 int
verifyIteratorContents(GenericSectionIterator & gsi,int dataWords,int bias)1739 verifyIteratorContents(GenericSectionIterator& gsi, int dataWords, int bias)
1740 {
1741 int pos= 0;
1742
1743 while (pos < dataWords)
1744 {
1745 const Uint32* readPtr=NULL;
1746 Uint32 len= 0;
1747
1748 readPtr= gsi.getNextWords(len);
1749
1750 VERIFY(readPtr != NULL);
1751 VERIFY(len != 0);
1752 VERIFY(len <= (Uint32) (dataWords - pos));
1753
1754 for (int j=0; j < (int) len; j++)
1755 VERIFY(readPtr[j] == (Uint32) (bias ++));
1756
1757 pos += len;
1758 }
1759
1760 return 0;
1761 }
1762
1763 int
checkGenericSectionIterator(GenericSectionIterator & iter,int size,int bias)1764 checkGenericSectionIterator(GenericSectionIterator& iter, int size, int bias)
1765 {
1766 /* Verify contents */
1767 VERIFY(verifyIteratorContents(iter, size, bias) == 0);
1768
1769 Uint32 sz;
1770
1771 /* Check that iterator is empty now */
1772 VERIFY(iter.getNextWords(sz) == NULL);
1773 VERIFY(sz == 0);
1774
1775 VERIFY(iter.getNextWords(sz) == NULL);
1776 VERIFY(sz == 0);
1777
1778 iter.reset();
1779
1780 /* Verify reset put us back to the start */
1781 VERIFY(verifyIteratorContents(iter, size, bias) == 0);
1782
1783 /* Verify no more words available */
1784 VERIFY(iter.getNextWords(sz) == NULL);
1785 VERIFY(sz == 0);
1786
1787 return 0;
1788 }
1789
1790 int
checkIterator(GenericSectionIterator & iter,int size,int bias)1791 checkIterator(GenericSectionIterator& iter, int size, int bias)
1792 {
1793 /* Test iterator itself, and then FragmentedSectionIterator
1794 * adaptation
1795 */
1796 VERIFY(checkGenericSectionIterator(iter, size, bias) == 0);
1797
1798 /* Now we'll test the FragmentedSectionIterator on the iterator
1799 * we were passed
1800 */
1801 const int subranges= 20;
1802
1803 iter.reset();
1804 GenericSectionPtr ptr;
1805 ptr.sz= size;
1806 ptr.sectionIter= &iter;
1807 FragmentedSectionIterator fsi(ptr);
1808
1809 for (int s=0; s< subranges; s++)
1810 {
1811 Uint32 start= 0;
1812 Uint32 len= 0;
1813 if (size > 0)
1814 {
1815 start= (Uint32) myRandom48(size);
1816 if (0 != (size-start))
1817 len= (Uint32) myRandom48(size-start);
1818 }
1819
1820 /*
1821 printf("Range (0-%u) = (%u + %u)\n",
1822 size, start, len);
1823 */
1824 fsi.setRange(start, len);
1825 VERIFY(checkGenericSectionIterator(fsi, len, bias + start) == 0);
1826 }
1827
1828 return 0;
1829 }
1830
1831
1832
1833 int
testLinearSectionIterator()1834 testLinearSectionIterator()
1835 {
1836 /* Test Linear section iterator of various
1837 * lengths with section[n] == bias + n
1838 */
1839 const int totalSize= 200000;
1840 const int bias= 13;
1841
1842 Uint32 data[totalSize];
1843 for (int i=0; i<totalSize; i++)
1844 data[i]= bias + i;
1845
1846 for (int len= 0; len < 50000; len++)
1847 {
1848 LinearSectionIterator something(data, len);
1849
1850 VERIFY(checkIterator(something, len, bias) == 0);
1851 }
1852
1853 return 0;
1854 }
1855
1856 NdbApiSignal*
createSignalChain(NdbApiSignal * & poolHead,int length,int bias)1857 createSignalChain(NdbApiSignal*& poolHead, int length, int bias)
1858 {
1859 /* Create signal chain, with word[n] == bias+n */
1860 NdbApiSignal* chainHead= NULL;
1861 NdbApiSignal* chainTail= NULL;
1862 int pos= 0;
1863 int signals= 0;
1864
1865 while (pos < length)
1866 {
1867 int offset= pos % NdbApiSignal::MaxSignalWords;
1868
1869 if (offset == 0)
1870 {
1871 if (poolHead == NULL)
1872 return 0;
1873
1874 NdbApiSignal* newSig= poolHead;
1875 poolHead= poolHead->next();
1876 signals++;
1877
1878 newSig->next(NULL);
1879
1880 if (chainHead == NULL)
1881 {
1882 chainHead= chainTail= newSig;
1883 }
1884 else
1885 {
1886 chainTail->next(newSig);
1887 chainTail= newSig;
1888 }
1889 }
1890
1891 chainTail->getDataPtrSend()[offset]= (bias + pos);
1892 chainTail->setLength(offset + 1);
1893 pos ++;
1894 }
1895
1896 return chainHead;
1897 }
1898
1899 int
testSignalSectionIterator()1900 testSignalSectionIterator()
1901 {
1902 /* Create a pool of signals, build
1903 * signal chains from it, test
1904 * the iterator against the signal chains
1905 */
1906 const int totalNumSignals= 1000;
1907 NdbApiSignal* poolHead= NULL;
1908
1909 /* Allocate some signals */
1910 for (int i=0; i < totalNumSignals; i++)
1911 {
1912 NdbApiSignal* sig= new NdbApiSignal((BlockReference) 0);
1913
1914 if (poolHead == NULL)
1915 {
1916 poolHead= sig;
1917 sig->next(NULL);
1918 }
1919 else
1920 {
1921 sig->next(poolHead);
1922 poolHead= sig;
1923 }
1924 }
1925
1926 const int bias= 7;
1927 for (int dataWords= 1;
1928 dataWords <= (int)(totalNumSignals *
1929 NdbApiSignal::MaxSignalWords);
1930 dataWords ++)
1931 {
1932 NdbApiSignal* signalChain= NULL;
1933
1934 VERIFY((signalChain= createSignalChain(poolHead, dataWords, bias)) != NULL );
1935
1936 SignalSectionIterator ssi(signalChain);
1937
1938 VERIFY(checkIterator(ssi, dataWords, bias) == 0);
1939
1940 /* Now return the signals to the pool */
1941 while (signalChain != NULL)
1942 {
1943 NdbApiSignal* sig= signalChain;
1944 signalChain= signalChain->next();
1945
1946 sig->next(poolHead);
1947 poolHead= sig;
1948 }
1949 }
1950
1951 /* Free signals from pool */
1952 while (poolHead != NULL)
1953 {
1954 NdbApiSignal* sig= poolHead;
1955 poolHead= sig->next();
1956 delete(sig);
1957 }
1958
1959 return 0;
1960 }
1961
main(int arg,char ** argv)1962 int main(int arg, char** argv)
1963 {
1964 /* Test Section Iterators
1965 * ----------------------
1966 * To run this code :
1967 * cd storage/ndb/src/ndbapi
1968 * make testSectionIterators
1969 * ./testSectionIterators
1970 *
1971 * Will print "OK" in success case
1972 */
1973
1974
1975 VERIFY(testLinearSectionIterator() == 0);
1976 VERIFY(testSignalSectionIterator() == 0);
1977
1978 printf("OK\n");
1979
1980 return 0;
1981 }
1982 #endif
1983
1984 void
set_auto_reconnect(int val)1985 TransporterFacade::set_auto_reconnect(int val)
1986 {
1987 theClusterMgr->m_auto_reconnect = val;
1988 }
1989
1990 int
get_auto_reconnect() const1991 TransporterFacade::get_auto_reconnect() const
1992 {
1993 return theClusterMgr->m_auto_reconnect;
1994 }
1995
1996 void
ext_set_max_api_reg_req_interval(Uint32 interval)1997 TransporterFacade::ext_set_max_api_reg_req_interval(Uint32 interval)
1998 {
1999 theClusterMgr->set_max_api_reg_req_interval(interval);
2000 }
2001
2002 void
ext_update_connections()2003 TransporterFacade::ext_update_connections()
2004 {
2005 theClusterMgr->lock();
2006 theTransporterRegistry->update_connections();
2007 theClusterMgr->unlock();
2008 }
2009
2010 struct in_addr
ext_get_connect_address(Uint32 nodeId)2011 TransporterFacade::ext_get_connect_address(Uint32 nodeId)
2012 {
2013 return theTransporterRegistry->get_connect_address(nodeId);
2014 }
2015
2016 void
ext_forceHB()2017 TransporterFacade::ext_forceHB()
2018 {
2019 theClusterMgr->forceHB();
2020 }
2021
2022 bool
ext_isConnected(NodeId aNodeId)2023 TransporterFacade::ext_isConnected(NodeId aNodeId)
2024 {
2025 bool val;
2026 theClusterMgr->lock();
2027 val = theClusterMgr->theNodes[aNodeId].is_connected();
2028 theClusterMgr->unlock();
2029 return val;
2030 }
2031
2032 void
ext_doConnect(int aNodeId)2033 TransporterFacade::ext_doConnect(int aNodeId)
2034 {
2035 theClusterMgr->lock();
2036 assert(theClusterMgr->theNodes[aNodeId].is_connected() == false);
2037 doConnect(aNodeId);
2038 theClusterMgr->unlock();
2039 }
2040
2041