1 /* Copyright (c) 2003-2007 MySQL AB
2 Use is subject to license terms
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16
17 #include <ndb_global.h>
18 #include <my_pthread.h>
19
20 #include <TransporterRegistry.hpp>
21 #include "TransporterInternalDefinitions.hpp"
22
23 #include "Transporter.hpp"
24 #include <SocketAuthenticator.hpp>
25
26 #ifdef NDB_TCP_TRANSPORTER
27 #include "TCP_Transporter.hpp"
28 #endif
29
30 #ifdef NDB_SCI_TRANSPORTER
31 #include "SCI_Transporter.hpp"
32 #endif
33
34 #ifdef NDB_SHM_TRANSPORTER
35 #include "SHM_Transporter.hpp"
36 extern int g_ndb_shm_signum;
37 #endif
38
39 #include "TransporterCallback.hpp"
40 #include "NdbOut.hpp"
41 #include <NdbSleep.h>
42 #include <NdbTick.h>
43 #include <InputStream.hpp>
44 #include <OutputStream.hpp>
45
46 #include <mgmapi/mgmapi.h>
47 #include <mgmapi_internal.h>
48 #include <mgmapi/mgmapi_debug.h>
49
50 #include <EventLogger.hpp>
51 extern EventLogger g_eventLogger;
52
53 struct in_addr
get_connect_address(NodeId node_id) const54 TransporterRegistry::get_connect_address(NodeId node_id) const
55 {
56 return theTransporters[node_id]->m_connect_address;
57 }
58
newSession(NDB_SOCKET_TYPE sockfd)59 SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
60 {
61 DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
62 if (m_auth && !m_auth->server_authenticate(sockfd)){
63 NDB_CLOSE_SOCKET(sockfd);
64 DBUG_RETURN(0);
65 }
66
67 if (!m_transporter_registry->connect_server(sockfd))
68 {
69 NDB_CLOSE_SOCKET(sockfd);
70 DBUG_RETURN(0);
71 }
72
73 DBUG_RETURN(0);
74 }
75
TransporterRegistry(void * callback,unsigned _maxTransporters,unsigned sizeOfLongSignalMemory)76 TransporterRegistry::TransporterRegistry(void * callback,
77 unsigned _maxTransporters,
78 unsigned sizeOfLongSignalMemory) :
79 m_mgm_handle(0),
80 m_transp_count(0)
81 {
82 DBUG_ENTER("TransporterRegistry::TransporterRegistry");
83
84 nodeIdSpecified = false;
85 maxTransporters = _maxTransporters;
86 sendCounter = 1;
87
88 callbackObj=callback;
89
90 theTCPTransporters = new TCP_Transporter * [maxTransporters];
91 theSCITransporters = new SCI_Transporter * [maxTransporters];
92 theSHMTransporters = new SHM_Transporter * [maxTransporters];
93 theTransporterTypes = new TransporterType [maxTransporters];
94 theTransporters = new Transporter * [maxTransporters];
95 performStates = new PerformState [maxTransporters];
96 ioStates = new IOState [maxTransporters];
97
98 // Initialize member variables
99 nTransporters = 0;
100 nTCPTransporters = 0;
101 nSCITransporters = 0;
102 nSHMTransporters = 0;
103
104 // Initialize the transporter arrays
105 for (unsigned i=0; i<maxTransporters; i++) {
106 theTCPTransporters[i] = NULL;
107 theSCITransporters[i] = NULL;
108 theSHMTransporters[i] = NULL;
109 theTransporters[i] = NULL;
110 performStates[i] = DISCONNECTED;
111 ioStates[i] = NoHalt;
112 }
113
114 DBUG_VOID_RETURN;
115 }
116
set_mgm_handle(NdbMgmHandle h)117 void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
118 {
119 DBUG_ENTER("TransporterRegistry::set_mgm_handle");
120 if (m_mgm_handle)
121 ndb_mgm_destroy_handle(&m_mgm_handle);
122 m_mgm_handle= h;
123 ndb_mgm_set_timeout(m_mgm_handle, 5000);
124 #ifndef DBUG_OFF
125 if (h)
126 {
127 char buf[256];
128 DBUG_PRINT("info",("handle set with connectstring: %s",
129 ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
130 }
131 else
132 {
133 DBUG_PRINT("info",("handle set to NULL"));
134 }
135 #endif
136 DBUG_VOID_RETURN;
137 }
138
~TransporterRegistry()139 TransporterRegistry::~TransporterRegistry()
140 {
141 DBUG_ENTER("TransporterRegistry::~TransporterRegistry");
142
143 removeAll();
144
145 delete[] theTCPTransporters;
146 delete[] theSCITransporters;
147 delete[] theSHMTransporters;
148 delete[] theTransporterTypes;
149 delete[] theTransporters;
150 delete[] performStates;
151 delete[] ioStates;
152
153 if (m_mgm_handle)
154 ndb_mgm_destroy_handle(&m_mgm_handle);
155
156 DBUG_VOID_RETURN;
157 }
158
159 void
removeAll()160 TransporterRegistry::removeAll(){
161 for(unsigned i = 0; i<maxTransporters; i++){
162 if(theTransporters[i] != NULL)
163 removeTransporter(theTransporters[i]->getRemoteNodeId());
164 }
165 }
166
167 void
disconnectAll()168 TransporterRegistry::disconnectAll(){
169 for(unsigned i = 0; i<maxTransporters; i++){
170 if(theTransporters[i] != NULL)
171 theTransporters[i]->doDisconnect();
172 }
173 }
174
175 bool
init(NodeId nodeId)176 TransporterRegistry::init(NodeId nodeId) {
177 DBUG_ENTER("TransporterRegistry::init");
178 nodeIdSpecified = true;
179 localNodeId = nodeId;
180
181 DEBUG("TransporterRegistry started node: " << localNodeId);
182
183 DBUG_RETURN(true);
184 }
185
186 bool
connect_server(NDB_SOCKET_TYPE sockfd)187 TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd)
188 {
189 DBUG_ENTER("TransporterRegistry::connect_server");
190
191 // read node id from client
192 // read transporter type
193 int nodeId, remote_transporter_type= -1;
194 SocketInputStream s_input(sockfd);
195 char buf[256];
196 if (s_input.gets(buf, 256) == 0) {
197 DBUG_PRINT("error", ("Could not get node id from client"));
198 DBUG_RETURN(false);
199 }
200 int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
201 switch (r) {
202 case 2:
203 break;
204 case 1:
205 // we're running version prior to 4.1.9
206 // ok, but with no checks on transporter configuration compatability
207 break;
208 default:
209 DBUG_PRINT("error", ("Error in node id from client"));
210 DBUG_RETURN(false);
211 }
212
213 DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
214 nodeId,remote_transporter_type));
215
216 //check that nodeid is valid and that there is an allocated transporter
217 if ( nodeId < 0 || nodeId >= (int)maxTransporters) {
218 DBUG_PRINT("error", ("Node id out of range from client"));
219 DBUG_RETURN(false);
220 }
221 if (theTransporters[nodeId] == 0) {
222 DBUG_PRINT("error", ("No transporter for this node id from client"));
223 DBUG_RETURN(false);
224 }
225
226 //check that the transporter should be connected
227 if (performStates[nodeId] != TransporterRegistry::CONNECTING) {
228 DBUG_PRINT("error", ("Transporter in wrong state for this node id from client"));
229 DBUG_RETURN(false);
230 }
231
232 Transporter *t= theTransporters[nodeId];
233
234 // send info about own id (just as response to acknowledge connection)
235 // send info on own transporter type
236 SocketOutputStream s_output(sockfd);
237 s_output.println("%d %d", t->getLocalNodeId(), t->m_type);
238
239 if (remote_transporter_type != -1)
240 {
241 if (remote_transporter_type != t->m_type)
242 {
243 DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
244 t->m_type, remote_transporter_type));
245 g_eventLogger.error("Incompatible configuration: Transporter type "
246 "mismatch with node %d", nodeId);
247
248 // wait for socket close for 1 second to let message arrive at client
249 {
250 fd_set a_set;
251 FD_ZERO(&a_set);
252 FD_SET(sockfd, &a_set);
253 struct timeval timeout;
254 timeout.tv_sec = 1; timeout.tv_usec = 0;
255 select(sockfd+1, &a_set, 0, 0, &timeout);
256 }
257 DBUG_RETURN(false);
258 }
259 }
260 else if (t->m_type == tt_SHM_TRANSPORTER)
261 {
262 g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
263 }
264
265 // setup transporter (transporter responsible for closing sockfd)
266 t->connect_server(sockfd);
267
268 DBUG_RETURN(true);
269 }
270
271 bool
createTCPTransporter(TransporterConfiguration * config)272 TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
273 #ifdef NDB_TCP_TRANSPORTER
274
275 if(!nodeIdSpecified){
276 init(config->localNodeId);
277 }
278
279 if(config->localNodeId != localNodeId)
280 return false;
281
282 if(theTransporters[config->remoteNodeId] != NULL)
283 return false;
284
285 TCP_Transporter * t = new TCP_Transporter(*this,
286 config->tcp.sendBufferSize,
287 config->tcp.maxReceiveSize,
288 config->localHostName,
289 config->remoteHostName,
290 config->s_port,
291 config->isMgmConnection,
292 localNodeId,
293 config->remoteNodeId,
294 config->serverNodeId,
295 config->checksum,
296 config->signalId);
297 if (t == NULL)
298 return false;
299 else if (!t->initTransporter()) {
300 delete t;
301 return false;
302 }
303
304 // Put the transporter in the transporter arrays
305 theTCPTransporters[nTCPTransporters] = t;
306 theTransporters[t->getRemoteNodeId()] = t;
307 theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
308 performStates[t->getRemoteNodeId()] = DISCONNECTED;
309 nTransporters++;
310 nTCPTransporters++;
311
312 return true;
313 #else
314 return false;
315 #endif
316 }
317
318 bool
createSCITransporter(TransporterConfiguration * config)319 TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
320 #ifdef NDB_SCI_TRANSPORTER
321
322 if(!SCI_Transporter::initSCI())
323 abort();
324
325 if(!nodeIdSpecified){
326 init(config->localNodeId);
327 }
328
329 if(config->localNodeId != localNodeId)
330 return false;
331
332 if(theTransporters[config->remoteNodeId] != NULL)
333 return false;
334
335 SCI_Transporter * t = new SCI_Transporter(*this,
336 config->localHostName,
337 config->remoteHostName,
338 config->s_port,
339 config->isMgmConnection,
340 config->sci.sendLimit,
341 config->sci.bufferSize,
342 config->sci.nLocalAdapters,
343 config->sci.remoteSciNodeId0,
344 config->sci.remoteSciNodeId1,
345 localNodeId,
346 config->remoteNodeId,
347 config->serverNodeId,
348 config->checksum,
349 config->signalId);
350
351 if (t == NULL)
352 return false;
353 else if (!t->initTransporter()) {
354 delete t;
355 return false;
356 }
357 // Put the transporter in the transporter arrays
358 theSCITransporters[nSCITransporters] = t;
359 theTransporters[t->getRemoteNodeId()] = t;
360 theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
361 performStates[t->getRemoteNodeId()] = DISCONNECTED;
362 nTransporters++;
363 nSCITransporters++;
364
365 return true;
366 #else
367 return false;
368 #endif
369 }
370
371 bool
createSHMTransporter(TransporterConfiguration * config)372 TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
373 DBUG_ENTER("TransporterRegistry::createTransporter SHM");
374 #ifdef NDB_SHM_TRANSPORTER
375 if(!nodeIdSpecified){
376 init(config->localNodeId);
377 }
378
379 if(config->localNodeId != localNodeId)
380 return false;
381
382 if (!g_ndb_shm_signum) {
383 g_ndb_shm_signum= config->shm.signum;
384 DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
385 /**
386 * Make sure to block g_ndb_shm_signum
387 * TransporterRegistry::init is run from "main" thread
388 */
389 NdbThread_set_shm_sigmask(TRUE);
390 }
391
392 if(config->shm.signum != g_ndb_shm_signum)
393 return false;
394
395 if(theTransporters[config->remoteNodeId] != NULL)
396 return false;
397
398 SHM_Transporter * t = new SHM_Transporter(*this,
399 config->localHostName,
400 config->remoteHostName,
401 config->s_port,
402 config->isMgmConnection,
403 localNodeId,
404 config->remoteNodeId,
405 config->serverNodeId,
406 config->checksum,
407 config->signalId,
408 config->shm.shmKey,
409 config->shm.shmSize
410 );
411 if (t == NULL)
412 return false;
413 else if (!t->initTransporter()) {
414 delete t;
415 return false;
416 }
417 // Put the transporter in the transporter arrays
418 theSHMTransporters[nSHMTransporters] = t;
419 theTransporters[t->getRemoteNodeId()] = t;
420 theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
421 performStates[t->getRemoteNodeId()] = DISCONNECTED;
422
423 nTransporters++;
424 nSHMTransporters++;
425
426 DBUG_RETURN(true);
427 #else
428 DBUG_RETURN(false);
429 #endif
430 }
431
432
433 void
removeTransporter(NodeId nodeId)434 TransporterRegistry::removeTransporter(NodeId nodeId) {
435
436 DEBUG("Removing transporter from " << localNodeId
437 << " to " << nodeId);
438
439 if(theTransporters[nodeId] == NULL)
440 return;
441
442 theTransporters[nodeId]->doDisconnect();
443
444 const TransporterType type = theTransporterTypes[nodeId];
445
446 int ind = 0;
447 switch(type){
448 case tt_TCP_TRANSPORTER:
449 #ifdef NDB_TCP_TRANSPORTER
450 for(; ind < nTCPTransporters; ind++)
451 if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
452 break;
453 ind++;
454 for(; ind<nTCPTransporters; ind++)
455 theTCPTransporters[ind-1] = theTCPTransporters[ind];
456 nTCPTransporters --;
457 #endif
458 break;
459 case tt_SCI_TRANSPORTER:
460 #ifdef NDB_SCI_TRANSPORTER
461 for(; ind < nSCITransporters; ind++)
462 if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
463 break;
464 ind++;
465 for(; ind<nSCITransporters; ind++)
466 theSCITransporters[ind-1] = theSCITransporters[ind];
467 nSCITransporters --;
468 #endif
469 break;
470 case tt_SHM_TRANSPORTER:
471 #ifdef NDB_SHM_TRANSPORTER
472 for(; ind < nSHMTransporters; ind++)
473 if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
474 break;
475 ind++;
476 for(; ind<nSHMTransporters; ind++)
477 theSHMTransporters[ind-1] = theSHMTransporters[ind];
478 nSHMTransporters --;
479 #endif
480 break;
481 }
482
483 nTransporters--;
484
485 // Delete the transporter and remove it from theTransporters array
486 delete theTransporters[nodeId];
487 theTransporters[nodeId] = NULL;
488 }
489
490 Uint32
get_free_buffer(Uint32 node) const491 TransporterRegistry::get_free_buffer(Uint32 node) const
492 {
493 Transporter *t;
494 if(likely((t = theTransporters[node]) != 0))
495 {
496 return t->get_free_buffer();
497 }
498 return 0;
499 }
500
501
502 SendStatus
prepareSend(const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,const LinearSectionPtr ptr[3])503 TransporterRegistry::prepareSend(const SignalHeader * const signalHeader,
504 Uint8 prio,
505 const Uint32 * const signalData,
506 NodeId nodeId,
507 const LinearSectionPtr ptr[3]){
508
509
510 Transporter *t = theTransporters[nodeId];
511 if(t != NULL &&
512 (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
513 ((signalHeader->theReceiversBlockNumber == 252) ||
514 (signalHeader->theReceiversBlockNumber == 4002)))) {
515
516 if(t->isConnected()){
517 Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
518 if(lenBytes <= MAX_MESSAGE_SIZE){
519 Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);
520 if(insertPtr != 0){
521 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
522 t->updateWritePtr(lenBytes, prio);
523 return SEND_OK;
524 }
525
526 int sleepTime = 2;
527
528 /**
529 * @note: on linux/i386 the granularity is 10ms
530 * so sleepTime = 2 generates a 10 ms sleep.
531 */
532 for(int i = 0; i<50; i++){
533 if((nSHMTransporters+nSCITransporters) == 0)
534 NdbSleep_MilliSleep(sleepTime);
535 insertPtr = t->getWritePtr(lenBytes, prio);
536 if(insertPtr != 0){
537 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
538 t->updateWritePtr(lenBytes, prio);
539 break;
540 }
541 }
542
543 if(insertPtr != 0){
544 /**
545 * Send buffer full, but resend works
546 */
547 reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
548 return SEND_OK;
549 }
550
551 WARNING("Signal to " << nodeId << " lost(buffer)");
552 reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
553 return SEND_BUFFER_FULL;
554 } else {
555 return SEND_MESSAGE_TOO_BIG;
556 }
557 } else {
558 DEBUG("Signal to " << nodeId << " lost(disconnect) ");
559 return SEND_DISCONNECTED;
560 }
561 } else {
562 DEBUG("Discarding message to block: "
563 << signalHeader->theReceiversBlockNumber
564 << " node: " << nodeId);
565
566 if(t == NULL)
567 return SEND_UNKNOWN_NODE;
568
569 return SEND_BLOCKED;
570 }
571 }
572
573 SendStatus
prepareSend(const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,class SectionSegmentPool & thePool,const SegmentedSectionPtr ptr[3])574 TransporterRegistry::prepareSend(const SignalHeader * const signalHeader,
575 Uint8 prio,
576 const Uint32 * const signalData,
577 NodeId nodeId,
578 class SectionSegmentPool & thePool,
579 const SegmentedSectionPtr ptr[3]){
580
581
582 Transporter *t = theTransporters[nodeId];
583 if(t != NULL &&
584 (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
585 ((signalHeader->theReceiversBlockNumber == 252)||
586 (signalHeader->theReceiversBlockNumber == 4002)))) {
587
588 if(t->isConnected()){
589 Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
590 if(lenBytes <= MAX_MESSAGE_SIZE){
591 Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);
592 if(insertPtr != 0){
593 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
594 t->updateWritePtr(lenBytes, prio);
595 return SEND_OK;
596 }
597
598
599 /**
600 * @note: on linux/i386 the granularity is 10ms
601 * so sleepTime = 2 generates a 10 ms sleep.
602 */
603 int sleepTime = 2;
604 for(int i = 0; i<50; i++){
605 if((nSHMTransporters+nSCITransporters) == 0)
606 NdbSleep_MilliSleep(sleepTime);
607 insertPtr = t->getWritePtr(lenBytes, prio);
608 if(insertPtr != 0){
609 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
610 t->updateWritePtr(lenBytes, prio);
611 break;
612 }
613 }
614
615 if(insertPtr != 0){
616 /**
617 * Send buffer full, but resend works
618 */
619 reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
620 return SEND_OK;
621 }
622
623 WARNING("Signal to " << nodeId << " lost(buffer)");
624 reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
625 return SEND_BUFFER_FULL;
626 } else {
627 return SEND_MESSAGE_TOO_BIG;
628 }
629 } else {
630 DEBUG("Signal to " << nodeId << " lost(disconnect) ");
631 return SEND_DISCONNECTED;
632 }
633 } else {
634 DEBUG("Discarding message to block: "
635 << signalHeader->theReceiversBlockNumber
636 << " node: " << nodeId);
637
638 if(t == NULL)
639 return SEND_UNKNOWN_NODE;
640
641 return SEND_BLOCKED;
642 }
643 }
644
645 void
external_IO(Uint32 timeOutMillis)646 TransporterRegistry::external_IO(Uint32 timeOutMillis) {
647 //-----------------------------------------------------------
648 // Most of the time we will send the buffers here and then wait
649 // for new signals. Thus we start by sending without timeout
650 // followed by the receive part where we expect to sleep for
651 // a while.
652 //-----------------------------------------------------------
653 if(pollReceive(timeOutMillis)){
654 performReceive();
655 }
656 performSend();
657 }
658
659 Uint32
pollReceive(Uint32 timeOutMillis)660 TransporterRegistry::pollReceive(Uint32 timeOutMillis){
661 Uint32 retVal = 0;
662
663 if((nSCITransporters) > 0)
664 {
665 timeOutMillis=0;
666 }
667
668 #ifdef NDB_SHM_TRANSPORTER
669 if(nSHMTransporters > 0)
670 {
671 Uint32 res = poll_SHM(0);
672 if(res)
673 {
674 retVal |= res;
675 timeOutMillis = 0;
676 }
677 }
678 #endif
679
680 #ifdef NDB_TCP_TRANSPORTER
681 if(nTCPTransporters > 0 || retVal == 0)
682 {
683 retVal |= poll_TCP(timeOutMillis);
684 }
685 else
686 tcpReadSelectReply = 0;
687 #endif
688 #ifdef NDB_SCI_TRANSPORTER
689 if(nSCITransporters > 0)
690 retVal |= poll_SCI(timeOutMillis);
691 #endif
692 #ifdef NDB_SHM_TRANSPORTER
693 if(nSHMTransporters > 0 && retVal == 0)
694 {
695 int res = poll_SHM(0);
696 retVal |= res;
697 }
698 #endif
699 return retVal;
700 }
701
702
703 #ifdef NDB_SCI_TRANSPORTER
704 Uint32
poll_SCI(Uint32 timeOutMillis)705 TransporterRegistry::poll_SCI(Uint32 timeOutMillis)
706 {
707 for (int i=0; i<nSCITransporters; i++) {
708 SCI_Transporter * t = theSCITransporters[i];
709 if (t->isConnected()) {
710 if(t->hasDataToRead())
711 return 1;
712 }
713 }
714 return 0;
715 }
716 #endif
717
718
719 #ifdef NDB_SHM_TRANSPORTER
720 static int g_shm_counter = 0;
721 Uint32
poll_SHM(Uint32 timeOutMillis)722 TransporterRegistry::poll_SHM(Uint32 timeOutMillis)
723 {
724 for(int j=0; j < 100; j++)
725 {
726 for (int i=0; i<nSHMTransporters; i++) {
727 SHM_Transporter * t = theSHMTransporters[i];
728 if (t->isConnected()) {
729 if(t->hasDataToRead()) {
730 return 1;
731 }
732 }
733 }
734 }
735 return 0;
736 }
737 #endif
738
739 #ifdef NDB_TCP_TRANSPORTER
740 Uint32
poll_TCP(Uint32 timeOutMillis)741 TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
742 {
743 bool hasdata = false;
744 if (false && nTCPTransporters == 0)
745 {
746 tcpReadSelectReply = 0;
747 return 0;
748 }
749
750 NDB_SOCKET_TYPE maxSocketValue = -1;
751
752 // Needed for TCP/IP connections
753 // The read- and writeset are used by select
754
755 FD_ZERO(&tcpReadset);
756
757 // Prepare for sending and receiving
758 for (int i = 0; i < nTCPTransporters; i++) {
759 TCP_Transporter * t = theTCPTransporters[i];
760
761 // If the transporter is connected
762 NodeId nodeId = t->getRemoteNodeId();
763 if (is_connected(nodeId) && t->isConnected()) {
764
765 const NDB_SOCKET_TYPE socket = t->getSocket();
766 // Find the highest socket value. It will be used by select
767 if (socket > maxSocketValue)
768 maxSocketValue = socket;
769
770 // Put the connected transporters in the socket read-set
771 FD_SET(socket, &tcpReadset);
772 }
773 hasdata |= t->hasReceiveData();
774 }
775
776 timeOutMillis = hasdata ? 0 : timeOutMillis;
777
778 struct timeval timeout;
779 timeout.tv_sec = timeOutMillis / 1000;
780 timeout.tv_usec = (timeOutMillis % 1000) * 1000;
781
782 // The highest socket value plus one
783 maxSocketValue++;
784
785 tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout);
786 if(false && tcpReadSelectReply == -1 && errno == EINTR)
787 g_eventLogger.info("woke-up by signal");
788
789 #ifdef NDB_WIN32
790 if(tcpReadSelectReply == SOCKET_ERROR)
791 {
792 NdbSleep_MilliSleep(timeOutMillis);
793 }
794 #endif
795
796 return tcpReadSelectReply || hasdata;
797 }
798 #endif
799
800
801 void
performReceive()802 TransporterRegistry::performReceive()
803 {
804 #ifdef NDB_TCP_TRANSPORTER
805 for (int i=0; i<nTCPTransporters; i++)
806 {
807 checkJobBuffer();
808 TCP_Transporter *t = theTCPTransporters[i];
809 const NodeId nodeId = t->getRemoteNodeId();
810 const NDB_SOCKET_TYPE socket = t->getSocket();
811 if(is_connected(nodeId)){
812 if(t->isConnected())
813 {
814 if (FD_ISSET(socket, &tcpReadset))
815 {
816 t->doReceive();
817 }
818
819 if (t->hasReceiveData())
820 {
821 Uint32 * ptr;
822 Uint32 sz = t->getReceiveData(&ptr);
823 transporter_recv_from(callbackObj, nodeId);
824 Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]);
825 t->updateReceiveDataPtr(szUsed);
826 }
827 }
828 }
829 }
830 #endif
831
832 #ifdef NDB_SCI_TRANSPORTER
833 //performReceive
834 //do prepareReceive on the SCI transporters (prepareReceive(t,,,,))
835 for (int i=0; i<nSCITransporters; i++)
836 {
837 checkJobBuffer();
838 SCI_Transporter *t = theSCITransporters[i];
839 const NodeId nodeId = t->getRemoteNodeId();
840 if(is_connected(nodeId))
841 {
842 if(t->isConnected() && t->checkConnected())
843 {
844 Uint32 * readPtr, * eodPtr;
845 t->getReceivePtr(&readPtr, &eodPtr);
846 transporter_recv_from(callbackObj, nodeId);
847 Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
848 t->updateReceivePtr(newPtr);
849 }
850 }
851 }
852 #endif
853 #ifdef NDB_SHM_TRANSPORTER
854 for (int i=0; i<nSHMTransporters; i++)
855 {
856 checkJobBuffer();
857 SHM_Transporter *t = theSHMTransporters[i];
858 const NodeId nodeId = t->getRemoteNodeId();
859 if(is_connected(nodeId)){
860 if(t->isConnected() && t->checkConnected())
861 {
862 Uint32 * readPtr, * eodPtr;
863 t->getReceivePtr(&readPtr, &eodPtr);
864 transporter_recv_from(callbackObj, nodeId);
865 Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
866 t->updateReceivePtr(newPtr);
867 }
868 }
869 }
870 #endif
871 }
872
873 void
performSend()874 TransporterRegistry::performSend()
875 {
876 int i;
877 sendCounter = 1;
878
879 #ifdef NDB_TCP_TRANSPORTER
880 for (i = m_transp_count; i < nTCPTransporters; i++)
881 {
882 TCP_Transporter *t = theTCPTransporters[i];
883 if (t && t->hasDataToSend() && t->isConnected() &&
884 is_connected(t->getRemoteNodeId()))
885 {
886 t->doSend();
887 }
888 }
889 for (i = 0; i < m_transp_count && i < nTCPTransporters; i++)
890 {
891 TCP_Transporter *t = theTCPTransporters[i];
892 if (t && t->hasDataToSend() && t->isConnected() &&
893 is_connected(t->getRemoteNodeId()))
894 {
895 t->doSend();
896 }
897 }
898 m_transp_count++;
899 if (m_transp_count == nTCPTransporters) m_transp_count = 0;
900 #endif
901 #ifdef NDB_SCI_TRANSPORTER
902 //scroll through the SCI transporters,
903 // get each transporter, check if connected, send data
904 for (i=0; i<nSCITransporters; i++) {
905 SCI_Transporter *t = theSCITransporters[i];
906 const NodeId nodeId = t->getRemoteNodeId();
907
908 if(is_connected(nodeId))
909 {
910 if(t->isConnected() && t->hasDataToSend()) {
911 t->doSend();
912 } //if
913 } //if
914 }
915 #endif
916
917 #ifdef NDB_SHM_TRANSPORTER
918 for (i=0; i<nSHMTransporters; i++)
919 {
920 SHM_Transporter *t = theSHMTransporters[i];
921 const NodeId nodeId = t->getRemoteNodeId();
922 if(is_connected(nodeId))
923 {
924 if(t->isConnected())
925 {
926 t->doSend();
927 }
928 }
929 }
930 #endif
931 }
932
933 int
forceSendCheck(int sendLimit)934 TransporterRegistry::forceSendCheck(int sendLimit){
935 int tSendCounter = sendCounter;
936 sendCounter = tSendCounter + 1;
937 if (tSendCounter >= sendLimit) {
938 performSend();
939 sendCounter = 1;
940 return 1;
941 }//if
942 return 0;
943 }//TransporterRegistry::forceSendCheck()
944
945 #ifdef DEBUG_TRANSPORTER
946 void
printState()947 TransporterRegistry::printState(){
948 ndbout << "-- TransporterRegistry -- " << endl << endl
949 << "Transporters = " << nTransporters << endl;
950 for(int i = 0; i<maxTransporters; i++)
951 if(theTransporters[i] != NULL){
952 const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
953 ndbout << "Transporter: " << remoteNodeId
954 << " PerformState: " << performStates[remoteNodeId]
955 << " IOState: " << ioStates[remoteNodeId] << endl;
956 }
957 }
958 #endif
959
960 IOState
ioState(NodeId nodeId)961 TransporterRegistry::ioState(NodeId nodeId) {
962 return ioStates[nodeId];
963 }
964
965 void
setIOState(NodeId nodeId,IOState state)966 TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
967 DEBUG("TransporterRegistry::setIOState("
968 << nodeId << ", " << state << ")");
969 ioStates[nodeId] = state;
970 }
971
972 static void *
run_start_clients_C(void * me)973 run_start_clients_C(void * me)
974 {
975 ((TransporterRegistry*) me)->start_clients_thread();
976 return 0;
977 }
978
979 // Run by kernel thread
980 void
do_connect(NodeId node_id)981 TransporterRegistry::do_connect(NodeId node_id)
982 {
983 PerformState &curr_state = performStates[node_id];
984 switch(curr_state){
985 case DISCONNECTED:
986 break;
987 case CONNECTED:
988 return;
989 case CONNECTING:
990 return;
991 case DISCONNECTING:
992 break;
993 }
994 DBUG_ENTER("TransporterRegistry::do_connect");
995 DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id));
996 curr_state= CONNECTING;
997 DBUG_VOID_RETURN;
998 }
999 void
do_disconnect(NodeId node_id)1000 TransporterRegistry::do_disconnect(NodeId node_id)
1001 {
1002 PerformState &curr_state = performStates[node_id];
1003 switch(curr_state){
1004 case DISCONNECTED:
1005 return;
1006 case CONNECTED:
1007 break;
1008 case CONNECTING:
1009 break;
1010 case DISCONNECTING:
1011 return;
1012 }
1013 DBUG_ENTER("TransporterRegistry::do_disconnect");
1014 DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id));
1015 curr_state= DISCONNECTING;
1016 DBUG_VOID_RETURN;
1017 }
1018
1019 void
report_connect(NodeId node_id)1020 TransporterRegistry::report_connect(NodeId node_id)
1021 {
1022 DBUG_ENTER("TransporterRegistry::report_connect");
1023 DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
1024 performStates[node_id] = CONNECTED;
1025 reportConnect(callbackObj, node_id);
1026 DBUG_VOID_RETURN;
1027 }
1028
1029 void
report_disconnect(NodeId node_id,int errnum)1030 TransporterRegistry::report_disconnect(NodeId node_id, int errnum)
1031 {
1032 DBUG_ENTER("TransporterRegistry::report_disconnect");
1033 DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
1034 performStates[node_id] = DISCONNECTED;
1035 reportDisconnect(callbackObj, node_id, errnum);
1036 DBUG_VOID_RETURN;
1037 }
1038
1039 void
update_connections()1040 TransporterRegistry::update_connections()
1041 {
1042 for (int i= 0, n= 0; n < nTransporters; i++){
1043 Transporter * t = theTransporters[i];
1044 if (!t)
1045 continue;
1046 n++;
1047
1048 const NodeId nodeId = t->getRemoteNodeId();
1049 switch(performStates[nodeId]){
1050 case CONNECTED:
1051 case DISCONNECTED:
1052 break;
1053 case CONNECTING:
1054 if(t->isConnected())
1055 report_connect(nodeId);
1056 break;
1057 case DISCONNECTING:
1058 if(!t->isConnected())
1059 report_disconnect(nodeId, 0);
1060 break;
1061 }
1062 }
1063 }
1064
1065 // run as own thread
1066 void
start_clients_thread()1067 TransporterRegistry::start_clients_thread()
1068 {
1069 int persist_mgm_count= 0;
1070 DBUG_ENTER("TransporterRegistry::start_clients_thread");
1071 while (m_run_start_clients_thread) {
1072 NdbSleep_MilliSleep(100);
1073 persist_mgm_count++;
1074 if(persist_mgm_count==50)
1075 {
1076 ndb_mgm_check_connection(m_mgm_handle);
1077 persist_mgm_count= 0;
1078 }
1079 for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
1080 Transporter * t = theTransporters[i];
1081 if (!t)
1082 continue;
1083 n++;
1084
1085 const NodeId nodeId = t->getRemoteNodeId();
1086 switch(performStates[nodeId]){
1087 case CONNECTING:
1088 if(!t->isConnected() && !t->isServer) {
1089 bool connected= false;
1090 /**
1091 * First, we try to connect (if we have a port number).
1092 */
1093 if (t->get_s_port())
1094 connected= t->connect_client();
1095
1096 /**
1097 * If dynamic, get the port for connecting from the management server
1098 */
1099 if( !connected && t->get_s_port() <= 0) { // Port is dynamic
1100 int server_port= 0;
1101 struct ndb_mgm_reply mgm_reply;
1102
1103 if(!ndb_mgm_is_connected(m_mgm_handle))
1104 ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
1105
1106 if(ndb_mgm_is_connected(m_mgm_handle))
1107 {
1108 int res=
1109 ndb_mgm_get_connection_int_parameter(m_mgm_handle,
1110 t->getRemoteNodeId(),
1111 t->getLocalNodeId(),
1112 CFG_CONNECTION_SERVER_PORT,
1113 &server_port,
1114 &mgm_reply);
1115 DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
1116 server_port,t->getRemoteNodeId(),
1117 t->getLocalNodeId(),res));
1118 if( res >= 0 )
1119 {
1120 /**
1121 * Server_port == 0 just means that that a mgmt server
1122 * has not received a new port yet. Keep the old.
1123 */
1124 if (server_port)
1125 t->set_s_port(server_port);
1126 }
1127 else if(ndb_mgm_is_connected(m_mgm_handle))
1128 {
1129 g_eventLogger.info("Failed to get dynamic port to connect to: %d", res);
1130 ndb_mgm_disconnect(m_mgm_handle);
1131 }
1132 else
1133 {
1134 g_eventLogger.info("Management server closed connection early. "
1135 "It is probably being shut down (or has problems). "
1136 "We will retry the connection. %d %s %s line: %d",
1137 ndb_mgm_get_latest_error(m_mgm_handle),
1138 ndb_mgm_get_latest_error_desc(m_mgm_handle),
1139 ndb_mgm_get_latest_error_msg(m_mgm_handle),
1140 ndb_mgm_get_latest_error_line(m_mgm_handle)
1141 );
1142 }
1143 }
1144 /** else
1145 * We will not be able to get a new port unless
1146 * the m_mgm_handle is connected. Note that not
1147 * being connected is an ok state, just continue
1148 * until it is able to connect. Continue using the
1149 * old port until we can connect again and get a
1150 * new port.
1151 */
1152 }
1153 }
1154 break;
1155 case DISCONNECTING:
1156 if(t->isConnected())
1157 t->doDisconnect();
1158 break;
1159 default:
1160 break;
1161 }
1162 }
1163 }
1164 DBUG_VOID_RETURN;
1165 }
1166
1167 bool
start_clients()1168 TransporterRegistry::start_clients()
1169 {
1170 m_run_start_clients_thread= true;
1171 m_start_clients_thread= NdbThread_Create(run_start_clients_C,
1172 (void**)this,
1173 32768,
1174 "ndb_start_clients",
1175 NDB_THREAD_PRIO_LOW);
1176 if (m_start_clients_thread == 0) {
1177 m_run_start_clients_thread= false;
1178 return false;
1179 }
1180 return true;
1181 }
1182
1183 bool
stop_clients()1184 TransporterRegistry::stop_clients()
1185 {
1186 if (m_start_clients_thread) {
1187 m_run_start_clients_thread= false;
1188 void* status;
1189 NdbThread_WaitFor(m_start_clients_thread, &status);
1190 NdbThread_Destroy(&m_start_clients_thread);
1191 }
1192 return true;
1193 }
1194
1195 void
add_transporter_interface(NodeId remoteNodeId,const char * interf,int s_port)1196 TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
1197 const char *interf,
1198 int s_port)
1199 {
1200 DBUG_ENTER("TransporterRegistry::add_transporter_interface");
1201 DBUG_PRINT("enter",("interface=%s, s_port= %d", interf, s_port));
1202 if (interf && strlen(interf) == 0)
1203 interf= 0;
1204
1205 for (unsigned i= 0; i < m_transporter_interface.size(); i++)
1206 {
1207 Transporter_interface &tmp= m_transporter_interface[i];
1208 if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)
1209 continue;
1210 if (interf != 0 && tmp.m_interface != 0 &&
1211 strcmp(interf, tmp.m_interface) == 0)
1212 {
1213 DBUG_VOID_RETURN; // found match, no need to insert
1214 }
1215 if (interf == 0 && tmp.m_interface == 0)
1216 {
1217 DBUG_VOID_RETURN; // found match, no need to insert
1218 }
1219 }
1220 Transporter_interface t;
1221 t.m_remote_nodeId= remoteNodeId;
1222 t.m_s_service_port= s_port;
1223 t.m_interface= interf;
1224 m_transporter_interface.push_back(t);
1225 DBUG_PRINT("exit",("interface and port added"));
1226 DBUG_VOID_RETURN;
1227 }
1228
1229 bool
start_service(SocketServer & socket_server)1230 TransporterRegistry::start_service(SocketServer& socket_server)
1231 {
1232 DBUG_ENTER("TransporterRegistry::start_service");
1233 if (m_transporter_interface.size() > 0 && !nodeIdSpecified)
1234 {
1235 g_eventLogger.error("TransporterRegistry::startReceiving: localNodeId not specified");
1236 DBUG_RETURN(false);
1237 }
1238
1239 for (unsigned i= 0; i < m_transporter_interface.size(); i++)
1240 {
1241 Transporter_interface &t= m_transporter_interface[i];
1242
1243 unsigned short port= (unsigned short)t.m_s_service_port;
1244 if(t.m_s_service_port<0)
1245 port= -t.m_s_service_port; // is a dynamic port
1246 TransporterService *transporter_service =
1247 new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
1248 if(!socket_server.setup(transporter_service,
1249 &port, t.m_interface))
1250 {
1251 DBUG_PRINT("info", ("Trying new port"));
1252 port= 0;
1253 if(t.m_s_service_port>0
1254 || !socket_server.setup(transporter_service,
1255 &port, t.m_interface))
1256 {
1257 /*
1258 * If it wasn't a dynamically allocated port, or
1259 * our attempts at getting a new dynamic port failed
1260 */
1261 g_eventLogger.error("Unable to setup transporter service port: %s:%d!\n"
1262 "Please check if the port is already used,\n"
1263 "(perhaps the node is already running)",
1264 t.m_interface ? t.m_interface : "*", t.m_s_service_port);
1265 delete transporter_service;
1266 DBUG_RETURN(false);
1267 }
1268 }
1269 t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic
1270 DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));
1271 transporter_service->setTransporterRegistry(this);
1272 }
1273 DBUG_RETURN(true);
1274 }
1275
1276 #ifdef NDB_SHM_TRANSPORTER
1277 static
1278 RETSIGTYPE
shm_sig_handler(int signo)1279 shm_sig_handler(int signo)
1280 {
1281 g_shm_counter++;
1282 }
1283 #endif
1284
1285 void
startReceiving()1286 TransporterRegistry::startReceiving()
1287 {
1288 DBUG_ENTER("TransporterRegistry::startReceiving");
1289
1290 #ifdef NDB_SHM_TRANSPORTER
1291 m_shm_own_pid = getpid();
1292 if (g_ndb_shm_signum)
1293 {
1294 DBUG_PRINT("info",("Install signal handler for signum %d",
1295 g_ndb_shm_signum));
1296 struct sigaction sa;
1297 NdbThread_set_shm_sigmask(FALSE);
1298 sigemptyset(&sa.sa_mask);
1299 sa.sa_handler = shm_sig_handler;
1300 sa.sa_flags = 0;
1301 int ret;
1302 while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR);
1303 if(ret != 0)
1304 {
1305 DBUG_PRINT("error",("Install failed"));
1306 g_eventLogger.error("Failed to install signal handler for"
1307 " SHM transporter, signum %d, errno: %d (%s)",
1308 g_ndb_shm_signum, errno, strerror(errno));
1309 }
1310 }
1311 #endif // NDB_SHM_TRANSPORTER
1312 DBUG_VOID_RETURN;
1313 }
1314
1315 void
stopReceiving()1316 TransporterRegistry::stopReceiving(){
1317 /**
1318 * Disconnect all transporters, this includes detach from remote node
1319 * and since that must be done from the same process that called attach
1320 * it's done here in the receive thread
1321 */
1322 disconnectAll();
1323 }
1324
1325 void
startSending()1326 TransporterRegistry::startSending(){
1327 }
1328
1329 void
stopSending()1330 TransporterRegistry::stopSending(){
1331 }
1332
operator <<(NdbOut & out,SignalHeader & sh)1333 NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
1334 out << "-- Signal Header --" << endl;
1335 out << "theLength: " << sh.theLength << endl;
1336 out << "gsn: " << sh.theVerId_signalNumber << endl;
1337 out << "recBlockNo: " << sh.theReceiversBlockNumber << endl;
1338 out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
1339 out << "sendersSig: " << sh.theSendersSignalId << endl;
1340 out << "theSignalId: " << sh.theSignalId << endl;
1341 out << "trace: " << (int)sh.theTrace << endl;
1342 return out;
1343 }
1344
1345 Transporter*
get_transporter(NodeId nodeId)1346 TransporterRegistry::get_transporter(NodeId nodeId) {
1347 return theTransporters[nodeId];
1348 }
1349
connect_client(NdbMgmHandle * h)1350 bool TransporterRegistry::connect_client(NdbMgmHandle *h)
1351 {
1352 DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
1353
1354 Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);
1355
1356 if(!mgm_nodeid)
1357 {
1358 g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1359 return false;
1360 }
1361 Transporter * t = theTransporters[mgm_nodeid];
1362 if (!t)
1363 {
1364 g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1365 return false;
1366 }
1367 DBUG_RETURN(t->connect_client(connect_ndb_mgmd(h)));
1368 }
1369
1370 /**
1371 * Given a connected NdbMgmHandle, turns it into a transporter
1372 * and returns the socket.
1373 */
connect_ndb_mgmd(NdbMgmHandle * h)1374 NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h)
1375 {
1376 struct ndb_mgm_reply mgm_reply;
1377
1378 if ( h==NULL || *h == NULL )
1379 {
1380 g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1381 return NDB_INVALID_SOCKET;
1382 }
1383
1384 for(unsigned int i=0;i < m_transporter_interface.size();i++)
1385 if (m_transporter_interface[i].m_s_service_port < 0
1386 && ndb_mgm_set_connection_int_parameter(*h,
1387 get_localNodeId(),
1388 m_transporter_interface[i].m_remote_nodeId,
1389 CFG_CONNECTION_SERVER_PORT,
1390 m_transporter_interface[i].m_s_service_port,
1391 &mgm_reply) < 0)
1392 {
1393 g_eventLogger.error("Error: %s: %d",
1394 ndb_mgm_get_latest_error_desc(*h),
1395 ndb_mgm_get_latest_error(*h));
1396 g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1397 ndb_mgm_destroy_handle(h);
1398 return NDB_INVALID_SOCKET;
1399 }
1400
1401 /**
1402 * convert_to_transporter also disposes of the handle (i.e. we don't leak
1403 * memory here.
1404 */
1405 NDB_SOCKET_TYPE sockfd= ndb_mgm_convert_to_transporter(h);
1406 if ( sockfd == NDB_INVALID_SOCKET)
1407 {
1408 g_eventLogger.error("Error: %s: %d",
1409 ndb_mgm_get_latest_error_desc(*h),
1410 ndb_mgm_get_latest_error(*h));
1411 g_eventLogger.error("%s: %d", __FILE__, __LINE__);
1412 ndb_mgm_destroy_handle(h);
1413 }
1414 return sockfd;
1415 }
1416
1417 /**
1418 * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
1419 * and returns the socket.
1420 */
connect_ndb_mgmd(SocketClient * sc)1421 NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc)
1422 {
1423 NdbMgmHandle h= ndb_mgm_create_handle();
1424
1425 if ( h == NULL )
1426 {
1427 return NDB_INVALID_SOCKET;
1428 }
1429
1430 /**
1431 * Set connectstring
1432 */
1433 {
1434 BaseString cs;
1435 cs.assfmt("%s:%u",sc->get_server_name(),sc->get_port());
1436 ndb_mgm_set_connectstring(h, cs.c_str());
1437 }
1438
1439 if(ndb_mgm_connect(h, 0, 0, 0)<0)
1440 {
1441 ndb_mgm_destroy_handle(&h);
1442 return NDB_INVALID_SOCKET;
1443 }
1444
1445 return connect_ndb_mgmd(&h);
1446 }
1447
1448 template class Vector<TransporterRegistry::Transporter_interface>;
1449