1 /*
2    Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <ndb_global.h>
26 #include <ndb_limits.h>
27 #include "TransporterFacade.hpp"
28 #include "trp_client.hpp"
29 #include "ClusterMgr.hpp"
30 #include <IPCConfig.hpp>
31 #include <TransporterCallback.hpp>
32 #include <TransporterRegistry.hpp>
33 #include "NdbApiSignal.hpp"
34 #include "NdbWaiter.hpp"
35 #include <NdbOut.hpp>
36 #include <NdbEnv.h>
37 #include <NdbSleep.h>
38 
39 #include <kernel/GlobalSignalNumbers.h>
40 #include <mgmapi_config_parameters.h>
41 #include <mgmapi_configuration.hpp>
42 #include <NdbConfig.h>
43 #include <ndb_version.h>
44 #include <SignalLoggerManager.hpp>
45 #include <kernel/ndb_limits.h>
46 #include <signaldata/AlterTable.hpp>
47 #include <signaldata/SumaImpl.hpp>
48 #include <signaldata/AllocNodeId.hpp>
49 
50 //#define REPORT_TRANSPORTER
51 //#define API_TRACE
52 
numberToIndex(int number)53 static int numberToIndex(int number)
54 {
55   return number - MIN_API_BLOCK_NO;
56 }
57 
indexToNumber(int index)58 static int indexToNumber(int index)
59 {
60   return index + MIN_API_BLOCK_NO;
61 }
62 
/*
 * TRP_DEBUG(t): with DEBUG_TRANSPORTER defined, writes "<file>:<line>:<t>"
 * followed by endl to ndbout; otherwise expands to nothing.
 * Note that the debug expansion already ends with its own ';'.
 */
#ifdef DEBUG_TRANSPORTER
#define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
#else
#define TRP_DEBUG(t)
#endif
68 
69 /*****************************************************************************
70  * Call back functions
71  *****************************************************************************/
72 void
reportError(NodeId nodeId,TransporterError errorCode,const char * info)73 TransporterFacade::reportError(NodeId nodeId,
74                                TransporterError errorCode, const char *info)
75 {
76 #ifdef REPORT_TRANSPORTER
77   ndbout_c("REPORT_TRANSP: reportError (nodeId=%d, errorCode=%d) %s",
78 	   (int)nodeId, (int)errorCode, info ? info : "");
79 #endif
80   if(errorCode & TE_DO_DISCONNECT) {
81     ndbout_c("reportError (%d, %d) %s", (int)nodeId, (int)errorCode,
82 	     info ? info : "");
83     doDisconnect(nodeId);
84   }
85 }
86 
87 /**
88  * Report average send length in bytes (4096 last sends)
89  */
90 void
reportSendLen(NodeId nodeId,Uint32 count,Uint64 bytes)91 TransporterFacade::reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes)
92 {
93 #ifdef REPORT_TRANSPORTER
94   ndbout_c("REPORT_TRANSP: reportSendLen (nodeId=%d, bytes/count=%d)",
95 	   (int)nodeId, (Uint32)(bytes/count));
96 #endif
97   (void)nodeId;
98   (void)count;
99   (void)bytes;
100 }
101 
102 /**
103  * Report average receive length in bytes (4096 last receives)
104  */
105 void
reportReceiveLen(NodeId nodeId,Uint32 count,Uint64 bytes)106 TransporterFacade::reportReceiveLen(NodeId nodeId, Uint32 count, Uint64 bytes)
107 {
108 #ifdef REPORT_TRANSPORTER
109   ndbout_c("REPORT_TRANSP: reportReceiveLen (nodeId=%d, bytes/count=%d)",
110 	   (int)nodeId, (Uint32)(bytes/count));
111 #endif
112   (void)nodeId;
113   (void)count;
114   (void)bytes;
115 }
116 
117 /**
118  * Report connection established
119  */
120 void
reportConnect(NodeId nodeId)121 TransporterFacade::reportConnect(NodeId nodeId)
122 {
123 #ifdef REPORT_TRANSPORTER
124   ndbout_c("REPORT_TRANSP: API reportConnect (nodeId=%d)", (int)nodeId);
125 #endif
126   reportConnected(nodeId);
127 }
128 
129 /**
130  * Report connection broken
131  */
132 void
reportDisconnect(NodeId nodeId,Uint32 error)133 TransporterFacade::reportDisconnect(NodeId nodeId, Uint32 error){
134 #ifdef REPORT_TRANSPORTER
135   ndbout_c("REPORT_TRANSP: API reportDisconnect (nodeId=%d)", (int)nodeId);
136 #endif
137   reportDisconnected(nodeId);
138 }
139 
140 void
transporter_recv_from(NodeId nodeId)141 TransporterFacade::transporter_recv_from(NodeId nodeId)
142 {
143   hb_received(nodeId);
144 }
145 
146 /****************************************************************************
147  *
148  *****************************************************************************/
149 
150 /**
151  * Report connection broken
152  */
153 int
checkJobBuffer()154 TransporterFacade::checkJobBuffer()
155 {
156   return 0;
157 }
158 
159 #ifdef API_TRACE
160 static const char * API_SIGNAL_LOG = "API_SIGNAL_LOG";
161 static const char * apiSignalLog   = 0;
162 static SignalLoggerManager signalLogger;
163 static
164 inline
165 bool
setSignalLog()166 setSignalLog(){
167   signalLogger.flushSignalLog();
168 
169   const char * tmp = NdbEnv_GetEnv(API_SIGNAL_LOG, (char *)0, 0);
170   if(tmp != 0 && apiSignalLog != 0 && strcmp(tmp,apiSignalLog) == 0){
171     return true;
172   } else if(tmp == 0 && apiSignalLog == 0){
173     return false;
174   } else if(tmp == 0 && apiSignalLog != 0){
175     signalLogger.setOutputStream(0);
176     apiSignalLog = tmp;
177     return false;
178   } else if(tmp !=0){
179     if (strcmp(tmp, "-") == 0)
180         signalLogger.setOutputStream(stdout);
181 #ifndef DBUG_OFF
182     else if (strcmp(tmp, "+") == 0)
183         signalLogger.setOutputStream(DBUG_FILE);
184 #endif
185     else
186         signalLogger.setOutputStream(fopen(tmp, "w"));
187     apiSignalLog = tmp;
188     return true;
189   }
190   return false;
191 }
192 inline
193 bool
TRACE_GSN(Uint32 gsn)194 TRACE_GSN(Uint32 gsn)
195 {
196   switch(gsn){
197 #ifndef TRACE_APIREGREQ
198   case GSN_API_REGREQ:
199   case GSN_API_REGCONF:
200     return false;
201 #endif
202 #if 1
203   case GSN_SUB_GCP_COMPLETE_REP:
204   case GSN_SUB_GCP_COMPLETE_ACK:
205     return false;
206 #endif
207   default:
208     return true;
209   }
210 }
211 #endif
212 
213 /**
214  * The execute function : Handle received signal
215  */
216 void
deliver_signal(SignalHeader * const header,Uint8 prio,Uint32 * const theData,LinearSectionPtr ptr[3])217 TransporterFacade::deliver_signal(SignalHeader * const header,
218                                   Uint8 prio, Uint32 * const theData,
219                                   LinearSectionPtr ptr[3])
220 {
221   Uint32 tRecBlockNo = header->theReceiversBlockNumber;
222 
223 #ifdef API_TRACE
224   if(setSignalLog() && TRACE_GSN(header->theVerId_signalNumber)){
225     signalLogger.executeSignal(* header,
226                                prio,
227                                theData,
228                                ownId(),
229                                ptr, header->m_noOfSections);
230     signalLogger.flushSignalLog();
231   }
232 #endif
233 
234   if (tRecBlockNo >= MIN_API_BLOCK_NO)
235   {
236     trp_client * clnt = m_threads.get(tRecBlockNo);
237     if (clnt != 0)
238     {
239       /**
240        * Handle received signal immediately to avoid any unnecessary
241        * copying of data, allocation of memory and other things. Copying
242        * of data could be interesting to support several priority levels
243        * and to support a special memory structure when executing the
244        * signals. Neither of those are interesting when receiving data
245        * in the NDBAPI. The NDBAPI will thus read signal data directly as
246        * it was written by the sender (SCI sender is other node, Shared
247        * memory sender is other process and TCP/IP sender is the OS that
248        * writes the TCP/IP message into a message buffer).
249        */
250       NdbApiSignal tmpSignal(*header);
251       NdbApiSignal * tSignal = &tmpSignal;
252       tSignal->setDataPtr(theData);
253       clnt->trp_deliver_signal(tSignal, ptr);
254     }//if
255   }
256   else if (tRecBlockNo == API_PACKED)
257   {
258     /**
259      * Block number == 2047 is used to signal a signal that consists of
260      * multiple instances of the same signal. This is an effort to
261      * package the signals so as to avoid unnecessary communication
262      * overhead since TCP/IP has a great performance impact.
263      */
264     Uint32 Tlength = header->theLength;
265     Uint32 Tsent = 0;
266     /**
267      * Since it contains at least two data packets we will first
268      * copy the signal data to safe place.
269      */
270     while (Tsent < Tlength) {
271       Uint32 Theader = theData[Tsent];
272       Tsent++;
273       Uint32 TpacketLen = (Theader & 0x1F) + 3;
274       tRecBlockNo = Theader >> 16;
275       if (TpacketLen <= 25)
276       {
277         if ((TpacketLen + Tsent) <= Tlength)
278         {
279           /**
280            * Set the data length of the signal and the receivers block
281            * reference and then call the API.
282            */
283           header->theLength = TpacketLen;
284           header->theReceiversBlockNumber = tRecBlockNo;
285           Uint32* tDataPtr = &theData[Tsent];
286           Tsent += TpacketLen;
287           if (tRecBlockNo >= MIN_API_BLOCK_NO)
288           {
289             trp_client * clnt = m_threads.get(tRecBlockNo);
290             if(clnt != 0)
291             {
292               NdbApiSignal tmpSignal(*header);
293               NdbApiSignal * tSignal = &tmpSignal;
294               tSignal->setDataPtr(tDataPtr);
295               clnt->trp_deliver_signal(tSignal, 0);
296             }
297           }
298         }
299       }
300     }
301     return;
302   }
303   else if (tRecBlockNo >= MIN_API_FIXED_BLOCK_NO &&
304            tRecBlockNo <= MAX_API_FIXED_BLOCK_NO)
305   {
306     Uint32 dynamic= m_fixed2dynamic[tRecBlockNo - MIN_API_FIXED_BLOCK_NO];
307     trp_client * clnt = m_threads.get(dynamic);
308     if (clnt != 0)
309     {
310       NdbApiSignal tmpSignal(*header);
311       NdbApiSignal * tSignal = &tmpSignal;
312       tSignal->setDataPtr(theData);
313       clnt->trp_deliver_signal(tSignal, ptr);
314     }//if
315   }
316   else
317   {
318     // Ignore all other block numbers.
319     if(header->theVerId_signalNumber != GSN_API_REGREQ)
320     {
321       TRP_DEBUG( "TransporterFacade received signal to unknown block no." );
322       ndbout << "BLOCK NO: "  << tRecBlockNo << " sig "
323              << header->theVerId_signalNumber  << endl;
324       abort();
325     }
326   }
327 }
328 
329 // These symbols are needed, but not used in the API
330 void
printSegmentedSection(FILE *,const SignalHeader &,const SegmentedSectionPtr ptr[3],unsigned i)331 SignalLoggerManager::printSegmentedSection(FILE *, const SignalHeader &,
332 					   const SegmentedSectionPtr ptr[3],
333 					   unsigned i){
334   abort();
335 }
336 
337 void
copy(Uint32 * & insertPtr,class SectionSegmentPool & thePool,const SegmentedSectionPtr & _ptr)338 copy(Uint32 * & insertPtr,
339      class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
340   abort();
341 }
342 
343 /**
344  * Note that this function needs no locking since it is
345  * only called from the constructor of Ndb (the NdbObject)
346  *
347  * Which is protected by a mutex
348  */
349 
350 int
start_instance(NodeId nodeId,const ndb_mgm_configuration * conf)351 TransporterFacade::start_instance(NodeId nodeId,
352                                   const ndb_mgm_configuration* conf)
353 {
354   assert(theOwnId == 0);
355   theOwnId = nodeId;
356 
357 #if defined SIGPIPE && !defined _WIN32
358   (void)signal(SIGPIPE, SIG_IGN);
359 #endif
360 
361   theTransporterRegistry = new TransporterRegistry(this);
362   if (theTransporterRegistry == NULL)
363     return -1;
364 
365   if (!theTransporterRegistry->init(nodeId))
366     return -1;
367 
368   if (theClusterMgr == NULL)
369     theClusterMgr = new ClusterMgr(*this);
370 
371   if (theClusterMgr == NULL)
372     return -1;
373 
374   if (!configure(nodeId, conf))
375     return -1;
376 
377   if (!theTransporterRegistry->start_service(m_socket_server))
378     return -1;
379 
380   theReceiveThread = NdbThread_Create(runReceiveResponse_C,
381                                       (void**)this,
382                                       0, // Use default stack size
383                                       "ndb_receive",
384                                       NDB_THREAD_PRIO_LOW);
385 
386   theSendThread = NdbThread_Create(runSendRequest_C,
387                                    (void**)this,
388                                    0, // Use default stack size
389                                    "ndb_send",
390                                    NDB_THREAD_PRIO_LOW);
391 
392   theClusterMgr->startThread();
393 
394   return 0;
395 }
396 
397 /**
398  * Note that this function need no locking since its
399  * only called from the destructor of Ndb (the NdbObject)
400  *
401  * Which is protected by a mutex
402  */
403 void
stop_instance()404 TransporterFacade::stop_instance(){
405   DBUG_ENTER("TransporterFacade::stop_instance");
406   doStop();
407   DBUG_VOID_RETURN;
408 }
409 
410 void
doStop()411 TransporterFacade::doStop(){
412   DBUG_ENTER("TransporterFacade::doStop");
413   /**
414    * First stop the ClusterMgr because it needs to send one more signal
415    * and also uses theFacadeInstance to lock/unlock theMutexPtr
416    */
417   if (theClusterMgr != NULL) theClusterMgr->doStop();
418 
419   /**
420    * Now stop the send and receive threads
421    */
422   void *status;
423   theStopReceive = 1;
424   if (theReceiveThread) {
425     NdbThread_WaitFor(theReceiveThread, &status);
426     NdbThread_Destroy(&theReceiveThread);
427   }
428   if (theSendThread) {
429     NdbThread_WaitFor(theSendThread, &status);
430     NdbThread_Destroy(&theSendThread);
431   }
432   DBUG_VOID_RETURN;
433 }
434 
435 extern "C"
436 void*
runSendRequest_C(void * me)437 runSendRequest_C(void * me)
438 {
439   ((TransporterFacade*) me)->threadMainSend();
440   return 0;
441 }
442 
threadMainSend(void)443 void TransporterFacade::threadMainSend(void)
444 {
445   theTransporterRegistry->startSending();
446   if (theTransporterRegistry->start_clients() == 0){
447     ndbout_c("Unable to start theTransporterRegistry->start_clients");
448     exit(0);
449   }
450 
451   m_socket_server.startServer();
452 
453   while(!theStopReceive) {
454     NdbSleep_MilliSleep(10);
455     NdbMutex_Lock(theMutexPtr);
456     if (sendPerformedLastInterval == 0) {
457       theTransporterRegistry->performSend();
458     }
459     sendPerformedLastInterval = 0;
460     NdbMutex_Unlock(theMutexPtr);
461   }
462   theTransporterRegistry->stopSending();
463 
464   m_socket_server.stopServer();
465   m_socket_server.stopSessions(true);
466 
467   theTransporterRegistry->stop_clients();
468 }
469 
470 extern "C"
471 void*
runReceiveResponse_C(void * me)472 runReceiveResponse_C(void * me)
473 {
474   ((TransporterFacade*) me)->threadMainReceive();
475   return 0;
476 }
477 
478 /*
479   The receiver thread is changed to only wake up once every 10 milliseconds
480   to poll. It will first check that nobody owns the poll "right" before
481   polling. This means that methods using the receiveResponse and
482   sendRecSignal will have a slightly longer response time if they are
483   executed without any parallel key lookups. Currently also scans are
484   affected but this is to be fixed.
485 */
threadMainReceive(void)486 void TransporterFacade::threadMainReceive(void)
487 {
488   theTransporterRegistry->startReceiving();
489 #ifdef NDB_SHM_TRANSPORTER
490   NdbThread_set_shm_sigmask(TRUE);
491 #endif
492   while(!theStopReceive)
493   {
494     theClusterMgr->lock();
495     theTransporterRegistry->update_connections();
496     theClusterMgr->unlock();
497     NdbSleep_MilliSleep(100);
498   }//while
499   theTransporterRegistry->stopReceiving();
500 }
501 /*
502   This method is called by worker thread that owns the poll "rights".
503   It waits for events and if something arrives it takes care of it
504   and returns to caller. It will quickly come back here if not all
505   data was received for the worker thread.
506 */
external_poll(Uint32 wait_time)507 void TransporterFacade::external_poll(Uint32 wait_time)
508 {
509   NdbMutex_Unlock(theMutexPtr);
510 
511 #ifdef NDB_SHM_TRANSPORTER
512   /*
513     If shared memory transporters are used we need to set our sigmask
514     such that we wake up also on interrupts on the shared memory
515     interrupt signal.
516   */
517   NdbThread_set_shm_sigmask(FALSE);
518 #endif
519 
520   const int res = theTransporterRegistry->pollReceive(wait_time);
521 
522 #ifdef NDB_SHM_TRANSPORTER
523   NdbThread_set_shm_sigmask(TRUE);
524 #endif
525 
526   NdbMutex_Lock(theMutexPtr);
527   if (res > 0)
528   {
529     theTransporterRegistry->performReceive();
530   }
531 }
532 
TransporterFacade(GlobalDictCache * cache)533 TransporterFacade::TransporterFacade(GlobalDictCache *cache) :
534   m_poll_owner(NULL),
535   m_poll_queue_head(NULL),
536   m_poll_queue_tail(NULL),
537   theTransporterRegistry(0),
538   theOwnId(0),
539   theStartNodeId(1),
540   theClusterMgr(NULL),
541   checkCounter(4),
542   currentSendLimit(1),
543   theStopReceive(0),
544   theSendThread(NULL),
545   theReceiveThread(NULL),
546   m_fragmented_signal_id(0),
547   m_globalDictCache(cache)
548 {
549   DBUG_ENTER("TransporterFacade::TransporterFacade");
550   theMutexPtr = NdbMutex_CreateWithName("TTFM");
551   sendPerformedLastInterval = 0;
552 
553   for (int i = 0; i < NO_API_FIXED_BLOCKS; i++)
554     m_fixed2dynamic[i]= RNIL;
555 
556 #ifdef API_TRACE
557   apiSignalLog = 0;
558 #endif
559 
560   theClusterMgr = new ClusterMgr(*this);
561 
562   DBUG_VOID_RETURN;
563 }
564 
565 
566 /* Return true if node with "nodeId" is a MGM node */
is_mgmd(Uint32 nodeId,const ndb_mgm_configuration * conf)567 static bool is_mgmd(Uint32 nodeId,
568                     const ndb_mgm_configuration * conf)
569 {
570   ndb_mgm_configuration_iterator iter(*conf, CFG_SECTION_NODE);
571   if (iter.find(CFG_NODE_ID, nodeId))
572     abort();
573   Uint32 type;
574   if(iter.get(CFG_TYPE_OF_SECTION, &type))
575     abort();
576 
577   return (type == NODE_TYPE_MGM);
578 }
579 
580 
581 bool
do_connect_mgm(NodeId nodeId,const ndb_mgm_configuration * conf)582 TransporterFacade::do_connect_mgm(NodeId nodeId,
583                                   const ndb_mgm_configuration* conf)
584 {
585   // Allow other MGM nodes to connect
586   DBUG_ENTER("TransporterFacade::do_connect_mgm");
587   ndb_mgm_configuration_iterator iter(*conf, CFG_SECTION_CONNECTION);
588   for(iter.first(); iter.valid(); iter.next())
589   {
590     Uint32 nodeId1, nodeId2;
591     if (iter.get(CFG_CONNECTION_NODE_1, &nodeId1) ||
592         iter.get(CFG_CONNECTION_NODE_2, &nodeId2))
593       DBUG_RETURN(false);
594 
595     // Skip connections where this node is not involved
596     if (nodeId1 != nodeId && nodeId2 != nodeId)
597       continue;
598 
599     // If both sides are MGM, open connection
600     if(is_mgmd(nodeId1, conf) && is_mgmd(nodeId2, conf))
601     {
602       Uint32 remoteNodeId = (nodeId == nodeId1 ? nodeId2 : nodeId1);
603       DBUG_PRINT("info", ("opening connection to node %d", remoteNodeId));
604       doConnect(remoteNodeId);
605     }
606   }
607 
608   DBUG_RETURN(true);
609 }
610 
611 bool
configure(NodeId nodeId,const ndb_mgm_configuration * conf)612 TransporterFacade::configure(NodeId nodeId,
613                              const ndb_mgm_configuration* conf)
614 {
615   DBUG_ENTER("TransporterFacade::configure");
616 
617   assert(theOwnId == nodeId);
618   assert(theTransporterRegistry);
619   assert(theClusterMgr);
620 
621   // Configure transporters
622   if (!IPCConfig::configureTransporters(nodeId,
623                                         * conf,
624                                         * theTransporterRegistry,
625                                         true))
626     DBUG_RETURN(false);
627 
628   // Configure cluster manager
629   theClusterMgr->configure(nodeId, conf);
630 
631   ndb_mgm_configuration_iterator iter(* conf, CFG_SECTION_NODE);
632   if(iter.find(CFG_NODE_ID, nodeId))
633     DBUG_RETURN(false);
634 
635   // Configure send buffers
636   Uint32 total_send_buffer = 0;
637   iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
638   theTransporterRegistry->allocate_send_buffers(total_send_buffer);
639 
640   Uint32 auto_reconnect=1;
641   iter.get(CFG_AUTO_RECONNECT, &auto_reconnect);
642 
643   const char * priospec = 0;
644   if (iter.get(CFG_HB_THREAD_PRIO, &priospec) == 0)
645   {
646     NdbThread_SetHighPrioProperties(priospec);
647   }
648 
649   /**
650    * Keep value it set before connect (overriding config)
651    */
652   if (theClusterMgr->m_auto_reconnect == -1)
653   {
654     theClusterMgr->m_auto_reconnect = auto_reconnect;
655   }
656 
657 #ifdef API_TRACE
658   signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
659 #endif
660 
661   // Open connection between MGM servers
662   if (!do_connect_mgm(nodeId, conf))
663     DBUG_RETURN(false);
664 
665   /**
666    * Also setup Loopback Transporter
667    */
668   doConnect(nodeId);
669 
670   DBUG_RETURN(true);
671 }
672 
673 void
for_each(trp_client * sender,const NdbApiSignal * aSignal,const LinearSectionPtr ptr[3])674 TransporterFacade::for_each(trp_client* sender,
675                             const NdbApiSignal* aSignal,
676                             const LinearSectionPtr ptr[3])
677 {
678   Uint32 sz = m_threads.m_statusNext.size();
679   for (Uint32 i = 0; i < sz ; i ++)
680   {
681     trp_client * clnt = m_threads.m_objectExecute[i];
682     if (clnt != 0 && clnt != sender)
683     {
684       clnt->trp_deliver_signal(aSignal, ptr);
685     }
686   }
687 }
688 
689 void
connected()690 TransporterFacade::connected()
691 {
692   DBUG_ENTER("TransporterFacade::connected");
693   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theOwnId));
694   signal.theVerId_signalNumber = GSN_ALLOC_NODEID_CONF;
695   signal.theReceiversBlockNumber = 0;
696   signal.theTrace  = 0;
697   signal.theLength = AllocNodeIdConf::SignalLength;
698 
699   AllocNodeIdConf * rep = CAST_PTR(AllocNodeIdConf, signal.getDataPtrSend());
700   rep->senderRef = 0;
701   rep->senderData = 0;
702   rep->nodeId = theOwnId;
703   rep->secret_lo = 0;
704   rep->secret_hi = 0;
705 
706   Uint32 sz = m_threads.m_statusNext.size();
707   for (Uint32 i = 0; i < sz ; i ++)
708   {
709     trp_client * clnt = m_threads.m_objectExecute[i];
710     if (clnt != 0)
711     {
712       clnt->trp_deliver_signal(&signal, 0);
713     }
714   }
715   DBUG_VOID_RETURN;
716 }
717 
718 int
close_clnt(trp_client * clnt)719 TransporterFacade::close_clnt(trp_client* clnt)
720 {
721   int ret = -1;
722   if (clnt)
723   {
724     NdbMutex_Lock(theMutexPtr);
725     if (m_threads.get(clnt->m_blockNo) == clnt)
726     {
727       m_threads.close(clnt->m_blockNo);
728       ret = 0;
729     }
730     else
731     {
732       assert(0);
733     }
734     NdbMutex_Unlock(theMutexPtr);
735   }
736   return ret;
737 }
738 
739 Uint32
open_clnt(trp_client * clnt,int blockNo)740 TransporterFacade::open_clnt(trp_client * clnt, int blockNo)
741 {
742   DBUG_ENTER("TransporterFacade::open");
743   Guard g(theMutexPtr);
744   int r= m_threads.open(clnt);
745   if (r < 0)
746   {
747     DBUG_RETURN(0);
748   }
749 
750   if (unlikely(blockNo != -1))
751   {
752     // Using fixed block number, add fixed->dymamic mapping
753     Uint32 fixed_index = blockNo - MIN_API_FIXED_BLOCK_NO;
754 
755     assert(blockNo >= MIN_API_FIXED_BLOCK_NO &&
756            fixed_index <= NO_API_FIXED_BLOCKS);
757 
758     m_fixed2dynamic[fixed_index]= r;
759   }
760 
761   if (theOwnId > 0)
762   {
763     r = numberToRef(r, theOwnId);
764   }
765   else
766   {
767     r = numberToRef(r, 0);
768   }
769   DBUG_RETURN(r);
770 }
771 
~TransporterFacade()772 TransporterFacade::~TransporterFacade()
773 {
774   DBUG_ENTER("TransporterFacade::~TransporterFacade");
775 
776   delete theClusterMgr;
777   NdbMutex_Lock(theMutexPtr);
778   delete theTransporterRegistry;
779   NdbMutex_Unlock(theMutexPtr);
780   NdbMutex_Destroy(theMutexPtr);
781 #ifdef API_TRACE
782   signalLogger.setOutputStream(0);
783 #endif
784   DBUG_VOID_RETURN;
785 }
786 
787 void
calculateSendLimit()788 TransporterFacade::calculateSendLimit()
789 {
790   Uint32 Ti;
791   Uint32 TthreadCount = 0;
792 
793   Uint32 sz = m_threads.m_statusNext.size();
794   for (Ti = 0; Ti < sz; Ti++) {
795     if (m_threads.m_statusNext[Ti] == (ThreadData::ACTIVE)){
796       TthreadCount++;
797       m_threads.m_statusNext[Ti] = ThreadData::INACTIVE;
798     }
799   }
800   currentSendLimit = TthreadCount;
801   if (currentSendLimit == 0) {
802     currentSendLimit = 1;
803   }
804   checkCounter = currentSendLimit << 2;
805 }
806 
807 
808 //-------------------------------------------------
809 // Force sending but still report the sending to the
810 // adaptive algorithm.
811 //-------------------------------------------------
forceSend(Uint32 block_number)812 void TransporterFacade::forceSend(Uint32 block_number) {
813   checkCounter--;
814   m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
815   sendPerformedLastInterval = 1;
816   if (checkCounter < 0) {
817     calculateSendLimit();
818   }
819   theTransporterRegistry->forceSendCheck(0);
820 }
821 
822 //-------------------------------------------------
823 // Improving API performance
824 //-------------------------------------------------
825 void
checkForceSend(Uint32 block_number)826 TransporterFacade::checkForceSend(Uint32 block_number) {
827   m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
828   //-------------------------------------------------
829   // This code is an adaptive algorithm to discover when
830   // the API should actually send its buffers. The reason
831   // is that the performance is highly dependent on the
832   // size of the writes over the communication network.
833   // Thus we try to ensure that the send size is as big
834   // as possible. At the same time we don't want response
835   // time to increase so therefore we have to keep track of
836   // how the users are performing adaptively.
837   //-------------------------------------------------
838 
839   if (theTransporterRegistry->forceSendCheck(currentSendLimit) == 1) {
840     sendPerformedLastInterval = 1;
841   }
842   checkCounter--;
843   if (checkCounter < 0) {
844     calculateSendLimit();
845   }
846 }
847 
848 
849 /******************************************************************************
850  * SEND SIGNAL METHODS
851  *****************************************************************************/
852 int
sendSignal(const NdbApiSignal * aSignal,NodeId aNode)853 TransporterFacade::sendSignal(const NdbApiSignal * aSignal, NodeId aNode)
854 {
855   const Uint32* tDataPtr = aSignal->getConstDataPtrSend();
856   Uint32 Tlen = aSignal->theLength;
857   Uint32 TBno = aSignal->theReceiversBlockNumber;
858 #ifdef API_TRACE
859   if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
860     SignalHeader tmp = * aSignal;
861     tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
862     LinearSectionPtr ptr[3];
863     signalLogger.sendSignal(tmp,
864                             1,
865                             tDataPtr,
866                             aNode, ptr, 0);
867     signalLogger.flushSignalLog();
868   }
869 #endif
870   if ((Tlen != 0) && (Tlen <= 25) && (TBno != 0)) {
871     SendStatus ss = theTransporterRegistry->prepareSend(aSignal,
872                                                         1, // JBB
873                                                         tDataPtr,
874                                                         aNode,
875                                                         (LinearSectionPtr*)0);
876     //if (ss != SEND_OK) ndbout << ss << endl;
877     if (ss == SEND_OK)
878     {
879       assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
880              aSignal->readSignalNumber() == GSN_API_REGREQ ||
881              (aSignal->readSignalNumber() == GSN_CONNECT_REP &&
882               aNode == ownId()));
883     }
884     return (ss == SEND_OK ? 0 : -1);
885   }
886   else
887   {
888     ndbout << "ERR: SigLen = " << Tlen << " BlockRec = " << TBno;
889     ndbout << " SignalNo = " << aSignal->theVerId_signalNumber << endl;
890     assert(0);
891   }//if
892   return -1; // Node Dead
893 }
894 
895 /**
896  * FragmentedSectionIterator
897  * -------------------------
898  * This class acts as an adapter to a GenericSectionIterator
899  * instance, providing a sub-range iterator interface.
900  * It is used when long sections of a signal are fragmented
901  * across multiple actual signals - the user-supplied
902  * GenericSectionIterator is then adapted into a
903  * GenericSectionIterator that only returns a subset of
904  * the contained words for each signal fragment.
905  */
906 class FragmentedSectionIterator: public GenericSectionIterator
907 {
908 private :
909   GenericSectionIterator* realIterator; /* Real underlying iterator */
910   Uint32 realIterWords;                 /* Total size of underlying */
911   Uint32 realCurrPos;                   /* Current pos in underlying */
912   Uint32 rangeStart;                    /* Sub range start in underlying */
913   Uint32 rangeLen;                      /* Sub range len in underlying */
914   Uint32 rangeRemain;                   /* Remaining words in underlying */
915   const Uint32* lastReadPtr;            /* Ptr to last chunk obtained from
916                                          * underlying */
917   Uint32 lastReadPtrLen;                /* Remaining words in last chunk
918                                          * obtained from underlying */
919 public:
920   /* Constructor
921    * The instance is constructed with the sub-range set to be the
922    * full range of the underlying iterator
923    */
FragmentedSectionIterator(GenericSectionPtr ptr)924   FragmentedSectionIterator(GenericSectionPtr ptr)
925   {
926     realIterator= ptr.sectionIter;
927     realIterWords= ptr.sz;
928     realCurrPos= 0;
929     rangeStart= 0;
930     rangeLen= rangeRemain= realIterWords;
931     lastReadPtr= NULL;
932     lastReadPtrLen= 0;
933     moveToPos(0);
934 
935     assert(checkInvariants());
936   }
937 
938 private:
939   /**
940    * checkInvariants
941    * These class invariants must hold true at all stable states
942    * of the iterator
943    */
checkInvariants()944   bool checkInvariants()
945   {
946     assert( (realIterator != NULL) || (realIterWords == 0) );
947     assert( realCurrPos <= realIterWords );
948     assert( rangeStart <= realIterWords );
949     assert( (rangeStart+rangeLen) <= realIterWords);
950     assert( rangeRemain <= rangeLen );
951 
952     /* Can only have a null readptr if nothing is left */
953     assert( (lastReadPtr != NULL) || (rangeRemain == 0));
954 
955     /* If we have a non-null readptr and some remaining
956      * words the readptr must have some words
957      */
958     assert( (lastReadPtr == NULL) ||
959             ((rangeRemain == 0) || (lastReadPtrLen != 0)));
960     return true;
961   }
962 
963   /**
964    * moveToPos
965    * This method is used when the iterator is reset(), to move
966    * to the start of the current sub-range.
967    * If the iterator is already in-position then this is efficient
   * Otherwise, it has to reset() the underlying iterator and
969    * advance it until the start position is reached.
970    */
moveToPos(Uint32 pos)971   void moveToPos(Uint32 pos)
972   {
973     assert(pos <= realIterWords);
974 
975     if (pos < realCurrPos)
976     {
977       /* Need to reset, and advance from the start */
978       realIterator->reset();
979       realCurrPos= 0;
980       lastReadPtr= NULL;
981       lastReadPtrLen= 0;
982     }
983 
984     if ((lastReadPtr == NULL) &&
985         (realIterWords != 0) &&
986         (pos != realIterWords))
987       lastReadPtr= realIterator->getNextWords(lastReadPtrLen);
988 
989     if (pos == realCurrPos)
990       return;
991 
992     /* Advance until we get a chunk which contains the pos */
993     while (pos >= realCurrPos + lastReadPtrLen)
994     {
995       realCurrPos+= lastReadPtrLen;
996       lastReadPtr= realIterator->getNextWords(lastReadPtrLen);
997       assert(lastReadPtr != NULL);
998     }
999 
1000     const Uint32 chunkOffset= pos - realCurrPos;
1001     lastReadPtr+= chunkOffset;
1002     lastReadPtrLen-= chunkOffset;
1003     realCurrPos= pos;
1004   }
1005 
1006 public:
1007   /**
1008    * setRange
1009    * Set the sub-range of the iterator.  Must be within the
1010    * bounds of the underlying iterator
1011    * After the range is set, the iterator is reset() to the
1012    * start of the supplied subrange
1013    */
setRange(Uint32 start,Uint32 len)1014   bool setRange(Uint32 start, Uint32 len)
1015   {
1016     assert(checkInvariants());
1017     if (start+len > realIterWords)
1018       return false;
1019     moveToPos(start);
1020 
1021     rangeStart= start;
1022     rangeLen= rangeRemain= len;
1023 
1024     assert(checkInvariants());
1025     return true;
1026   }
1027 
1028   /**
1029    * reset
1030    * (GenericSectionIterator)
1031    * Reset the iterator to the start of the current sub-range
1032    * Avoid calling as it could be expensive.
1033    */
reset()1034   void reset()
1035   {
1036     /* Reset iterator to last specified range */
1037     assert(checkInvariants());
1038     moveToPos(rangeStart);
1039     rangeRemain= rangeLen;
1040     assert(checkInvariants());
1041   }
1042 
1043   /**
1044    * getNextWords
1045    * (GenericSectionIterator)
1046    * Get ptr and size of next contiguous words in subrange
1047    */
getNextWords(Uint32 & sz)1048   const Uint32* getNextWords(Uint32& sz)
1049   {
1050     assert(checkInvariants());
1051     const Uint32* currPtr= NULL;
1052 
1053     if (rangeRemain)
1054     {
1055       assert(lastReadPtr != NULL);
1056       assert(lastReadPtrLen != 0);
1057       currPtr= lastReadPtr;
1058 
1059       sz= MIN(rangeRemain, lastReadPtrLen);
1060 
1061       if (sz == lastReadPtrLen)
1062         /* Will return everything in this chunk, move iterator to
1063          * next
1064          */
1065         lastReadPtr= realIterator->getNextWords(lastReadPtrLen);
1066       else
1067       {
1068         /* Not returning all of this chunk, just advance within it */
1069         lastReadPtr+= sz;
1070         lastReadPtrLen-= sz;
1071       }
1072       realCurrPos+= sz;
1073       rangeRemain-= sz;
1074     }
1075     else
1076     {
1077       sz= 0;
1078     }
1079 
1080     assert(checkInvariants());
1081     return currPtr;
1082   }
1083 };
1084 
/* Max fragmented signal chunk size (words).
 *
 * Computed as a whole number of NDB_SECTION_SEGMENT_SZ-word segments:
 * (MAX_SEND_MESSAGE_BYTESIZE >> 2) is the message limit divided by 4
 * (presumably bytes -> 32-bit words), divided into segments, minus two
 * segments of slack left for the 'main' part of the signal etc.
 * The result is therefore always a multiple of NDB_SECTION_SEGMENT_SZ.
 */
#define CHUNK_SZ ((((MAX_SEND_MESSAGE_BYTESIZE >> 2) / NDB_SECTION_SEGMENT_SZ) - 2 ) \
                  * NDB_SECTION_SEGMENT_SZ)
1091 
1092 /**
1093  * sendFragmentedSignal (GenericSectionPtr variant)
1094  * ------------------------------------------------
1095  * This method will send a signal with attached long sections.  If
1096  * the signal is longer than CHUNK_SZ, the signal will be split into
1097  * multiple CHUNK_SZ fragments.
1098  *
1099  * This is done by sending two or more long signals(fragments), with the
1100  * original GSN, but different signal data and with as much of the long
1101  * sections as will fit in each.
1102  *
 * Fragment signals contain a fraginfo value in the header
1104  * (1= first fragment, 2= intermediate fragment, 3= final fragment)
1105  *
1106  * Fragment signals contain additional words in their signals :
1107  *   1..n words Mapping section numbers in fragment signal to original
1108  *              signal section numbers
1109  *   1 word     Fragmented signal unique id.
1110  *
1111  * Non final fragments (fraginfo=1/2) only have this data in them.  Final
1112  * fragments have this data in addition to the normal signal data.
1113  *
1114  * Each fragment signal can transport one or more long sections, starting
1115  * with section 0.  Sections are always split on NDB_SECTION_SEGMENT_SZ word
1116  * boundaries to simplify reassembly in the kernel.
1117  */
int
TransporterFacade::sendFragmentedSignal(const NdbApiSignal* inputSignal,
                                        NodeId aNode,
                                        const GenericSectionPtr ptr[3],
                                        Uint32 secs)
{
  /* Returns 0 when every prepareSend() reported SEND_OK, -1 otherwise.
   *
   * All header/data modifications below are made on a local copy of the
   * caller's signal, so inputSignal itself is never changed.
   */
  NdbApiSignal copySignal(* inputSignal);
  NdbApiSignal* aSignal = &copySignal;

  /* Total number of words in all attached sections */
  unsigned i;
  Uint32 totalSectionLength= 0;
  for (i= 0; i < secs; i++)
    totalSectionLength+= ptr[i].sz;

  /* If there's no need to fragment, send normally */
  if (totalSectionLength <= CHUNK_SZ)
    return sendSignal(aSignal, aNode, ptr, secs);

  // TODO : Consider tracing fragment signals?
#ifdef API_TRACE
  /* Only the unfragmented header/data is logged here (0 is passed as the
   * section count); the caller's iterators are reset afterwards
   */
  if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
    SignalHeader tmp = * aSignal;
    tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
    signalLogger.sendSignal(tmp,
                            1,
                            aSignal->getConstDataPtrSend(),
                            aNode, ptr, 0);
    signalLogger.flushSignalLog();
    for (Uint32 i = 0; i<secs; i++)
      ptr[i].sectionIter->reset();
  }
#endif

  /* tmp_signal carries the non-final fragments; it starts out with the
   * same header as the signal being sent
   */
  NdbApiSignal tmp_signal(*(SignalHeader*)aSignal);
  GenericSectionPtr tmp_ptr[3];
  GenericSectionPtr empty= {0, NULL};
  Uint32 unique_id= m_fragmented_signal_id++; // next unique id

  /* Init tmp_ptr array from ptr[] array, make sure we have
   * 0 length for missing sections
   */
  for (i= 0; i < 3; i++)
    tmp_ptr[i]= (i < secs)? ptr[i] : empty;

  /* Create our section iterator adapters */
  FragmentedSectionIterator sec0(tmp_ptr[0]);
  FragmentedSectionIterator sec1(tmp_ptr[1]);
  FragmentedSectionIterator sec2(tmp_ptr[2]);

  /* Replace caller's iterators with ours */
  tmp_ptr[0].sectionIter= &sec0;
  tmp_ptr[1].sectionIter= &sec1;
  tmp_ptr[2].sectionIter= &sec2;

  /* start_i       : first section carried by the fragment being built
   * this_chunk_sz : words already assigned to the fragment being built
   * fragment_info : 0 until a fragment has been sent, then 1 (first)
   *                 and 2 (every later non-final fragment)
   */
  unsigned start_i= 0;
  unsigned this_chunk_sz= 0;
  unsigned fragment_info= 0;
  Uint32 *tmp_signal_data= tmp_signal.getDataPtrSend();
  for (i= 0; i < secs;) {
    /* tmp_ptr[i].sz holds the words of section i not yet sent */
    unsigned remaining_sec_sz= tmp_ptr[i].sz;
    /* Record the original section number for this fragment slot */
    tmp_signal_data[i-start_i]= i;
    if (this_chunk_sz + remaining_sec_sz <= CHUNK_SZ)
    {
      /* This section fits whole, move onto next */
      this_chunk_sz+= remaining_sec_sz;
      i++;
    }
    else
    {
      /* This section doesn't fit, truncate it */
      unsigned send_sz= CHUNK_SZ - this_chunk_sz;
      if (i != start_i)
      {
        /* We ensure that the first piece of a new section which is
         * being truncated is a multiple of NDB_SECTION_SEGMENT_SZ
         * (to simplify reassembly).  Subsequent non-truncated pieces
         * will be CHUNK_SZ which is a multiple of NDB_SECTION_SEGMENT_SZ
         * The final piece does not need to be a multiple of
         * NDB_SECTION_SEGMENT_SZ
         *
         * Note that this can push this_chunk_sz above CHUNK_SZ
         * Should probably round-down, but need to be careful of
         * 'can't fit any' cases.  Instead, CHUNK_SZ is defined
         * with some slack below MAX_SEND_MESSAGE_BYTESIZE
         */
	send_sz=
	  NDB_SECTION_SEGMENT_SZ
	  *((send_sz+NDB_SECTION_SEGMENT_SZ-1)
            /NDB_SECTION_SEGMENT_SZ);
        if (send_sz > remaining_sec_sz)
	  send_sz= remaining_sec_sz;
      }

      /* Modify tmp generic section ptr to describe truncated
       * section
       */
      tmp_ptr[i].sz= send_sz;
      /* tmp_ptr[i].sectionIter was set above to one of sec0/sec1/sec2,
       * so the downcast refers to one of our own adapters
       */
      FragmentedSectionIterator* fragIter=
        (FragmentedSectionIterator*) tmp_ptr[i].sectionIter;
      const Uint32 total_sec_sz= ptr[i].sz;
      const Uint32 start= (total_sec_sz - remaining_sec_sz);
      bool ok= fragIter->setRange(start, send_sz);
      assert(ok);
      if (!ok)
        return -1;

      if (fragment_info < 2) // 1 = first fragment signal
                             // 2 = middle fragments
	fragment_info++;

      // send tmp_signal
      /* Signal data: one section-number word per carried section,
       * followed by the fragmented signal's unique id
       */
      tmp_signal_data[i-start_i+1]= unique_id;
      tmp_signal.setLength(i-start_i+2);
      tmp_signal.m_fragmentInfo= fragment_info;
      tmp_signal.m_noOfSections= i-start_i+1;
      // do prepare send
      {
	SendStatus ss = theTransporterRegistry->prepareSend
	  (&tmp_signal,
	   1, /*JBB*/
	   tmp_signal_data,
	   aNode,
	   &tmp_ptr[start_i]);
	assert(ss != SEND_MESSAGE_TOO_BIG);
	if (ss != SEND_OK) return -1;
        /* NOTE(review): ss is necessarily SEND_OK here because of the
         * early return on the previous line
         */
        if (ss == SEND_OK)
        {
          assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
                 tmp_signal.readSignalNumber() == GSN_API_REGREQ);
        }
      }
      // setup variables for next signal
      start_i= i;
      this_chunk_sz= 0;
      assert(remaining_sec_sz >= send_sz);
      Uint32 remaining= remaining_sec_sz - send_sz;
      tmp_ptr[i].sz= remaining;
      /* Set sub-range iterator to cover remaining words */
      ok= fragIter->setRange(start+send_sz, remaining);
      assert(ok);
      if (!ok)
        return -1;

      if (remaining == 0)
        /* This section's done, move onto the next */
	i++;
    }
  }

  /* Length of the signal data before any fragment info is appended */
  unsigned a_sz= aSignal->getLength();

  if (fragment_info > 0) {
    // update the original signal to include section info
    /* Appends the section-number words and the unique id after the
     * existing signal data (of the local copy)
     */
    Uint32 *a_data= aSignal->getDataPtrSend();
    unsigned tmp_sz= i-start_i;
    memcpy(a_data+a_sz,
	   tmp_signal_data,
	   tmp_sz*sizeof(Uint32));
    a_data[a_sz+tmp_sz]= unique_id;
    aSignal->setLength(a_sz+tmp_sz+1);

    // send last fragment
    aSignal->m_fragmentInfo= 3; // 3 = last fragment
    aSignal->m_noOfSections= i-start_i;
  } else {
    /* Nothing was fragmented: send all sections in one signal */
    aSignal->m_noOfSections= secs;
  }

  // send aSignal
  int ret;
  {
    SendStatus ss = theTransporterRegistry->prepareSend
      (aSignal,
       1/*JBB*/,
       aSignal->getConstDataPtrSend(),
       aNode,
       &tmp_ptr[start_i]);
    assert(ss != SEND_MESSAGE_TOO_BIG);
    if (ss == SEND_OK)
    {
      assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
             aSignal->readSignalNumber() == GSN_API_REGREQ);
    }
    ret = (ss == SEND_OK ? 0 : -1);
  }
  /* Put the header fields of the local copy back to their unfragmented
   * values before returning
   */
  aSignal->m_noOfSections = 0;
  aSignal->m_fragmentInfo = 0;
  aSignal->setLength(a_sz);
  return ret;
}
1308 
1309 int
sendFragmentedSignal(const NdbApiSignal * aSignal,NodeId aNode,const LinearSectionPtr ptr[3],Uint32 secs)1310 TransporterFacade::sendFragmentedSignal(const NdbApiSignal* aSignal,
1311                                         NodeId aNode,
1312                                         const LinearSectionPtr ptr[3],
1313                                         Uint32 secs)
1314 {
1315   /* Use the GenericSection variant of sendFragmentedSignal */
1316   GenericSectionPtr tmpPtr[3];
1317   LinearSectionPtr linCopy[3];
1318   const LinearSectionPtr empty= {0, NULL};
1319 
1320   /* Make sure all of linCopy is initialised */
1321   for (Uint32 j=0; j<3; j++)
1322     linCopy[j]= (j < secs)? ptr[j] : empty;
1323 
1324   LinearSectionIterator zero (linCopy[0].p, linCopy[0].sz);
1325   LinearSectionIterator one  (linCopy[1].p, linCopy[1].sz);
1326   LinearSectionIterator two  (linCopy[2].p, linCopy[2].sz);
1327 
1328   /* Build GenericSectionPtr array using iterators */
1329   tmpPtr[0].sz= linCopy[0].sz;
1330   tmpPtr[0].sectionIter= &zero;
1331   tmpPtr[1].sz= linCopy[1].sz;
1332   tmpPtr[1].sectionIter= &one;
1333   tmpPtr[2].sz= linCopy[2].sz;
1334   tmpPtr[2].sectionIter= &two;
1335 
1336   return sendFragmentedSignal(aSignal, aNode, tmpPtr, secs);
1337 }
1338 
1339 
1340 int
sendSignal(const NdbApiSignal * aSignal,NodeId aNode,const LinearSectionPtr ptr[3],Uint32 secs)1341 TransporterFacade::sendSignal(const NdbApiSignal* aSignal, NodeId aNode,
1342                               const LinearSectionPtr ptr[3], Uint32 secs)
1343 {
1344   Uint32 save = aSignal->m_noOfSections;
1345   const_cast<NdbApiSignal*>(aSignal)->m_noOfSections = secs;
1346 #ifdef API_TRACE
1347   if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
1348     SignalHeader tmp = * aSignal;
1349     tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
1350     signalLogger.sendSignal(tmp,
1351                             1,
1352                             aSignal->getConstDataPtrSend(),
1353                             aNode, ptr, secs);
1354     signalLogger.flushSignalLog();
1355   }
1356 #endif
1357   SendStatus ss = theTransporterRegistry->prepareSend
1358     (aSignal,
1359      1, // JBB
1360      aSignal->getConstDataPtrSend(),
1361      aNode,
1362      ptr);
1363   assert(ss != SEND_MESSAGE_TOO_BIG);
1364   const_cast<NdbApiSignal*>(aSignal)->m_noOfSections = save;
1365   if (ss == SEND_OK)
1366   {
1367     assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
1368            aSignal->readSignalNumber() == GSN_API_REGREQ);
1369   }
1370   return (ss == SEND_OK ? 0 : -1);
1371 }
1372 
1373 int
sendSignal(const NdbApiSignal * aSignal,NodeId aNode,const GenericSectionPtr ptr[3],Uint32 secs)1374 TransporterFacade::sendSignal(const NdbApiSignal* aSignal, NodeId aNode,
1375                               const GenericSectionPtr ptr[3], Uint32 secs)
1376 {
1377   Uint32 save = aSignal->m_noOfSections;
1378   const_cast<NdbApiSignal*>(aSignal)->m_noOfSections = secs;
1379 #ifdef API_TRACE
1380   if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
1381     SignalHeader tmp = * aSignal;
1382     tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
1383     signalLogger.sendSignal(tmp,
1384                             1,
1385                             aSignal->getConstDataPtrSend(),
1386                             aNode, ptr, secs);
1387     signalLogger.flushSignalLog();
1388     for (Uint32 i = 0; i<secs; i++)
1389       ptr[i].sectionIter->reset();
1390   }
1391 #endif
1392   SendStatus ss = theTransporterRegistry->prepareSend
1393     (aSignal,
1394      1, // JBB
1395      aSignal->getConstDataPtrSend(),
1396      aNode,
1397      ptr);
1398   assert(ss != SEND_MESSAGE_TOO_BIG);
1399   const_cast<NdbApiSignal*>(aSignal)->m_noOfSections = save;
1400   if (ss == SEND_OK)
1401   {
1402     assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
1403            aSignal->readSignalNumber() == GSN_API_REGREQ);
1404   }
1405   return (ss == SEND_OK ? 0 : -1);
1406 }
1407 
1408 /******************************************************************************
1409  * CONNECTION METHODS  Etc
1410  ******************************************************************************/
1411 void
doConnect(int aNodeId)1412 TransporterFacade::doConnect(int aNodeId){
1413   theTransporterRegistry->setIOState(aNodeId, NoHalt);
1414   theTransporterRegistry->do_connect(aNodeId);
1415 }
1416 
1417 void
doDisconnect(int aNodeId)1418 TransporterFacade::doDisconnect(int aNodeId)
1419 {
1420   theTransporterRegistry->do_disconnect(aNodeId);
1421 }
1422 
1423 void
reportConnected(int aNodeId)1424 TransporterFacade::reportConnected(int aNodeId)
1425 {
1426   theClusterMgr->reportConnected(aNodeId);
1427   return;
1428 }
1429 
1430 void
reportDisconnected(int aNodeId)1431 TransporterFacade::reportDisconnected(int aNodeId)
1432 {
1433   theClusterMgr->reportDisconnected(aNodeId);
1434   return;
1435 }
1436 
1437 NodeId
ownId() const1438 TransporterFacade::ownId() const
1439 {
1440   return theOwnId;
1441 }
1442 
1443 bool
isConnected(NodeId aNodeId)1444 TransporterFacade::isConnected(NodeId aNodeId){
1445   return theTransporterRegistry->is_connected(aNodeId);
1446 }
1447 
1448 NodeId
get_an_alive_node()1449 TransporterFacade::get_an_alive_node()
1450 {
1451   DBUG_ENTER("TransporterFacade::get_an_alive_node");
1452   DBUG_PRINT("enter", ("theStartNodeId: %d", theStartNodeId));
1453 #ifdef VM_TRACE
1454   const char* p = NdbEnv_GetEnv("NDB_ALIVE_NODE_ID", (char*)0, 0);
1455   if (p != 0 && *p != 0)
1456     return atoi(p);
1457 #endif
1458   NodeId i;
1459   for (i = theStartNodeId; i < MAX_NDB_NODES; i++) {
1460     if (get_node_alive(i)){
1461       DBUG_PRINT("info", ("Node %d is alive", i));
1462       theStartNodeId = ((i + 1) % MAX_NDB_NODES);
1463       DBUG_RETURN(i);
1464     }
1465   }
1466   for (i = 1; i < theStartNodeId; i++) {
1467     if (get_node_alive(i)){
1468       DBUG_PRINT("info", ("Node %d is alive", i));
1469       theStartNodeId = ((i + 1) % MAX_NDB_NODES);
1470       DBUG_RETURN(i);
1471     }
1472   }
1473   DBUG_RETURN((NodeId)0);
1474 }
1475 
ThreadData(Uint32 size)1476 TransporterFacade::ThreadData::ThreadData(Uint32 size){
1477   m_use_cnt = 0;
1478   m_firstFree = END_OF_LIST;
1479   expand(size);
1480 }
1481 
1482 void
expand(Uint32 size)1483 TransporterFacade::ThreadData::expand(Uint32 size){
1484   trp_client * oe = 0;
1485 
1486   const Uint32 sz = m_statusNext.size();
1487   m_objectExecute.fill(sz + size, oe);
1488   for(Uint32 i = 0; i<size; i++){
1489     m_statusNext.push_back(sz + i + 1);
1490   }
1491 
1492   m_statusNext.back() = m_firstFree;
1493   m_firstFree = m_statusNext.size() - size;
1494 }
1495 
1496 
1497 int
open(trp_client * clnt)1498 TransporterFacade::ThreadData::open(trp_client * clnt)
1499 {
1500   Uint32 nextFree = m_firstFree;
1501 
1502   if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){
1503     return -1;
1504   }
1505 
1506   if(nextFree == END_OF_LIST){
1507     expand(10);
1508     nextFree = m_firstFree;
1509   }
1510 
1511   m_use_cnt++;
1512   m_firstFree = m_statusNext[nextFree];
1513 
1514   m_statusNext[nextFree] = INACTIVE;
1515   m_objectExecute[nextFree] = clnt;
1516 
1517   return indexToNumber(nextFree);
1518 }
1519 
1520 int
close(int number)1521 TransporterFacade::ThreadData::close(int number){
1522   number= numberToIndex(number);
1523   assert(m_objectExecute[number] != 0);
1524   m_statusNext[number] = m_firstFree;
1525   assert(m_use_cnt);
1526   m_use_cnt--;
1527   m_firstFree = number;
1528   m_objectExecute[number] = 0;
1529   return 0;
1530 }
1531 
1532 Uint32
get_active_ndb_objects() const1533 TransporterFacade::get_active_ndb_objects() const
1534 {
1535   return m_threads.m_use_cnt;
1536 }
1537 
1538 
1539 void
start_poll(trp_client * clnt)1540 TransporterFacade::start_poll(trp_client* clnt)
1541 {
1542   lock_mutex();
1543   clnt->m_poll.m_locked = true;
1544 }
1545 
1546 void
do_poll(trp_client * clnt,Uint32 wait_time)1547 TransporterFacade::do_poll(trp_client* clnt, Uint32 wait_time)
1548 {
1549   clnt->m_poll.m_waiting = true;
1550   assert(clnt->m_poll.m_locked == true);
1551   trp_client* owner = m_poll_owner;
1552   if (owner != NULL && owner != clnt)
1553   {
1554     /*
1555       We didn't get hold of the poll "right". We will sleep on a
1556       conditional mutex until the thread owning the poll "right"
1557       will wake us up after all data is received. If no data arrives
1558       we will wake up eventually due to the timeout.
1559       After receiving all data we take the object out of the cond wait
1560       queue if it hasn't happened already. It is usually already out of the
1561       queue but at time-out it could be that the object is still there.
1562     */
1563     assert(clnt->m_poll.m_poll_owner == false);
1564     add_to_poll_queue(clnt);
1565     NdbCondition_WaitTimeout(clnt->m_poll.m_condition, theMutexPtr,
1566                              wait_time);
1567     if (clnt != m_poll_owner && clnt->m_poll.m_waiting)
1568     {
1569       remove_from_poll_queue(clnt);
1570     }
1571   }
1572   else
1573   {
1574     /*
1575       We got the poll "right" and we poll until data is received. After
1576       receiving data we will check if all data is received, if not we
1577       poll again.
1578     */
1579     assert(owner == clnt || clnt->m_poll.m_poll_owner == false);
1580     m_poll_owner = clnt;
1581     clnt->m_poll.m_poll_owner = true;
1582     external_poll(wait_time);
1583   }
1584 }
1585 
1586 void
wakeup(trp_client * clnt)1587 TransporterFacade::wakeup(trp_client* clnt)
1588 {
1589   if (clnt->m_poll.m_waiting)
1590   {
1591     clnt->m_poll.m_waiting = false;
1592     if (m_poll_owner != clnt)
1593     {
1594       remove_from_poll_queue(clnt);
1595       NdbCondition_Signal(clnt->m_poll.m_condition);
1596     }
1597   }
1598 }
1599 
1600 void
complete_poll(trp_client * clnt)1601 TransporterFacade::complete_poll(trp_client* clnt)
1602 {
1603   clnt->m_poll.m_waiting = false;
1604   if (!clnt->m_poll.m_locked)
1605   {
1606     assert(clnt->m_poll.m_poll_owner == false);
1607     return;
1608   }
1609 
1610   /*
1611    When completing the poll for this thread we must return the poll
1612    ownership if we own it. We will give it to the last thread that
1613    came here (the most recent) which is likely to be the one also
1614    last to complete. We will remove that thread from the conditional
1615    wait queue and set him as the new owner of the poll "right".
1616    We will wait however with the signal until we have unlocked the
1617    mutex for performance reasons.
1618    See Stevens book on Unix NetworkProgramming: The Sockets Networking
1619    API Volume 1 Third Edition on page 703-704 for a discussion on this
1620    subject.
1621   */
1622   trp_client* new_owner = 0;
1623   if (m_poll_owner == clnt)
1624   {
1625     assert(clnt->m_poll.m_poll_owner == true);
1626     m_poll_owner = new_owner = remove_last_from_poll_queue();
1627   }
1628   if (new_owner)
1629   {
1630     assert(new_owner->m_poll.m_poll_owner == false);
1631     assert(new_owner->m_poll.m_locked == true);
1632     assert(new_owner->m_poll.m_waiting == true);
1633     NdbCondition_Signal(new_owner->m_poll.m_condition);
1634     new_owner->m_poll.m_poll_owner = true;
1635   }
1636   clnt->m_poll.m_locked = false;
1637   clnt->m_poll.m_poll_owner = false;
1638   unlock_mutex();
1639 }
1640 
1641 void
add_to_poll_queue(trp_client * clnt)1642 TransporterFacade::add_to_poll_queue(trp_client* clnt)
1643 {
1644   assert(clnt != 0);
1645   assert(clnt->m_poll.m_prev == 0);
1646   assert(clnt->m_poll.m_next == 0);
1647   assert(clnt->m_poll.m_locked == true);
1648   assert(clnt->m_poll.m_poll_owner == false);
1649 
1650   if (m_poll_queue_head == 0)
1651   {
1652     assert(m_poll_queue_tail == 0);
1653     m_poll_queue_head = clnt;
1654     m_poll_queue_tail = clnt;
1655   }
1656   else
1657   {
1658     assert(m_poll_queue_tail->m_poll.m_next == 0);
1659     m_poll_queue_tail->m_poll.m_next = clnt;
1660     clnt->m_poll.m_prev = m_poll_queue_tail;
1661     m_poll_queue_tail = clnt;
1662   }
1663 }
1664 
1665 void
remove_from_poll_queue(trp_client * clnt)1666 TransporterFacade::remove_from_poll_queue(trp_client* clnt)
1667 {
1668   assert(clnt != 0);
1669   assert(clnt->m_poll.m_locked == true);
1670   assert(clnt->m_poll.m_poll_owner == false);
1671 
1672   if (clnt->m_poll.m_prev != 0)
1673   {
1674     clnt->m_poll.m_prev->m_poll.m_next = clnt->m_poll.m_next;
1675   }
1676   else
1677   {
1678     assert(m_poll_queue_head == clnt);
1679     m_poll_queue_head = clnt->m_poll.m_next;
1680   }
1681 
1682   if (clnt->m_poll.m_next != 0)
1683   {
1684     clnt->m_poll.m_next->m_poll.m_prev = clnt->m_poll.m_prev;
1685   }
1686   else
1687   {
1688     assert(m_poll_queue_tail == clnt);
1689     m_poll_queue_tail = clnt->m_poll.m_prev;
1690   }
1691 
1692   if (m_poll_queue_head == 0)
1693     assert(m_poll_queue_tail == 0);
1694   else if (m_poll_queue_tail == 0)
1695     assert(m_poll_queue_head == 0);
1696 
1697   clnt->m_poll.m_prev = 0;
1698   clnt->m_poll.m_next = 0;
1699 }
1700 
1701 trp_client*
remove_last_from_poll_queue()1702 TransporterFacade::remove_last_from_poll_queue()
1703 {
1704   trp_client * clnt = m_poll_queue_tail;
1705   if (clnt == 0)
1706     return 0;
1707 
1708   remove_from_poll_queue(clnt);
1709   return clnt;
1710 }
1711 
1712 template class Vector<trp_client*>;
1713 
1714 #include "SignalSender.hpp"
1715 
1716 const Uint32*
getNextWords(Uint32 & sz)1717 SignalSectionIterator::getNextWords(Uint32& sz)
1718 {
1719   if (likely(currentSignal != NULL))
1720   {
1721     NdbApiSignal* signal= currentSignal;
1722     currentSignal= currentSignal->next();
1723     sz= signal->getLength();
1724     return signal->getDataPtrSend();
1725   }
1726   sz= 0;
1727   return NULL;
1728 }
1729 
1730 #ifdef UNIT_TEST
1731 
1732 // Unit test code starts
1733 #include <random.h>
1734 
/* VERIFY(x): if x evaluates to 0, print the failing line and expression
 * and make the enclosing function return -1.
 * Wrapped in do { } while (0) so that "VERIFY(x);" is a single statement
 * and can be used safely as the body of an if/else or a loop.
 */
#define VERIFY(x) \
  do { \
    if ((x) == 0) \
    { \
      printf("VERIFY failed at Line %u : %s\n",__LINE__, #x); \
      return -1; \
    } \
  } while (0)
1736 
1737 /* Verify that word[n] == bias + n */
1738 int
verifyIteratorContents(GenericSectionIterator & gsi,int dataWords,int bias)1739 verifyIteratorContents(GenericSectionIterator& gsi, int dataWords, int bias)
1740 {
1741   int pos= 0;
1742 
1743   while (pos < dataWords)
1744   {
1745     const Uint32* readPtr=NULL;
1746     Uint32 len= 0;
1747 
1748     readPtr= gsi.getNextWords(len);
1749 
1750     VERIFY(readPtr != NULL);
1751     VERIFY(len != 0);
1752     VERIFY(len <= (Uint32) (dataWords - pos));
1753 
1754     for (int j=0; j < (int) len; j++)
1755       VERIFY(readPtr[j] == (Uint32) (bias ++));
1756 
1757     pos += len;
1758   }
1759 
1760   return 0;
1761 }
1762 
1763 int
checkGenericSectionIterator(GenericSectionIterator & iter,int size,int bias)1764 checkGenericSectionIterator(GenericSectionIterator& iter, int size, int bias)
1765 {
1766   /* Verify contents */
1767   VERIFY(verifyIteratorContents(iter, size, bias) == 0);
1768 
1769   Uint32 sz;
1770 
1771   /* Check that iterator is empty now */
1772   VERIFY(iter.getNextWords(sz) == NULL);
1773   VERIFY(sz == 0);
1774 
1775   VERIFY(iter.getNextWords(sz) == NULL);
1776   VERIFY(sz == 0);
1777 
1778   iter.reset();
1779 
1780   /* Verify reset put us back to the start */
1781   VERIFY(verifyIteratorContents(iter, size, bias) == 0);
1782 
1783   /* Verify no more words available */
1784   VERIFY(iter.getNextWords(sz) == NULL);
1785   VERIFY(sz == 0);
1786 
1787   return 0;
1788 }
1789 
1790 int
checkIterator(GenericSectionIterator & iter,int size,int bias)1791 checkIterator(GenericSectionIterator& iter, int size, int bias)
1792 {
1793   /* Test iterator itself, and then FragmentedSectionIterator
1794    * adaptation
1795    */
1796   VERIFY(checkGenericSectionIterator(iter, size, bias) == 0);
1797 
1798   /* Now we'll test the FragmentedSectionIterator on the iterator
1799    * we were passed
1800    */
1801   const int subranges= 20;
1802 
1803   iter.reset();
1804   GenericSectionPtr ptr;
1805   ptr.sz= size;
1806   ptr.sectionIter= &iter;
1807   FragmentedSectionIterator fsi(ptr);
1808 
1809   for (int s=0; s< subranges; s++)
1810   {
1811     Uint32 start= 0;
1812     Uint32 len= 0;
1813     if (size > 0)
1814     {
1815       start= (Uint32) myRandom48(size);
1816       if (0 != (size-start))
1817         len= (Uint32) myRandom48(size-start);
1818     }
1819 
1820     /*
1821       printf("Range (0-%u) = (%u + %u)\n",
1822               size, start, len);
1823     */
1824     fsi.setRange(start, len);
1825     VERIFY(checkGenericSectionIterator(fsi, len, bias + start) == 0);
1826   }
1827 
1828   return 0;
1829 }
1830 
1831 
1832 
1833 int
testLinearSectionIterator()1834 testLinearSectionIterator()
1835 {
1836   /* Test Linear section iterator of various
1837    * lengths with section[n] == bias + n
1838    */
1839   const int totalSize= 200000;
1840   const int bias= 13;
1841 
1842   Uint32 data[totalSize];
1843   for (int i=0; i<totalSize; i++)
1844     data[i]= bias + i;
1845 
1846   for (int len= 0; len < 50000; len++)
1847   {
1848     LinearSectionIterator something(data, len);
1849 
1850     VERIFY(checkIterator(something, len, bias) == 0);
1851   }
1852 
1853   return 0;
1854 }
1855 
1856 NdbApiSignal*
createSignalChain(NdbApiSignal * & poolHead,int length,int bias)1857 createSignalChain(NdbApiSignal*& poolHead, int length, int bias)
1858 {
1859   /* Create signal chain, with word[n] == bias+n */
1860   NdbApiSignal* chainHead= NULL;
1861   NdbApiSignal* chainTail= NULL;
1862   int pos= 0;
1863   int signals= 0;
1864 
1865   while (pos < length)
1866   {
1867     int offset= pos % NdbApiSignal::MaxSignalWords;
1868 
1869     if (offset == 0)
1870     {
1871       if (poolHead == NULL)
1872         return 0;
1873 
1874       NdbApiSignal* newSig= poolHead;
1875       poolHead= poolHead->next();
1876       signals++;
1877 
1878       newSig->next(NULL);
1879 
1880       if (chainHead == NULL)
1881       {
1882         chainHead= chainTail= newSig;
1883       }
1884       else
1885       {
1886         chainTail->next(newSig);
1887         chainTail= newSig;
1888       }
1889     }
1890 
1891     chainTail->getDataPtrSend()[offset]= (bias + pos);
1892     chainTail->setLength(offset + 1);
1893     pos ++;
1894   }
1895 
1896   return chainHead;
1897 }
1898 
1899 int
testSignalSectionIterator()1900 testSignalSectionIterator()
1901 {
1902   /* Create a pool of signals, build
1903    * signal chains from it, test
1904    * the iterator against the signal chains
1905    */
1906   const int totalNumSignals= 1000;
1907   NdbApiSignal* poolHead= NULL;
1908 
1909   /* Allocate some signals */
1910   for (int i=0; i < totalNumSignals; i++)
1911   {
1912     NdbApiSignal* sig= new NdbApiSignal((BlockReference) 0);
1913 
1914     if (poolHead == NULL)
1915     {
1916       poolHead= sig;
1917       sig->next(NULL);
1918     }
1919     else
1920     {
1921       sig->next(poolHead);
1922       poolHead= sig;
1923     }
1924   }
1925 
1926   const int bias= 7;
1927   for (int dataWords= 1;
1928        dataWords <= (int)(totalNumSignals *
1929                           NdbApiSignal::MaxSignalWords);
1930        dataWords ++)
1931   {
1932     NdbApiSignal* signalChain= NULL;
1933 
1934     VERIFY((signalChain= createSignalChain(poolHead, dataWords, bias)) != NULL );
1935 
1936     SignalSectionIterator ssi(signalChain);
1937 
1938     VERIFY(checkIterator(ssi, dataWords, bias) == 0);
1939 
1940     /* Now return the signals to the pool */
1941     while (signalChain != NULL)
1942     {
1943       NdbApiSignal* sig= signalChain;
1944       signalChain= signalChain->next();
1945 
1946       sig->next(poolHead);
1947       poolHead= sig;
1948     }
1949   }
1950 
1951   /* Free signals from pool */
1952   while (poolHead != NULL)
1953   {
1954     NdbApiSignal* sig= poolHead;
1955     poolHead= sig->next();
1956     delete(sig);
1957   }
1958 
1959   return 0;
1960 }
1961 
main(int arg,char ** argv)1962 int main(int arg, char** argv)
1963 {
1964   /* Test Section Iterators
1965    * ----------------------
1966    * To run this code :
1967    *   cd storage/ndb/src/ndbapi
1968    *   make testSectionIterators
1969    *   ./testSectionIterators
1970    *
1971    * Will print "OK" in success case
1972    */
1973 
1974 
1975   VERIFY(testLinearSectionIterator() == 0);
1976   VERIFY(testSignalSectionIterator() == 0);
1977 
1978   printf("OK\n");
1979 
1980   return 0;
1981 }
1982 #endif
1983 
1984 void
set_auto_reconnect(int val)1985 TransporterFacade::set_auto_reconnect(int val)
1986 {
1987   theClusterMgr->m_auto_reconnect = val;
1988 }
1989 
1990 int
get_auto_reconnect() const1991 TransporterFacade::get_auto_reconnect() const
1992 {
1993   return theClusterMgr->m_auto_reconnect;
1994 }
1995 
1996 void
ext_set_max_api_reg_req_interval(Uint32 interval)1997 TransporterFacade::ext_set_max_api_reg_req_interval(Uint32 interval)
1998 {
1999   theClusterMgr->set_max_api_reg_req_interval(interval);
2000 }
2001 
2002 void
ext_update_connections()2003 TransporterFacade::ext_update_connections()
2004 {
2005   theClusterMgr->lock();
2006   theTransporterRegistry->update_connections();
2007   theClusterMgr->unlock();
2008 }
2009 
2010 struct in_addr
ext_get_connect_address(Uint32 nodeId)2011 TransporterFacade::ext_get_connect_address(Uint32 nodeId)
2012 {
2013   return theTransporterRegistry->get_connect_address(nodeId);
2014 }
2015 
2016 void
ext_forceHB()2017 TransporterFacade::ext_forceHB()
2018 {
2019   theClusterMgr->forceHB();
2020 }
2021 
2022 bool
ext_isConnected(NodeId aNodeId)2023 TransporterFacade::ext_isConnected(NodeId aNodeId)
2024 {
2025   bool val;
2026   theClusterMgr->lock();
2027   val = theClusterMgr->theNodes[aNodeId].is_connected();
2028   theClusterMgr->unlock();
2029   return val;
2030 }
2031 
2032 void
ext_doConnect(int aNodeId)2033 TransporterFacade::ext_doConnect(int aNodeId)
2034 {
2035   theClusterMgr->lock();
2036   assert(theClusterMgr->theNodes[aNodeId].is_connected() == false);
2037   doConnect(aNodeId);
2038   theClusterMgr->unlock();
2039 }
2040 
2041