1 /* Copyright (c) 2003-2007 MySQL AB
2    Use is subject to license terms
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */
16 
17 #include <ndb_global.h>
18 #include <my_pthread.h>
19 
20 #include <TransporterRegistry.hpp>
21 #include "TransporterInternalDefinitions.hpp"
22 
23 #include "Transporter.hpp"
24 #include <SocketAuthenticator.hpp>
25 
26 #ifdef NDB_TCP_TRANSPORTER
27 #include "TCP_Transporter.hpp"
28 #endif
29 
30 #ifdef NDB_SCI_TRANSPORTER
31 #include "SCI_Transporter.hpp"
32 #endif
33 
34 #ifdef NDB_SHM_TRANSPORTER
35 #include "SHM_Transporter.hpp"
36 extern int g_ndb_shm_signum;
37 #endif
38 
39 #include "TransporterCallback.hpp"
40 #include "NdbOut.hpp"
41 #include <NdbSleep.h>
42 #include <NdbTick.h>
43 #include <InputStream.hpp>
44 #include <OutputStream.hpp>
45 
46 #include <mgmapi/mgmapi.h>
47 #include <mgmapi_internal.h>
48 #include <mgmapi/mgmapi_debug.h>
49 
50 #include <EventLogger.hpp>
51 extern EventLogger g_eventLogger;
52 
53 struct in_addr
get_connect_address(NodeId node_id) const54 TransporterRegistry::get_connect_address(NodeId node_id) const
55 {
56   return theTransporters[node_id]->m_connect_address;
57 }
58 
newSession(NDB_SOCKET_TYPE sockfd)59 SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
60 {
61   DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
62   if (m_auth && !m_auth->server_authenticate(sockfd)){
63     NDB_CLOSE_SOCKET(sockfd);
64     DBUG_RETURN(0);
65   }
66 
67   if (!m_transporter_registry->connect_server(sockfd))
68   {
69     NDB_CLOSE_SOCKET(sockfd);
70     DBUG_RETURN(0);
71   }
72 
73   DBUG_RETURN(0);
74 }
75 
TransporterRegistry(void * callback,unsigned _maxTransporters,unsigned sizeOfLongSignalMemory)76 TransporterRegistry::TransporterRegistry(void * callback,
77 					 unsigned _maxTransporters,
78 					 unsigned sizeOfLongSignalMemory) :
79   m_mgm_handle(0),
80   m_transp_count(0)
81 {
82   DBUG_ENTER("TransporterRegistry::TransporterRegistry");
83 
84   nodeIdSpecified = false;
85   maxTransporters = _maxTransporters;
86   sendCounter = 1;
87 
88   callbackObj=callback;
89 
90   theTCPTransporters  = new TCP_Transporter * [maxTransporters];
91   theSCITransporters  = new SCI_Transporter * [maxTransporters];
92   theSHMTransporters  = new SHM_Transporter * [maxTransporters];
93   theTransporterTypes = new TransporterType   [maxTransporters];
94   theTransporters     = new Transporter     * [maxTransporters];
95   performStates       = new PerformState      [maxTransporters];
96   ioStates            = new IOState           [maxTransporters];
97 
98   // Initialize member variables
99   nTransporters    = 0;
100   nTCPTransporters = 0;
101   nSCITransporters = 0;
102   nSHMTransporters = 0;
103 
104   // Initialize the transporter arrays
105   for (unsigned i=0; i<maxTransporters; i++) {
106     theTCPTransporters[i] = NULL;
107     theSCITransporters[i] = NULL;
108     theSHMTransporters[i] = NULL;
109     theTransporters[i]    = NULL;
110     performStates[i]      = DISCONNECTED;
111     ioStates[i]           = NoHalt;
112   }
113 
114   DBUG_VOID_RETURN;
115 }
116 
set_mgm_handle(NdbMgmHandle h)117 void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
118 {
119   DBUG_ENTER("TransporterRegistry::set_mgm_handle");
120   if (m_mgm_handle)
121     ndb_mgm_destroy_handle(&m_mgm_handle);
122   m_mgm_handle= h;
123   ndb_mgm_set_timeout(m_mgm_handle, 5000);
124 #ifndef DBUG_OFF
125   if (h)
126   {
127     char buf[256];
128     DBUG_PRINT("info",("handle set with connectstring: %s",
129 		       ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
130   }
131   else
132   {
133     DBUG_PRINT("info",("handle set to NULL"));
134   }
135 #endif
136   DBUG_VOID_RETURN;
137 }
138 
~TransporterRegistry()139 TransporterRegistry::~TransporterRegistry()
140 {
141   DBUG_ENTER("TransporterRegistry::~TransporterRegistry");
142 
143   removeAll();
144 
145   delete[] theTCPTransporters;
146   delete[] theSCITransporters;
147   delete[] theSHMTransporters;
148   delete[] theTransporterTypes;
149   delete[] theTransporters;
150   delete[] performStates;
151   delete[] ioStates;
152 
153   if (m_mgm_handle)
154     ndb_mgm_destroy_handle(&m_mgm_handle);
155 
156   DBUG_VOID_RETURN;
157 }
158 
159 void
removeAll()160 TransporterRegistry::removeAll(){
161   for(unsigned i = 0; i<maxTransporters; i++){
162     if(theTransporters[i] != NULL)
163       removeTransporter(theTransporters[i]->getRemoteNodeId());
164   }
165 }
166 
167 void
disconnectAll()168 TransporterRegistry::disconnectAll(){
169   for(unsigned i = 0; i<maxTransporters; i++){
170     if(theTransporters[i] != NULL)
171       theTransporters[i]->doDisconnect();
172   }
173 }
174 
175 bool
init(NodeId nodeId)176 TransporterRegistry::init(NodeId nodeId) {
177   DBUG_ENTER("TransporterRegistry::init");
178   nodeIdSpecified = true;
179   localNodeId = nodeId;
180 
181   DEBUG("TransporterRegistry started node: " << localNodeId);
182 
183   DBUG_RETURN(true);
184 }
185 
186 bool
connect_server(NDB_SOCKET_TYPE sockfd)187 TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd)
188 {
189   DBUG_ENTER("TransporterRegistry::connect_server");
190 
191   // read node id from client
192   // read transporter type
193   int nodeId, remote_transporter_type= -1;
194   SocketInputStream s_input(sockfd);
195   char buf[256];
196   if (s_input.gets(buf, 256) == 0) {
197     DBUG_PRINT("error", ("Could not get node id from client"));
198     DBUG_RETURN(false);
199   }
200   int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
201   switch (r) {
202   case 2:
203     break;
204   case 1:
205     // we're running version prior to 4.1.9
206     // ok, but with no checks on transporter configuration compatability
207     break;
208   default:
209     DBUG_PRINT("error", ("Error in node id from client"));
210     DBUG_RETURN(false);
211   }
212 
213   DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
214 		      nodeId,remote_transporter_type));
215 
216   //check that nodeid is valid and that there is an allocated transporter
217   if ( nodeId < 0 || nodeId >= (int)maxTransporters) {
218     DBUG_PRINT("error", ("Node id out of range from client"));
219     DBUG_RETURN(false);
220   }
221   if (theTransporters[nodeId] == 0) {
222       DBUG_PRINT("error", ("No transporter for this node id from client"));
223       DBUG_RETURN(false);
224   }
225 
226   //check that the transporter should be connected
227   if (performStates[nodeId] != TransporterRegistry::CONNECTING) {
228     DBUG_PRINT("error", ("Transporter in wrong state for this node id from client"));
229     DBUG_RETURN(false);
230   }
231 
232   Transporter *t= theTransporters[nodeId];
233 
234   // send info about own id (just as response to acknowledge connection)
235   // send info on own transporter type
236   SocketOutputStream s_output(sockfd);
237   s_output.println("%d %d", t->getLocalNodeId(), t->m_type);
238 
239   if (remote_transporter_type != -1)
240   {
241     if (remote_transporter_type != t->m_type)
242     {
243       DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
244 			   t->m_type, remote_transporter_type));
245       g_eventLogger.error("Incompatible configuration: Transporter type "
246 			  "mismatch with node %d", nodeId);
247 
248       // wait for socket close for 1 second to let message arrive at client
249       {
250 	fd_set a_set;
251 	FD_ZERO(&a_set);
252 	FD_SET(sockfd, &a_set);
253 	struct timeval timeout;
254 	timeout.tv_sec  = 1; timeout.tv_usec = 0;
255 	select(sockfd+1, &a_set, 0, 0, &timeout);
256       }
257       DBUG_RETURN(false);
258     }
259   }
260   else if (t->m_type == tt_SHM_TRANSPORTER)
261   {
262     g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
263   }
264 
265   // setup transporter (transporter responsible for closing sockfd)
266   t->connect_server(sockfd);
267 
268   DBUG_RETURN(true);
269 }
270 
271 bool
createTCPTransporter(TransporterConfiguration * config)272 TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
273 #ifdef NDB_TCP_TRANSPORTER
274 
275   if(!nodeIdSpecified){
276     init(config->localNodeId);
277   }
278 
279   if(config->localNodeId != localNodeId)
280     return false;
281 
282   if(theTransporters[config->remoteNodeId] != NULL)
283     return false;
284 
285   TCP_Transporter * t = new TCP_Transporter(*this,
286 					    config->tcp.sendBufferSize,
287 					    config->tcp.maxReceiveSize,
288 					    config->localHostName,
289 					    config->remoteHostName,
290 					    config->s_port,
291 					    config->isMgmConnection,
292 					    localNodeId,
293 					    config->remoteNodeId,
294 					    config->serverNodeId,
295 					    config->checksum,
296 					    config->signalId);
297   if (t == NULL)
298     return false;
299   else if (!t->initTransporter()) {
300     delete t;
301     return false;
302   }
303 
304   // Put the transporter in the transporter arrays
305   theTCPTransporters[nTCPTransporters]      = t;
306   theTransporters[t->getRemoteNodeId()]     = t;
307   theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
308   performStates[t->getRemoteNodeId()]       = DISCONNECTED;
309   nTransporters++;
310   nTCPTransporters++;
311 
312   return true;
313 #else
314   return false;
315 #endif
316 }
317 
318 bool
createSCITransporter(TransporterConfiguration * config)319 TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
320 #ifdef NDB_SCI_TRANSPORTER
321 
322   if(!SCI_Transporter::initSCI())
323     abort();
324 
325   if(!nodeIdSpecified){
326     init(config->localNodeId);
327   }
328 
329   if(config->localNodeId != localNodeId)
330     return false;
331 
332   if(theTransporters[config->remoteNodeId] != NULL)
333     return false;
334 
335   SCI_Transporter * t = new SCI_Transporter(*this,
336                                             config->localHostName,
337                                             config->remoteHostName,
338                                             config->s_port,
339 					    config->isMgmConnection,
340                                             config->sci.sendLimit,
341 					    config->sci.bufferSize,
342 					    config->sci.nLocalAdapters,
343 					    config->sci.remoteSciNodeId0,
344 					    config->sci.remoteSciNodeId1,
345 					    localNodeId,
346 					    config->remoteNodeId,
347 					    config->serverNodeId,
348 					    config->checksum,
349 					    config->signalId);
350 
351   if (t == NULL)
352     return false;
353   else if (!t->initTransporter()) {
354     delete t;
355     return false;
356   }
357   // Put the transporter in the transporter arrays
358   theSCITransporters[nSCITransporters]      = t;
359   theTransporters[t->getRemoteNodeId()]     = t;
360   theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
361   performStates[t->getRemoteNodeId()]       = DISCONNECTED;
362   nTransporters++;
363   nSCITransporters++;
364 
365   return true;
366 #else
367   return false;
368 #endif
369 }
370 
371 bool
createSHMTransporter(TransporterConfiguration * config)372 TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
373   DBUG_ENTER("TransporterRegistry::createTransporter SHM");
374 #ifdef NDB_SHM_TRANSPORTER
375   if(!nodeIdSpecified){
376     init(config->localNodeId);
377   }
378 
379   if(config->localNodeId != localNodeId)
380     return false;
381 
382   if (!g_ndb_shm_signum) {
383     g_ndb_shm_signum= config->shm.signum;
384     DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
385     /**
386      * Make sure to block g_ndb_shm_signum
387      *   TransporterRegistry::init is run from "main" thread
388      */
389     NdbThread_set_shm_sigmask(TRUE);
390   }
391 
392   if(config->shm.signum != g_ndb_shm_signum)
393     return false;
394 
395   if(theTransporters[config->remoteNodeId] != NULL)
396     return false;
397 
398   SHM_Transporter * t = new SHM_Transporter(*this,
399 					    config->localHostName,
400 					    config->remoteHostName,
401 					    config->s_port,
402 					    config->isMgmConnection,
403 					    localNodeId,
404 					    config->remoteNodeId,
405 					    config->serverNodeId,
406 					    config->checksum,
407 					    config->signalId,
408 					    config->shm.shmKey,
409 					    config->shm.shmSize
410 					    );
411   if (t == NULL)
412     return false;
413   else if (!t->initTransporter()) {
414     delete t;
415     return false;
416   }
417   // Put the transporter in the transporter arrays
418   theSHMTransporters[nSHMTransporters]      = t;
419   theTransporters[t->getRemoteNodeId()]     = t;
420   theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
421   performStates[t->getRemoteNodeId()]       = DISCONNECTED;
422 
423   nTransporters++;
424   nSHMTransporters++;
425 
426   DBUG_RETURN(true);
427 #else
428   DBUG_RETURN(false);
429 #endif
430 }
431 
432 
433 void
removeTransporter(NodeId nodeId)434 TransporterRegistry::removeTransporter(NodeId nodeId) {
435 
436   DEBUG("Removing transporter from " << localNodeId
437 	<< " to " << nodeId);
438 
439   if(theTransporters[nodeId] == NULL)
440     return;
441 
442   theTransporters[nodeId]->doDisconnect();
443 
444   const TransporterType type = theTransporterTypes[nodeId];
445 
446   int ind = 0;
447   switch(type){
448   case tt_TCP_TRANSPORTER:
449 #ifdef NDB_TCP_TRANSPORTER
450     for(; ind < nTCPTransporters; ind++)
451       if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
452 	break;
453     ind++;
454     for(; ind<nTCPTransporters; ind++)
455       theTCPTransporters[ind-1] = theTCPTransporters[ind];
456     nTCPTransporters --;
457 #endif
458     break;
459   case tt_SCI_TRANSPORTER:
460 #ifdef NDB_SCI_TRANSPORTER
461     for(; ind < nSCITransporters; ind++)
462       if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
463 	break;
464     ind++;
465     for(; ind<nSCITransporters; ind++)
466       theSCITransporters[ind-1] = theSCITransporters[ind];
467     nSCITransporters --;
468 #endif
469     break;
470   case tt_SHM_TRANSPORTER:
471 #ifdef NDB_SHM_TRANSPORTER
472     for(; ind < nSHMTransporters; ind++)
473       if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
474 	break;
475     ind++;
476     for(; ind<nSHMTransporters; ind++)
477       theSHMTransporters[ind-1] = theSHMTransporters[ind];
478     nSHMTransporters --;
479 #endif
480     break;
481   }
482 
483   nTransporters--;
484 
485   // Delete the transporter and remove it from theTransporters array
486   delete theTransporters[nodeId];
487   theTransporters[nodeId] = NULL;
488 }
489 
490 Uint32
get_free_buffer(Uint32 node) const491 TransporterRegistry::get_free_buffer(Uint32 node) const
492 {
493   Transporter *t;
494   if(likely((t = theTransporters[node]) != 0))
495   {
496     return t->get_free_buffer();
497   }
498   return 0;
499 }
500 
501 
502 SendStatus
prepareSend(const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,const LinearSectionPtr ptr[3])503 TransporterRegistry::prepareSend(const SignalHeader * const signalHeader,
504 				 Uint8 prio,
505 				 const Uint32 * const signalData,
506 				 NodeId nodeId,
507 				 const LinearSectionPtr ptr[3]){
508 
509 
510   Transporter *t = theTransporters[nodeId];
511   if(t != NULL &&
512      (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
513       ((signalHeader->theReceiversBlockNumber == 252) ||
514        (signalHeader->theReceiversBlockNumber == 4002)))) {
515 
516     if(t->isConnected()){
517       Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
518       if(lenBytes <= MAX_MESSAGE_SIZE){
519 	Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);
520 	if(insertPtr != 0){
521 	  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
522 	  t->updateWritePtr(lenBytes, prio);
523 	  return SEND_OK;
524 	}
525 
526 	int sleepTime = 2;
527 
528 	/**
529 	 * @note: on linux/i386 the granularity is 10ms
530 	 *        so sleepTime = 2 generates a 10 ms sleep.
531 	 */
532 	for(int i = 0; i<50; i++){
533 	  if((nSHMTransporters+nSCITransporters) == 0)
534 	    NdbSleep_MilliSleep(sleepTime);
535 	  insertPtr = t->getWritePtr(lenBytes, prio);
536 	  if(insertPtr != 0){
537 	    t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
538 	    t->updateWritePtr(lenBytes, prio);
539 	    break;
540 	  }
541 	}
542 
543 	if(insertPtr != 0){
544 	  /**
545 	   * Send buffer full, but resend works
546 	   */
547 	  reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
548 	  return SEND_OK;
549 	}
550 
551 	WARNING("Signal to " << nodeId << " lost(buffer)");
552 	reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
553 	return SEND_BUFFER_FULL;
554       } else {
555 	return SEND_MESSAGE_TOO_BIG;
556       }
557     } else {
558       DEBUG("Signal to " << nodeId << " lost(disconnect) ");
559       return SEND_DISCONNECTED;
560     }
561   } else {
562     DEBUG("Discarding message to block: "
563 	  << signalHeader->theReceiversBlockNumber
564 	  << " node: " << nodeId);
565 
566     if(t == NULL)
567       return SEND_UNKNOWN_NODE;
568 
569     return SEND_BLOCKED;
570   }
571 }
572 
573 SendStatus
prepareSend(const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,class SectionSegmentPool & thePool,const SegmentedSectionPtr ptr[3])574 TransporterRegistry::prepareSend(const SignalHeader * const signalHeader,
575 				 Uint8 prio,
576 				 const Uint32 * const signalData,
577 				 NodeId nodeId,
578 				 class SectionSegmentPool & thePool,
579 				 const SegmentedSectionPtr ptr[3]){
580 
581 
582   Transporter *t = theTransporters[nodeId];
583   if(t != NULL &&
584      (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
585       ((signalHeader->theReceiversBlockNumber == 252)||
586        (signalHeader->theReceiversBlockNumber == 4002)))) {
587 
588     if(t->isConnected()){
589       Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
590       if(lenBytes <= MAX_MESSAGE_SIZE){
591 	Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);
592 	if(insertPtr != 0){
593 	  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
594 	  t->updateWritePtr(lenBytes, prio);
595 	  return SEND_OK;
596 	}
597 
598 
599 	/**
600 	 * @note: on linux/i386 the granularity is 10ms
601 	 *        so sleepTime = 2 generates a 10 ms sleep.
602 	 */
603 	int sleepTime = 2;
604 	for(int i = 0; i<50; i++){
605 	  if((nSHMTransporters+nSCITransporters) == 0)
606 	    NdbSleep_MilliSleep(sleepTime);
607 	  insertPtr = t->getWritePtr(lenBytes, prio);
608 	  if(insertPtr != 0){
609 	    t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
610 	    t->updateWritePtr(lenBytes, prio);
611 	    break;
612 	  }
613 	}
614 
615 	if(insertPtr != 0){
616 	  /**
617 	   * Send buffer full, but resend works
618 	   */
619 	  reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
620 	  return SEND_OK;
621 	}
622 
623 	WARNING("Signal to " << nodeId << " lost(buffer)");
624 	reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
625 	return SEND_BUFFER_FULL;
626       } else {
627 	return SEND_MESSAGE_TOO_BIG;
628       }
629     } else {
630       DEBUG("Signal to " << nodeId << " lost(disconnect) ");
631       return SEND_DISCONNECTED;
632     }
633   } else {
634     DEBUG("Discarding message to block: "
635 	  << signalHeader->theReceiversBlockNumber
636 	  << " node: " << nodeId);
637 
638     if(t == NULL)
639       return SEND_UNKNOWN_NODE;
640 
641     return SEND_BLOCKED;
642   }
643 }
644 
645 void
external_IO(Uint32 timeOutMillis)646 TransporterRegistry::external_IO(Uint32 timeOutMillis) {
647   //-----------------------------------------------------------
648   // Most of the time we will send the buffers here and then wait
649   // for new signals. Thus we start by sending without timeout
650   // followed by the receive part where we expect to sleep for
651   // a while.
652   //-----------------------------------------------------------
653   if(pollReceive(timeOutMillis)){
654     performReceive();
655   }
656   performSend();
657 }
658 
659 Uint32
pollReceive(Uint32 timeOutMillis)660 TransporterRegistry::pollReceive(Uint32 timeOutMillis){
661   Uint32 retVal = 0;
662 
663   if((nSCITransporters) > 0)
664   {
665     timeOutMillis=0;
666   }
667 
668 #ifdef NDB_SHM_TRANSPORTER
669   if(nSHMTransporters > 0)
670   {
671     Uint32 res = poll_SHM(0);
672     if(res)
673     {
674       retVal |= res;
675       timeOutMillis = 0;
676     }
677   }
678 #endif
679 
680 #ifdef NDB_TCP_TRANSPORTER
681   if(nTCPTransporters > 0 || retVal == 0)
682   {
683     retVal |= poll_TCP(timeOutMillis);
684   }
685   else
686     tcpReadSelectReply = 0;
687 #endif
688 #ifdef NDB_SCI_TRANSPORTER
689   if(nSCITransporters > 0)
690     retVal |= poll_SCI(timeOutMillis);
691 #endif
692 #ifdef NDB_SHM_TRANSPORTER
693   if(nSHMTransporters > 0 && retVal == 0)
694   {
695     int res = poll_SHM(0);
696     retVal |= res;
697   }
698 #endif
699   return retVal;
700 }
701 
702 
#ifdef NDB_SCI_TRANSPORTER
/**
 * Check whether any connected SCI transporter has data to read.
 *
 * @param timeOutMillis  not used
 * @return 1 if some connected SCI transporter has data, else 0
 */
Uint32
TransporterRegistry::poll_SCI(Uint32 timeOutMillis)
{
  for (int idx = 0; idx < nSCITransporters; idx++) {
    SCI_Transporter * const sci = theSCITransporters[idx];
    if (sci->isConnected() && sci->hasDataToRead())
      return 1;
  }
  return 0;
}
#endif
717 
718 
#ifdef NDB_SHM_TRANSPORTER
static int g_shm_counter = 0;
/**
 * Check whether any connected SHM transporter has data to read.  The whole
 * list is scanned up to 100 times before giving up.
 *
 * @param timeOutMillis  not used
 * @return 1 as soon as a connected SHM transporter has data, else 0
 */
Uint32
TransporterRegistry::poll_SHM(Uint32 timeOutMillis)
{
  for(int spin = 0; spin < 100; spin++)
  {
    for (int idx = 0; idx < nSHMTransporters; idx++) {
      SHM_Transporter * const shm = theSHMTransporters[idx];
      if (shm->isConnected() && shm->hasDataToRead())
        return 1;
    }
  }
  return 0;
}
#endif
738 
#ifdef NDB_TCP_TRANSPORTER
/**
 * select() on the sockets of all connected TCP transporters.
 *
 * tcpReadset is rebuilt on every call and tcpReadSelectReply receives the
 * raw select() result; performReceive() later tests tcpReadset.  The
 * timeout is dropped to 0 when some transporter already holds received
 * data.
 *
 * @param timeOutMillis  maximum time to block in select()
 * @return non-zero if select() returned non-zero (an error return counts
 *         as well) or some transporter already holds received data
 */
Uint32
TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
{
  bool hasdata = false;
  // Disabled shortcut, kept as in the original (condition is always false).
  if (false && nTCPTransporters == 0)
  {
    tcpReadSelectReply = 0;
    return 0;
  }

  // Highest socket value seen so far; select() needs it plus one
  NDB_SOCKET_TYPE maxSocketValue = -1;

  FD_ZERO(&tcpReadset);

  // Collect the sockets of all connected transporters in the read-set
  for (int idx = 0; idx < nTCPTransporters; idx++) {
    TCP_Transporter * const tcp = theTCPTransporters[idx];
    const NodeId nodeId = tcp->getRemoteNodeId();

    if (is_connected(nodeId) && tcp->isConnected()) {
      const NDB_SOCKET_TYPE socket = tcp->getSocket();
      if (socket > maxSocketValue)
        maxSocketValue = socket;
      FD_SET(socket, &tcpReadset);
    }

    // Checked for every transporter, connected or not
    hasdata |= tcp->hasReceiveData();
  }

  // Data already buffered: only do a non-blocking check of the sockets.
  if (hasdata)
    timeOutMillis = 0;

  struct timeval timeout;
  timeout.tv_sec  = timeOutMillis / 1000;
  timeout.tv_usec = (timeOutMillis % 1000) * 1000;

  tcpReadSelectReply = select(maxSocketValue + 1, &tcpReadset, 0, 0, &timeout);
  // Disabled diagnostic, kept as in the original (condition is always false).
  if(false && tcpReadSelectReply == -1 && errno == EINTR)
    g_eventLogger.info("woke-up by signal");

#ifdef NDB_WIN32
  // On Windows a failed select() is replaced by a plain sleep.
  if(tcpReadSelectReply == SOCKET_ERROR)
  {
    NdbSleep_MilliSleep(timeOutMillis);
  }
#endif

  return tcpReadSelectReply || hasdata;
}
#endif
799 
800 
801 void
performReceive()802 TransporterRegistry::performReceive()
803 {
804 #ifdef NDB_TCP_TRANSPORTER
805   for (int i=0; i<nTCPTransporters; i++)
806   {
807     checkJobBuffer();
808     TCP_Transporter *t = theTCPTransporters[i];
809     const NodeId nodeId = t->getRemoteNodeId();
810     const NDB_SOCKET_TYPE socket    = t->getSocket();
811     if(is_connected(nodeId)){
812       if(t->isConnected())
813       {
814         if (FD_ISSET(socket, &tcpReadset))
815 	{
816 	  t->doReceive();
817         }
818 
819         if (t->hasReceiveData())
820         {
821           Uint32 * ptr;
822           Uint32 sz = t->getReceiveData(&ptr);
823           transporter_recv_from(callbackObj, nodeId);
824           Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]);
825           t->updateReceiveDataPtr(szUsed);
826 	}
827       }
828     }
829   }
830 #endif
831 
832 #ifdef NDB_SCI_TRANSPORTER
833   //performReceive
834   //do prepareReceive on the SCI transporters  (prepareReceive(t,,,,))
835   for (int i=0; i<nSCITransporters; i++)
836   {
837     checkJobBuffer();
838     SCI_Transporter  *t = theSCITransporters[i];
839     const NodeId nodeId = t->getRemoteNodeId();
840     if(is_connected(nodeId))
841     {
842       if(t->isConnected() && t->checkConnected())
843       {
844 	Uint32 * readPtr, * eodPtr;
845 	t->getReceivePtr(&readPtr, &eodPtr);
846 	transporter_recv_from(callbackObj, nodeId);
847 	Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
848 	t->updateReceivePtr(newPtr);
849       }
850     }
851   }
852 #endif
853 #ifdef NDB_SHM_TRANSPORTER
854   for (int i=0; i<nSHMTransporters; i++)
855   {
856     checkJobBuffer();
857     SHM_Transporter *t = theSHMTransporters[i];
858     const NodeId nodeId = t->getRemoteNodeId();
859     if(is_connected(nodeId)){
860       if(t->isConnected() && t->checkConnected())
861       {
862 	Uint32 * readPtr, * eodPtr;
863 	t->getReceivePtr(&readPtr, &eodPtr);
864 	transporter_recv_from(callbackObj, nodeId);
865 	Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
866 	t->updateReceivePtr(newPtr);
867       }
868     }
869   }
870 #endif
871 }
872 
873 void
performSend()874 TransporterRegistry::performSend()
875 {
876   int i;
877   sendCounter = 1;
878 
879 #ifdef NDB_TCP_TRANSPORTER
880   for (i = m_transp_count; i < nTCPTransporters; i++)
881   {
882     TCP_Transporter *t = theTCPTransporters[i];
883     if (t && t->hasDataToSend() && t->isConnected() &&
884 	is_connected(t->getRemoteNodeId()))
885     {
886       t->doSend();
887     }
888   }
889   for (i = 0; i < m_transp_count && i < nTCPTransporters; i++)
890   {
891     TCP_Transporter *t = theTCPTransporters[i];
892     if (t && t->hasDataToSend() && t->isConnected() &&
893 	is_connected(t->getRemoteNodeId()))
894     {
895       t->doSend();
896     }
897   }
898   m_transp_count++;
899   if (m_transp_count == nTCPTransporters) m_transp_count = 0;
900 #endif
901 #ifdef NDB_SCI_TRANSPORTER
902   //scroll through the SCI transporters,
903   // get each transporter, check if connected, send data
904   for (i=0; i<nSCITransporters; i++) {
905     SCI_Transporter  *t = theSCITransporters[i];
906     const NodeId nodeId = t->getRemoteNodeId();
907 
908     if(is_connected(nodeId))
909     {
910       if(t->isConnected() && t->hasDataToSend()) {
911 	t->doSend();
912       } //if
913     } //if
914   }
915 #endif
916 
917 #ifdef NDB_SHM_TRANSPORTER
918   for (i=0; i<nSHMTransporters; i++)
919   {
920     SHM_Transporter  *t = theSHMTransporters[i];
921     const NodeId nodeId = t->getRemoteNodeId();
922     if(is_connected(nodeId))
923     {
924       if(t->isConnected())
925       {
926 	t->doSend();
927       }
928     }
929   }
930 #endif
931 }
932 
933 int
forceSendCheck(int sendLimit)934 TransporterRegistry::forceSendCheck(int sendLimit){
935   int tSendCounter = sendCounter;
936   sendCounter = tSendCounter + 1;
937   if (tSendCounter >= sendLimit) {
938     performSend();
939     sendCounter = 1;
940     return 1;
941   }//if
942   return 0;
943 }//TransporterRegistry::forceSendCheck()
944 
#ifdef DEBUG_TRANSPORTER
/**
 * Debug aid: print the number of transporters and, for each registered
 * transporter, its remote node id, perform state and IO state to ndbout.
 */
void
TransporterRegistry::printState(){
  ndbout << "-- TransporterRegistry -- " << endl << endl
         << "Transporters = " << nTransporters << endl;

  for(int slot = 0; slot<maxTransporters; slot++){
    const Transporter * const t = theTransporters[slot];
    if(t == NULL)
      continue;

    const NodeId remoteNodeId = t->getRemoteNodeId();
    ndbout << "Transporter: " << remoteNodeId
           << " PerformState: " << performStates[remoteNodeId]
           << " IOState: " << ioStates[remoteNodeId] << endl;
  }
}
#endif
959 
960 IOState
ioState(NodeId nodeId)961 TransporterRegistry::ioState(NodeId nodeId) {
962   return ioStates[nodeId];
963 }
964 
965 void
setIOState(NodeId nodeId,IOState state)966 TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
967   DEBUG("TransporterRegistry::setIOState("
968 	<< nodeId << ", " << state << ")");
969   ioStates[nodeId] = state;
970 }
971 
972 static void *
run_start_clients_C(void * me)973 run_start_clients_C(void * me)
974 {
975   ((TransporterRegistry*) me)->start_clients_thread();
976   return 0;
977 }
978 
979 // Run by kernel thread
980 void
do_connect(NodeId node_id)981 TransporterRegistry::do_connect(NodeId node_id)
982 {
983   PerformState &curr_state = performStates[node_id];
984   switch(curr_state){
985   case DISCONNECTED:
986     break;
987   case CONNECTED:
988     return;
989   case CONNECTING:
990     return;
991   case DISCONNECTING:
992     break;
993   }
994   DBUG_ENTER("TransporterRegistry::do_connect");
995   DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id));
996   curr_state= CONNECTING;
997   DBUG_VOID_RETURN;
998 }
999 void
do_disconnect(NodeId node_id)1000 TransporterRegistry::do_disconnect(NodeId node_id)
1001 {
1002   PerformState &curr_state = performStates[node_id];
1003   switch(curr_state){
1004   case DISCONNECTED:
1005     return;
1006   case CONNECTED:
1007     break;
1008   case CONNECTING:
1009     break;
1010   case DISCONNECTING:
1011     return;
1012   }
1013   DBUG_ENTER("TransporterRegistry::do_disconnect");
1014   DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id));
1015   curr_state= DISCONNECTING;
1016   DBUG_VOID_RETURN;
1017 }
1018 
1019 void
report_connect(NodeId node_id)1020 TransporterRegistry::report_connect(NodeId node_id)
1021 {
1022   DBUG_ENTER("TransporterRegistry::report_connect");
1023   DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
1024   performStates[node_id] = CONNECTED;
1025   reportConnect(callbackObj, node_id);
1026   DBUG_VOID_RETURN;
1027 }
1028 
1029 void
report_disconnect(NodeId node_id,int errnum)1030 TransporterRegistry::report_disconnect(NodeId node_id, int errnum)
1031 {
1032   DBUG_ENTER("TransporterRegistry::report_disconnect");
1033   DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
1034   performStates[node_id] = DISCONNECTED;
1035   reportDisconnect(callbackObj, node_id, errnum);
1036   DBUG_VOID_RETURN;
1037 }
1038 
1039 void
update_connections()1040 TransporterRegistry::update_connections()
1041 {
1042   for (int i= 0, n= 0; n < nTransporters; i++){
1043     Transporter * t = theTransporters[i];
1044     if (!t)
1045       continue;
1046     n++;
1047 
1048     const NodeId nodeId = t->getRemoteNodeId();
1049     switch(performStates[nodeId]){
1050     case CONNECTED:
1051     case DISCONNECTED:
1052       break;
1053     case CONNECTING:
1054       if(t->isConnected())
1055 	report_connect(nodeId);
1056       break;
1057     case DISCONNECTING:
1058       if(!t->isConnected())
1059 	report_disconnect(nodeId, 0);
1060       break;
1061     }
1062   }
1063 }
1064 
1065 // run as own thread
1066 void
start_clients_thread()1067 TransporterRegistry::start_clients_thread()
1068 {
1069   int persist_mgm_count= 0;
1070   DBUG_ENTER("TransporterRegistry::start_clients_thread");
1071   while (m_run_start_clients_thread) {
1072     NdbSleep_MilliSleep(100);
1073     persist_mgm_count++;
1074     if(persist_mgm_count==50)
1075     {
1076       ndb_mgm_check_connection(m_mgm_handle);
1077       persist_mgm_count= 0;
1078     }
1079     for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
1080       Transporter * t = theTransporters[i];
1081       if (!t)
1082 	continue;
1083       n++;
1084 
1085       const NodeId nodeId = t->getRemoteNodeId();
1086       switch(performStates[nodeId]){
1087       case CONNECTING:
1088 	if(!t->isConnected() && !t->isServer) {
1089 	  bool connected= false;
1090 	  /**
1091 	   * First, we try to connect (if we have a port number).
1092 	   */
1093 	  if (t->get_s_port())
1094 	    connected= t->connect_client();
1095 
1096 	  /**
1097 	   * If dynamic, get the port for connecting from the management server
1098 	   */
1099 	  if( !connected && t->get_s_port() <= 0) {	// Port is dynamic
1100 	    int server_port= 0;
1101 	    struct ndb_mgm_reply mgm_reply;
1102 
1103 	    if(!ndb_mgm_is_connected(m_mgm_handle))
1104 	      ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
1105 
1106 	    if(ndb_mgm_is_connected(m_mgm_handle))
1107 	    {
1108 	      int res=
1109 		ndb_mgm_get_connection_int_parameter(m_mgm_handle,
1110 						     t->getRemoteNodeId(),
1111 						     t->getLocalNodeId(),
1112 						     CFG_CONNECTION_SERVER_PORT,
1113 						     &server_port,
1114 						     &mgm_reply);
1115 	      DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
1116 				 server_port,t->getRemoteNodeId(),
1117 				 t->getLocalNodeId(),res));
1118 	      if( res >= 0 )
1119 	      {
1120 		/**
1121 		 * Server_port == 0 just means that that a mgmt server
1122 		 * has not received a new port yet. Keep the old.
1123 		 */
1124 		if (server_port)
1125 		  t->set_s_port(server_port);
1126 	      }
1127 	      else if(ndb_mgm_is_connected(m_mgm_handle))
1128 	      {
1129 		g_eventLogger.info("Failed to get dynamic port to connect to: %d", res);
1130 		ndb_mgm_disconnect(m_mgm_handle);
1131 	      }
1132 	      else
1133 	      {
1134 		g_eventLogger.info("Management server closed connection early. "
1135 			 "It is probably being shut down (or has problems). "
1136 			 "We will retry the connection. %d %s %s line: %d",
1137                                    ndb_mgm_get_latest_error(m_mgm_handle),
1138                                    ndb_mgm_get_latest_error_desc(m_mgm_handle),
1139                                    ndb_mgm_get_latest_error_msg(m_mgm_handle),
1140                                    ndb_mgm_get_latest_error_line(m_mgm_handle)
1141                                    );
1142 	      }
1143 	    }
1144 	    /** else
1145 	     * We will not be able to get a new port unless
1146 	     * the m_mgm_handle is connected. Note that not
1147 	     * being connected is an ok state, just continue
1148 	     * until it is able to connect. Continue using the
1149 	     * old port until we can connect again and get a
1150 	     * new port.
1151 	     */
1152 	  }
1153 	}
1154 	break;
1155       case DISCONNECTING:
1156 	if(t->isConnected())
1157 	  t->doDisconnect();
1158 	break;
1159       default:
1160 	break;
1161       }
1162     }
1163   }
1164   DBUG_VOID_RETURN;
1165 }
1166 
1167 bool
start_clients()1168 TransporterRegistry::start_clients()
1169 {
1170   m_run_start_clients_thread= true;
1171   m_start_clients_thread= NdbThread_Create(run_start_clients_C,
1172 					   (void**)this,
1173 					   32768,
1174 					   "ndb_start_clients",
1175 					   NDB_THREAD_PRIO_LOW);
1176   if (m_start_clients_thread == 0) {
1177     m_run_start_clients_thread= false;
1178     return false;
1179   }
1180   return true;
1181 }
1182 
1183 bool
stop_clients()1184 TransporterRegistry::stop_clients()
1185 {
1186   if (m_start_clients_thread) {
1187     m_run_start_clients_thread= false;
1188     void* status;
1189     NdbThread_WaitFor(m_start_clients_thread, &status);
1190     NdbThread_Destroy(&m_start_clients_thread);
1191   }
1192   return true;
1193 }
1194 
1195 void
add_transporter_interface(NodeId remoteNodeId,const char * interf,int s_port)1196 TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
1197 					       const char *interf,
1198 					       int s_port)
1199 {
1200   DBUG_ENTER("TransporterRegistry::add_transporter_interface");
1201   DBUG_PRINT("enter",("interface=%s, s_port= %d", interf, s_port));
1202   if (interf && strlen(interf) == 0)
1203     interf= 0;
1204 
1205   for (unsigned i= 0; i < m_transporter_interface.size(); i++)
1206   {
1207     Transporter_interface &tmp= m_transporter_interface[i];
1208     if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)
1209       continue;
1210     if (interf != 0 && tmp.m_interface != 0 &&
1211 	strcmp(interf, tmp.m_interface) == 0)
1212     {
1213       DBUG_VOID_RETURN; // found match, no need to insert
1214     }
1215     if (interf == 0 && tmp.m_interface == 0)
1216     {
1217       DBUG_VOID_RETURN; // found match, no need to insert
1218     }
1219   }
1220   Transporter_interface t;
1221   t.m_remote_nodeId= remoteNodeId;
1222   t.m_s_service_port= s_port;
1223   t.m_interface= interf;
1224   m_transporter_interface.push_back(t);
1225   DBUG_PRINT("exit",("interface and port added"));
1226   DBUG_VOID_RETURN;
1227 }
1228 
1229 bool
start_service(SocketServer & socket_server)1230 TransporterRegistry::start_service(SocketServer& socket_server)
1231 {
1232   DBUG_ENTER("TransporterRegistry::start_service");
1233   if (m_transporter_interface.size() > 0 && !nodeIdSpecified)
1234   {
1235     g_eventLogger.error("TransporterRegistry::startReceiving: localNodeId not specified");
1236     DBUG_RETURN(false);
1237   }
1238 
1239   for (unsigned i= 0; i < m_transporter_interface.size(); i++)
1240   {
1241     Transporter_interface &t= m_transporter_interface[i];
1242 
1243     unsigned short port= (unsigned short)t.m_s_service_port;
1244     if(t.m_s_service_port<0)
1245       port= -t.m_s_service_port; // is a dynamic port
1246     TransporterService *transporter_service =
1247       new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
1248     if(!socket_server.setup(transporter_service,
1249 			    &port, t.m_interface))
1250     {
1251       DBUG_PRINT("info", ("Trying new port"));
1252       port= 0;
1253       if(t.m_s_service_port>0
1254 	 || !socket_server.setup(transporter_service,
1255 				 &port, t.m_interface))
1256       {
1257 	/*
1258 	 * If it wasn't a dynamically allocated port, or
1259 	 * our attempts at getting a new dynamic port failed
1260 	 */
1261 	g_eventLogger.error("Unable to setup transporter service port: %s:%d!\n"
1262 		 "Please check if the port is already used,\n"
1263 		 "(perhaps the node is already running)",
1264 		 t.m_interface ? t.m_interface : "*", t.m_s_service_port);
1265 	delete transporter_service;
1266 	DBUG_RETURN(false);
1267       }
1268     }
1269     t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic
1270     DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));
1271     transporter_service->setTransporterRegistry(this);
1272   }
1273   DBUG_RETURN(true);
1274 }
1275 
#ifdef NDB_SHM_TRANSPORTER
/**
 * Signal handler for the SHM transporter signal; startReceiving()
 * installs it for g_ndb_shm_signum.  It only bumps g_shm_counter.
 *
 * @param signo  delivered signal number (unused)
 */
static
RETSIGTYPE
shm_sig_handler(int signo)
{
  ++g_shm_counter;
}
#endif
1284 
1285 void
startReceiving()1286 TransporterRegistry::startReceiving()
1287 {
1288   DBUG_ENTER("TransporterRegistry::startReceiving");
1289 
1290 #ifdef NDB_SHM_TRANSPORTER
1291   m_shm_own_pid = getpid();
1292   if (g_ndb_shm_signum)
1293   {
1294     DBUG_PRINT("info",("Install signal handler for signum %d",
1295 		       g_ndb_shm_signum));
1296     struct sigaction sa;
1297     NdbThread_set_shm_sigmask(FALSE);
1298     sigemptyset(&sa.sa_mask);
1299     sa.sa_handler = shm_sig_handler;
1300     sa.sa_flags = 0;
1301     int ret;
1302     while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR);
1303     if(ret != 0)
1304     {
1305       DBUG_PRINT("error",("Install failed"));
1306       g_eventLogger.error("Failed to install signal handler for"
1307 			  " SHM transporter, signum %d, errno: %d (%s)",
1308 			  g_ndb_shm_signum, errno, strerror(errno));
1309     }
1310   }
1311 #endif // NDB_SHM_TRANSPORTER
1312   DBUG_VOID_RETURN;
1313 }
1314 
1315 void
stopReceiving()1316 TransporterRegistry::stopReceiving(){
1317   /**
1318    * Disconnect all transporters, this includes detach from remote node
1319    * and since that must be done from the same process that called attach
1320    * it's done here in the receive thread
1321    */
1322   disconnectAll();
1323 }
1324 
1325 void
startSending()1326 TransporterRegistry::startSending(){
1327 }
1328 
1329 void
stopSending()1330 TransporterRegistry::stopSending(){
1331 }
1332 
operator <<(NdbOut & out,SignalHeader & sh)1333 NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
1334   out << "-- Signal Header --" << endl;
1335   out << "theLength:    " << sh.theLength << endl;
1336   out << "gsn:          " << sh.theVerId_signalNumber << endl;
1337   out << "recBlockNo:   " << sh.theReceiversBlockNumber << endl;
1338   out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
1339   out << "sendersSig:   " << sh.theSendersSignalId << endl;
1340   out << "theSignalId:  " << sh.theSignalId << endl;
1341   out << "trace:        " << (int)sh.theTrace << endl;
1342   return out;
1343 }
1344 
1345 Transporter*
get_transporter(NodeId nodeId)1346 TransporterRegistry::get_transporter(NodeId nodeId) {
1347   return theTransporters[nodeId];
1348 }
1349 
connect_client(NdbMgmHandle * h)1350 bool TransporterRegistry::connect_client(NdbMgmHandle *h)
1351 {
1352   DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
1353 
1354   Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);
1355 
1356   if(!mgm_nodeid)
1357   {
1358     g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1359     return false;
1360   }
1361   Transporter * t = theTransporters[mgm_nodeid];
1362   if (!t)
1363   {
1364     g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1365     return false;
1366   }
1367   DBUG_RETURN(t->connect_client(connect_ndb_mgmd(h)));
1368 }
1369 
1370 /**
1371  * Given a connected NdbMgmHandle, turns it into a transporter
1372  * and returns the socket.
1373  */
connect_ndb_mgmd(NdbMgmHandle * h)1374 NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h)
1375 {
1376   struct ndb_mgm_reply mgm_reply;
1377 
1378   if ( h==NULL || *h == NULL )
1379   {
1380     g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1381     return NDB_INVALID_SOCKET;
1382   }
1383 
1384   for(unsigned int i=0;i < m_transporter_interface.size();i++)
1385     if (m_transporter_interface[i].m_s_service_port < 0
1386 	&& ndb_mgm_set_connection_int_parameter(*h,
1387 				   get_localNodeId(),
1388 				   m_transporter_interface[i].m_remote_nodeId,
1389 				   CFG_CONNECTION_SERVER_PORT,
1390 				   m_transporter_interface[i].m_s_service_port,
1391 				   &mgm_reply) < 0)
1392     {
1393       g_eventLogger.error("Error: %s: %d",
1394 	       ndb_mgm_get_latest_error_desc(*h),
1395 	       ndb_mgm_get_latest_error(*h));
1396       g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1397       ndb_mgm_destroy_handle(h);
1398       return NDB_INVALID_SOCKET;
1399     }
1400 
1401   /**
1402    * convert_to_transporter also disposes of the handle (i.e. we don't leak
1403    * memory here.
1404    */
1405   NDB_SOCKET_TYPE sockfd= ndb_mgm_convert_to_transporter(h);
1406   if ( sockfd == NDB_INVALID_SOCKET)
1407   {
1408     g_eventLogger.error("Error: %s: %d",
1409 	     ndb_mgm_get_latest_error_desc(*h),
1410 	     ndb_mgm_get_latest_error(*h));
1411     g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1412     ndb_mgm_destroy_handle(h);
1413   }
1414   return sockfd;
1415 }
1416 
1417 /**
1418  * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
1419  * and returns the socket.
1420  */
connect_ndb_mgmd(SocketClient * sc)1421 NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc)
1422 {
1423   NdbMgmHandle h= ndb_mgm_create_handle();
1424 
1425   if ( h == NULL )
1426   {
1427     return NDB_INVALID_SOCKET;
1428   }
1429 
1430   /**
1431    * Set connectstring
1432    */
1433   {
1434     BaseString cs;
1435     cs.assfmt("%s:%u",sc->get_server_name(),sc->get_port());
1436     ndb_mgm_set_connectstring(h, cs.c_str());
1437   }
1438 
1439   if(ndb_mgm_connect(h, 0, 0, 0)<0)
1440   {
1441     ndb_mgm_destroy_handle(&h);
1442     return NDB_INVALID_SOCKET;
1443   }
1444 
1445   return connect_ndb_mgmd(&h);
1446 }
1447 
// Explicit instantiation of Vector for Transporter_interface, the
// element type handled through m_transporter_interface in this file.
template class Vector<TransporterRegistry::Transporter_interface>;
1449