1 /*
2    Copyright (C) 2003-2006 MySQL AB
3     All rights reserved. Use is subject to license terms.
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License, version 2.0,
7    as published by the Free Software Foundation.
8 
9    This program is also distributed with certain software (including
10    but not limited to OpenSSL) that is licensed under separate terms,
11    as designated in a particular file or component or in included license
12    documentation.  The authors of MySQL hereby grant you an additional
13    permission to link the program and your derivative works with the
14    separately licensed software that they have included with MySQL.
15 
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License, version 2.0, for more details.
20 
21    You should have received a copy of the GNU General Public License
22    along with this program; if not, write to the Free Software
23    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 */
25 
26 #include <ndb_global.h>
27 
28 #include "TransporterRegistry.hpp"
29 #include "TransporterDefinitions.hpp"
30 #include "TransporterCallback.hpp"
31 #include <RefConvert.hpp>
32 
33 #include <NdbTick.h>
34 #include <NdbMain.h>
35 #include <NdbOut.hpp>
36 #include <NdbSleep.h>
37 
// First TCP port used by this test; createTCPTransporter() adds a small
// per-node-pair offset (0, 1 or 2) to it.
int basePortTCP = 17000;
39 
/**
 * Default SCI transporter settings.  main() hands this object to
 * createSCITransporter(), which overwrites remoteSciNodeId0,
 * remoteSciNodeId1, localNodeId and remoteNodeId before the transporter
 * is created.
 *
 * NOTE(review): the per-field comments assume the member order of
 * SCI_TransporterConfiguration, which is declared elsewhere - confirm.
 */
SCI_TransporterConfiguration sciTemplate = {
  2000,
       // Packet size
  2000000,      // Buffer size
  2,           // number of adapters
  1,           // remote node id SCI (presumably remoteSciNodeId0 - confirm)
  2,           // Remote node Id SCI (presumably remoteSciNodeId1 - confirm)
  0,           // local ndb node id (server)
  0,           // remote ndb node id (client)
  0,              // byteOrder;
  false,          // compression;
  true,          // checksum;
  true            // signalId;
};
54 
55 
/**
 * Default shared-memory transporter settings.  main() hands this object to
 * createSHMTransporter(), which overwrites localNodeId and remoteNodeId
 * before the transporter is created.
 *
 * NOTE(review): the per-field comments assume the member order of
 * SHM_TransporterConfiguration, which is declared elsewhere - confirm.
 */
SHM_TransporterConfiguration shmTemplate = {
  0,      //remoteNodeId
  0,      //localNodeId;
  false,  //compression
  true,   //checksum;
  true,   //signalId;
  0,      //byteOrder;
  123,    //shmKey;
  25000000 //shmSize;
};
66 
67 
/**
 * Default TCP transporter settings.  main() hands this object to
 * createTCPTransporter(), which overwrites port, both host names and both
 * node ids, and replaces sendBufferSize / maxReceiveSize when the
 * corresponding command-line value is not -1.
 *
 * NOTE(review): the per-field comments assume the member order of
 * TCP_TransporterConfiguration, which is declared elsewhere - confirm.
 */
TCP_TransporterConfiguration tcpTemplate = {
  17000,          // port;
  "",             // remoteHostName;
  "",             // localhostname
  2,              // remoteNodeId;
  1,              // localNodeId;
  25000000,        // sendBufferSize - Size of SendBuffer of priority B
  5000000,         // maxReceiveSize - Maximum no of bytes to receive
  0,              // byteOrder;
  false,          // compression;
  true,           // checksum;
  true            // signalId;
};
81 
// The single transporter registry of this program: allocated in main(),
// used by every send/receive helper and callback in this file, and deleted
// (and reset to 0) at the end of main().
TransporterRegistry *tReg = 0;
83 
84 #include <signal.h>
85 
86 extern "C"
87 void
signalHandler(int signo)88 signalHandler(int signo){
89   ::signal(13, signalHandler);
90   char buf[255];
91   sprintf(buf,"Signal: %d\n", signo);
92   ndbout << buf << endl;
93 }
94 
95 void
usage(const char * progName)96 usage(const char * progName){
97   ndbout << "Usage: " << progName << " <type> localNodeId localHostName"
98 	 << " remoteHostName"
99 	 << " [<loop count>] [<send buf size>] [<recv buf size>]" << endl;
100   ndbout << "  type = shm tcp ose sci" << endl;
101   ndbout << "  localNodeId - {1,2}" << endl;
102 }
103 
/**
 * Common signature of the transporter factory functions below.
 *
 * conf            - pointer to the matching *_TransporterConfiguration
 *                   template (each factory casts it to its own type)
 * localNodeId     - NDB node id of this process
 * remoteNodeId    - NDB node id of the peer
 * localHostName   - local host name as given on the command line
 * remoteHostName  - remote host name as given on the command line
 * sendBuf/recvBuf - buffer-size overrides; -1 keeps the template value
 *                   (only the TCP factory uses them)
 */
typedef void (* CreateTransporterFunc)(void * conf,
				       NodeId localNodeId,
				       NodeId remoteNodeId,
				       const char * localHostName,
				       const char * remoteHostName,
				       int sendBuf,
				       int recvBuf);

// Factory functions, one per supported transporter type; defined at the
// end of this file and selected in main() from the <type> argument.
void
createTCPTransporter(void*, NodeId, NodeId, const char*, const char*, int, int);
void
createSHMTransporter(void*, NodeId, NodeId, const char*, const char*, int, int);
void
createSCITransporter(void*, NodeId, NodeId, const char*, const char*, int, int);
118 
119 struct TestPhase {
120   int signalSize;
121   int noOfSignals;
122   int noOfSignalSent;
123   int noOfSignalReceived;
124   NDB_TICKS startTime;
125   NDB_TICKS stopTime;
126   NDB_TICKS accTime;
127   int loopCount;
128   Uint64 sendLenBytes, sendCount;
129   Uint64 recvLenBytes, recvCount;
130 };
131 
/**
 * The test plan: one entry per phase, giving { signalSize (in words),
 * noOfSignals } followed by zeroes for the run-time counters.
 * A signalSize of 0 makes sendSignalTo() pick a random size of 1..25 words;
 * sizes above 25 are sent as 25 inline words plus one section holding the
 * remainder.
 */
TestPhase testSpec[] = {
   {  1,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of size 1  word
  ,{  1,   100, 0,0, 0,0,0,0,0,0,0 } //   100 signals of size 1  word
  ,{  1,  1000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of size 1  word
  ,{  1, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1  word

  ,{  8,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of size 8  words
  ,{  8,   100, 0,0, 0,0,0,0,0,0,0 } //   100 signals of size 8  words
  ,{  8,  1000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of size 8  words
  ,{  8, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 8  words

  ,{ 16,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of size 16 words
  ,{ 16,   100, 0,0, 0,0,0,0,0,0,0 } //   100 signals of size 16 words
  ,{ 16,  1000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of size 16 words
  ,{ 16, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 16 words

  ,{ 24,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of size 24 words
  ,{ 24,   100, 0,0, 0,0,0,0,0,0,0 } //   100 signals of size 24 words
  ,{ 24,  1000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of size 24 words
  ,{ 24, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 24 words

  ,{  0,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of random size
  ,{  0,   100, 0,0, 0,0,0,0,0,0,0 } //   100 signals of random size
  ,{  0,  1000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of random size
  ,{  0, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of random size

  ,{ 100,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of size 100 words
  ,{ 100,   100, 0,0, 0,0,0,0,0,0,0 } //   100 signals of size 100 words
  ,{ 100,  1000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of size 100 words
  ,{ 100, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 100 words

  ,{ 500,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of size 500 words
  ,{ 500,   100, 0,0, 0,0,0,0,0,0,0 } //   100 signals of size 500 words
  ,{ 500,  1000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of size 500 words
  ,{ 500, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 500 words

  ,{ 1000,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of size 1000 words
  ,{ 1000,   100, 0,0, 0,0,0,0,0,0,0 } //   100 signals of size 1000 words
  ,{ 1000,  1000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of size 1000 words
  ,{ 1000, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1000 words
};
173 
// Number of phases in testSpec[]; also the size of allPhases[].
const int noOfTests = sizeof(testSpec)/sizeof(TestPhase);

// Payload source for the section part of signals larger than 25 words
// (see sendSignalTo()).  The largest phase in testSpec[] uses
// 1000 - 25 = 975 words of it.
Uint32 StaticBuffer[1000];
177 
178 SendStatus
sendSignalTo(NodeId nodeId,int signalSize,Uint32 count)179 sendSignalTo(NodeId nodeId, int signalSize, Uint32 count){
180   if(signalSize == 0)
181     signalSize = (rand() % 25) + 1;
182 
183   SignalHeader sh;
184   sh.theLength               = (signalSize > 25 ? 25 : signalSize);
185   sh.theVerId_signalNumber   = count;
186   sh.theReceiversBlockNumber = rand();
187   sh.theSendersBlockRef      = rand();
188   sh.theSendersSignalId      = rand();
189   sh.theSignalId             = rand();
190   sh.theTrace                = rand();
191 
192   Uint32 theData[25];
193   for(int i = 0; i<25; i++)
194     theData[i] = (i+1) * (Uint32)(&theData[i]);
195 
196   theData[0] = count;
197   LinearSectionPtr ptr[3];
198 
199   if(signalSize <= 25){
200     sh.m_noOfSections = 0;
201   } else {
202     sh.m_noOfSections = 1;
203     ptr[0].sz = signalSize - 25;
204     ptr[0].p = &StaticBuffer[0];
205   }
206 
207   return tReg->prepareSend(&sh, 1, theData, nodeId, ptr);
208 }
209 
210 void
reportHeader()211 reportHeader(){
212   ndbout << "#Sigs\tSz\tTime\tSig/sec\tBps\tBps-tot\t"
213 	 << "s len\tr len" << endl;
214 }
215 
/**
 * Format a number compactly with a "k" (thousands) or "M" (millions) suffix.
 *
 * Values below 1000 are printed as-is.  Scaled values below 100 units keep
 * one (truncated, not rounded) decimal, e.g. 1500 -> "1.5k"; larger ones are
 * printed as whole units, e.g. 150000 -> "150k".
 *
 * The thresholds are inclusive, so exactly 1000 gives "1.0k" and exactly
 * 1000000 gives "1.0M" (previously "1000" and "1000k").
 *
 * @param dst  destination buffer; must hold the formatted text plus the
 *             terminating NUL (callers in this file pass 255 bytes)
 * @param i    value to format
 */
void
print(char * dst, int i){
  const int million  = 1000000;
  const int thousand = 1000;

  if(i >= million){
    const int whole = i / million;
    const int tenth = (i % million) / (million / 10);
    if(whole < 100)
      sprintf(dst, "%d.%dM", whole, tenth);
    else
      sprintf(dst, "%dM", whole);
  } else if(i >= thousand){
    const int whole = i / thousand;
    const int tenth = (i % thousand) / (thousand / 10);
    if(whole < 100)
      sprintf(dst, "%d.%dk", whole, tenth);
    else
      sprintf(dst, "%dk", whole);
  } else {
    sprintf(dst, "%d", i);
  }
}
236 
237 void
printReport(TestPhase & p)238 printReport(TestPhase & p){
239   if(p.accTime > 0) {
240     Uint32 secs = (p.accTime/p.loopCount)/1000;
241     Uint32 mill = (p.accTime/p.loopCount)%1000;
242     char st[255];
243     if(secs > 0){
244       sprintf(st, "%d.%.2ds", secs, (mill/10));
245     } else {
246       sprintf(st, "%dms", mill);
247     }
248 
249     Uint32 sps = (1000*p.noOfSignals*p.loopCount)/p.accTime;
250     Uint32 dps = ((4000*p.noOfSignals)/p.accTime)*(p.loopCount*p.signalSize);
251     Uint32 bps = ((4000*p.noOfSignals)/p.accTime)*(p.loopCount*(p.signalSize+3));
252     if(p.signalSize == 0){
253       dps = ((4000*p.noOfSignals)/p.accTime)*(p.loopCount*(13));
254       bps = ((4000*p.noOfSignals)/p.accTime)*(p.loopCount*(13+3));
255     }
256     char ssps[255];
257     char sbps[255];
258     char sdps[255];
259 
260     print(ssps, sps);
261     print(sbps, bps);
262     print(sdps, dps);
263 
264 
265     char buf[255];
266     if(p.signalSize != 0){
267       BaseString::snprintf(buf, 255,
268 	       "%d\t%d\t%s\t%s\t%s\t%s\t%d\t%d",
269 	       p.noOfSignals,
270 	       4*p.signalSize,
271 	       st,
272 	       ssps,
273 	       sdps,
274 	       sbps,
275 	       (int)(p.sendLenBytes / (p.sendCount == 0 ? 1 : p.sendCount)),
276 	       (int)(p.recvLenBytes / (p.recvCount == 0 ? 1 : p.recvCount)));
277     } else {
278       BaseString::snprintf(buf, 255,
279 	       "%d\trand\t%s\t%s\t%s\t%s\t%d\t%d",
280 	       p.noOfSignals,
281 	       st,
282 	       ssps,
283 	       sdps,
284 	       sbps,
285 	       (int)(p.sendLenBytes / (p.sendCount == 0 ? 1 : p.sendCount)),
286 	       (int)(p.recvLenBytes / (p.recvCount == 0 ? 1 : p.recvCount)));
287 
288     }
289     ndbout << buf << endl;
290   }
291 }
292 
// Command-line settings (main() re-initializes them before parsing).
int loopCount = 1;   // how many times every phase is repeated
int sendBufSz = -1;  // send buffer size override; -1 keeps the template value
int recvBufSz = -1;  // receive buffer size override; -1 keeps the template value

// Run-time state shared between main(), client()/server() and the
// transporter callbacks further down in this file.
bool      isClient     = false;  // set by client(), cleared by server()
bool      isConnected  = false;  // only reset in main(); not read in this file
bool      isStarted    = false;  // set on the first reportConnect()
int       currentPhase = 0;      // index into allPhases[] of the running phase
TestPhase allPhases[noOfTests];  // working copy of testSpec[] (client side)
Uint32    signalToEcho;          // server: total number of signals expected
Uint32    signalsEchoed;         // server: number of signals echoed so far
NDB_TICKS startTime, stopTime;   // startTime is set in reportConnect()
305 
306 void
client(NodeId remoteNodeId)307 client(NodeId remoteNodeId){
308   isClient = true;
309 
310   currentPhase = 0;
311   memcpy(allPhases, testSpec, sizeof(testSpec));
312 
313   int counter = 0;
314   int sigCounter = 0;
315 
316   while(true){
317     TestPhase * current = &allPhases[currentPhase];
318     if(current->noOfSignals == current->noOfSignalSent &&
319        current->noOfSignals == current->noOfSignalReceived){
320 
321       /**
322        * Test phase done
323        */
324       current->stopTime  = NdbTick_CurrentMillisecond();
325       current->accTime  += (current->stopTime - current->startTime);
326 
327       NdbSleep_MilliSleep(500 / loopCount);
328 
329       current->startTime = NdbTick_CurrentMillisecond();
330 
331       current->noOfSignalSent     = 0;
332       current->noOfSignalReceived = 0;
333 
334       current->loopCount ++;
335       if(current->loopCount == loopCount){
336 
337 	printReport(allPhases[currentPhase]);
338 
339 	currentPhase ++;
340 	if(currentPhase == noOfTests){
341 	  /**
342 	   * Now we are done
343 	   */
344 	  break;
345 	}
346 	NdbSleep_MilliSleep(500);
347 	current = &allPhases[currentPhase];
348 	current->startTime = NdbTick_CurrentMillisecond();
349       }
350     }
351 
352     int signalsLeft = current->noOfSignals - current->noOfSignalSent;
353     if(signalsLeft > 0){
354       for(; signalsLeft > 0; signalsLeft--){
355 	if(sendSignalTo(remoteNodeId,current->signalSize,sigCounter)== SEND_OK){
356 	  current->noOfSignalSent++;
357 	  sigCounter++;
358 	} else {
359 	  ndbout << "Failed to send: " << sigCounter << endl;
360 	  tReg->external_IO(10);
361 	  break;
362 	}
363       }
364     }
365     if(counter % 10 == 0)
366       tReg->checkConnections();
367     tReg->external_IO(0);
368     counter++;
369   }
370 }
371 
372 void
server()373 server(){
374   isClient = false;
375 
376   signalToEcho = 0;
377   signalsEchoed = 0;
378   for(int i = 0; i<noOfTests; i++)
379     signalToEcho += testSpec[i].noOfSignals;
380 
381   signalToEcho *= loopCount;
382 
383   while(signalToEcho > signalsEchoed){
384     tReg->checkConnections();
385     for(int i = 0; i<10; i++)
386       tReg->external_IO(10);
387   }
388 }
389 
390 int
main(int argc,const char ** argv)391 main(int argc, const char **argv){
392 
393   const char * progName = argv[0];
394 
395   loopCount = 100;
396   sendBufSz = -1;
397   recvBufSz = -1;
398 
399   isClient     = false;
400   isConnected  = false;
401   isStarted    = false;
402   currentPhase = 0;
403 
404   signalHandler(0);
405 
406   if(argc < 5){
407     usage(progName);
408     return 0;
409   }
410 
411   const char * type = argv[1];
412   const NodeId localNodeId   = atoi(argv[2]);
413   const char * localHostName = argv[3];
414   const char * remoteHost1   = argv[4];
415 
416   if(argc >= 6)
417     loopCount = atoi(argv[5]);
418   if(argc >= 7)
419     sendBufSz = atoi(argv[6]);
420   if(argc >= 8)
421     recvBufSz = atoi(argv[7]);
422 
423   if(localNodeId < 1 || localNodeId > 2){
424     ndbout << "localNodeId = " << localNodeId << endl << endl;
425     usage(progName);
426     return 0;
427   }
428 
429   if(localNodeId == 1)
430     ndbout << "-- ECHO CLIENT --" << endl;
431   else
432     ndbout << "-- ECHO SERVER --" << endl;
433 
434   ndbout << "localNodeId:           " << localNodeId << endl;
435   ndbout << "localHostName:         " << localHostName << endl;
436   ndbout << "remoteHost1 (node " << (localNodeId == 1?2:1) << "): "
437 	 << remoteHost1 << endl;
438   ndbout << "Loop count: " << loopCount << endl;
439   ndbout << "-----------------" << endl;
440 
441   void * confTemplate = 0;
442   CreateTransporterFunc func = 0;
443   if(strcasecmp(type, "tcp") == 0){
444     func = createTCPTransporter;
445     confTemplate = &tcpTemplate;
446   } else if(strcasecmp(type, "sci") == 0){
447     func = createSCITransporter;
448     confTemplate = &sciTemplate;
449   } else if(strcasecmp(type, "shm") == 0){
450     func = createSHMTransporter;
451     confTemplate = &shmTemplate;
452   } else {
453     ndbout << "Unsupported transporter type" << endl;
454     return 0;
455   }
456 
457   ndbout << "Creating transporter registry" << endl;
458   tReg = new TransporterRegistry;
459   tReg->init(localNodeId);
460 
461   switch(localNodeId){
462   case 1:
463     (* func)(confTemplate, 1, 2, localHostName, remoteHost1,
464 	     sendBufSz, recvBufSz);
465     break;
466   case 2:
467     (* func)(confTemplate, 2, 1, localHostName, remoteHost1,
468 	     sendBufSz, recvBufSz);
469     break;
470   }
471 
472   ndbout << "Doing startSending/startReceiving" << endl;
473   tReg->startSending();
474   tReg->startReceiving();
475 
476   ndbout << "Connecting" << endl;
477   tReg->setPerformState(PerformConnect);
478   tReg->checkConnections();
479 
480   if(localNodeId == 1)
481     client(2);
482   else
483     server();
484 
485   isStarted = false;
486 
487   ndbout << "Sleep 3 secs" << endl;
488   NdbSleep_SecSleep(3);
489 
490   ndbout << "Doing setPerformState(Disconnect)" << endl;
491   tReg->setPerformState(PerformDisconnect);
492 
493   ndbout << "Doing checkConnections()" << endl;
494   tReg->checkConnections();
495 
496   ndbout << "Deleting transporter registry" << endl;
497   delete tReg; tReg = 0;
498 
499   return 0;
500 }
501 
502 void
execute(void * callbackObj,SignalHeader * const header,Uint8 prio,Uint32 * const theData,LinearSectionPtr ptr[3])503 execute(void* callbackObj, SignalHeader * const header, Uint8 prio,
504 	Uint32 * const theData,
505 	LinearSectionPtr ptr[3]){
506   const NodeId nodeId = refToNode(header->theSendersBlockRef);
507 
508   if(isClient){
509     allPhases[currentPhase].noOfSignalReceived++;
510   } else {
511     int sleepTime = 10;
512     if(theData[0] != signalsEchoed){
513       ndbout << "Missing signal theData[0] = " << theData[0]
514 	     << " signalsEchoed = " << signalsEchoed << endl;
515       ndbout << (* header) << endl;
516       abort();
517     }
518     while(tReg->prepareSend(header, prio, theData, nodeId, ptr) != SEND_OK){
519       ndbout << "Failed to echo " << theData[0] << endl;
520       NdbSleep_MilliSleep(sleepTime);
521       // sleepTime += 10;
522     }
523     signalsEchoed++;
524   }
525 }
526 
/**
 * Not supported in this test program: the function does nothing but abort
 * the process if it is ever called.
 * NOTE(review): presumably defined only because the transporter library
 * expects this symbol for segmented sections - confirm.
 */
void
copy(Uint32 * & insertPtr,
     class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
  abort();
}
532 
533 void
reportError(void * callbackObj,NodeId nodeId,TransporterError errorCode)534 reportError(void* callbackObj, NodeId nodeId, TransporterError errorCode){
535   char buf[255];
536   sprintf(buf, "reportError (%d, %x) in perfTest", nodeId, errorCode);
537   ndbout << buf << endl;
538   if(errorCode & 0x8000 && errorCode != 0x8014){
539     abort(); //tReg->setPerformState(nodeId, PerformDisconnect);
540   }
541 }
542 
543 /**
544  * Report average send theLength in bytes (4096 last sends)
545  */
546 void
reportSendLen(void * callbackObj,NodeId nodeId,Uint32 count,Uint64 bytes)547 reportSendLen(void* callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
548   allPhases[currentPhase].sendCount    += count;
549   allPhases[currentPhase].sendLenBytes += bytes;
550 
551   if(!isClient){
552     ndbout << "reportSendLen(" << nodeId << ", "
553 	   << (bytes/count) << ")" << endl;
554   }
555 }
556 
557 /**
558  * Report average receive theLength in bytes (4096 last receives)
559  */
560 void
reportReceiveLen(void * callbackObj,NodeId nodeId,Uint32 count,Uint64 bytes)561 reportReceiveLen(void* callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
562   allPhases[currentPhase].recvCount    += count;
563   allPhases[currentPhase].recvLenBytes += bytes;
564 
565   if(!isClient){
566     ndbout << "reportReceiveLen(" << nodeId << ", "
567 	   << (bytes/count) << ")" << endl;
568   }
569 }
570 
571 /**
572  * Report connection established
573  */
574 void
reportConnect(void * callbackObj,NodeId nodeId)575 reportConnect(void* callbackObj, NodeId nodeId){
576   char buf[255];
577   sprintf(buf, "reportConnect(%d)", nodeId);
578   ndbout << buf << endl;
579   tReg->setPerformState(nodeId, PerformIO);
580 
581   if(!isStarted){
582     isStarted = true;
583     startTime = NdbTick_CurrentMillisecond();
584     if(isClient){
585       reportHeader();
586       allPhases[0].startTime = startTime;
587     }
588   }
589   else{
590     // Resend signals that were lost when connection failed
591     TestPhase * current = &allPhases[currentPhase];
592     current->noOfSignalSent = current->noOfSignalReceived;
593   }
594 }
595 
596 /**
597  * Report connection broken
598  */
599 void
reportDisconnect(void * callbackObj,NodeId nodeId,Uint32 errNo)600 reportDisconnect(void* callbackObj, NodeId nodeId, Uint32 errNo){
601   char buf[255];
602   sprintf(buf, "reportDisconnect(%d)", nodeId);
603   ndbout << buf << endl;
604 
605   if(isStarted)
606     tReg->setPerformState(nodeId, PerformConnect);
607 }
608 
609 
/**
 * Hook for checking whether the job buffers are starting to get full (and,
 * if so, running the jobs).  This test program has no job buffers, so the
 * function does nothing.
 *
 * @return always 0
 */
int
checkJobBuffer() {
  const int nothingToDo = 0;
  return nothingToDo;
}
618 
619 void
createSCITransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName,int sendbuf,int recvbuf)620 createSCITransporter(void * _conf,
621 		     NodeId localNodeId,
622 		     NodeId remoteNodeId,
623 		     const char * localHostName,
624 		     const char * remoteHostName,
625 		     int sendbuf,
626 		     int recvbuf) {
627 
628 
629   ndbout << "Creating SCI transporter from node "
630 	 << localNodeId << "(" << localHostName << ") to "
631 	 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
632 
633 
634   SCI_TransporterConfiguration * conf = (SCI_TransporterConfiguration*)_conf;
635 
636   conf->remoteSciNodeId0= (Uint16)atoi(localHostName);
637   conf->remoteSciNodeId1= (Uint16)atoi(remoteHostName);
638 
639 
640   conf->localNodeId    = localNodeId;
641   conf->remoteNodeId   = remoteNodeId;
642 
643   bool res = tReg->createTransporter(conf);
644   if(res)
645     ndbout << "... -- Success " << endl;
646   else
647     ndbout << "... -- Failure " << endl;
648 }
649 
650 void
createSHMTransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName,int sendbuf,int recvbuf)651 createSHMTransporter(void * _conf,
652 		     NodeId localNodeId,
653 		     NodeId remoteNodeId,
654 		     const char * localHostName,
655 		     const char * remoteHostName,
656 		     int sendbuf,
657 		     int recvbuf) {
658 
659 
660   ndbout << "Creating SHM transporter from node "
661 	 << localNodeId << "(" << localHostName << ") to "
662 	 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
663 
664 
665   SHM_TransporterConfiguration * conf = (SHM_TransporterConfiguration*)_conf;
666 
667 
668   conf->localNodeId    = localNodeId;
669   conf->remoteNodeId   = remoteNodeId;
670 
671   bool res = tReg->createTransporter(conf);
672   if(res)
673     ndbout << "... -- Success " << endl;
674   else
675     ndbout << "... -- Failure " << endl;
676 }
677 
678 
679 void
createTCPTransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName,int sendBuf,int recvBuf)680 createTCPTransporter(void * _conf,
681 		     NodeId localNodeId,
682 		     NodeId remoteNodeId,
683 		     const char * localHostName,
684 		     const char * remoteHostName,
685 		     int sendBuf,
686 		     int recvBuf){
687   ndbout << "Creating TCP transporter from node "
688 	 << localNodeId << "(" << localHostName << ") to "
689 	 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
690 
691   TCP_TransporterConfiguration * conf = (TCP_TransporterConfiguration*)_conf;
692 
693   int port;
694   if(localNodeId == 1 && remoteNodeId == 2) port = basePortTCP + 0;
695   if(localNodeId == 1 && remoteNodeId == 3) port = basePortTCP + 1;
696   if(localNodeId == 2 && remoteNodeId == 1) port = basePortTCP + 0;
697   if(localNodeId == 2 && remoteNodeId == 3) port = basePortTCP + 2;
698   if(localNodeId == 3 && remoteNodeId == 1) port = basePortTCP + 1;
699   if(localNodeId == 3 && remoteNodeId == 2) port = basePortTCP + 2;
700 
701   if(sendBuf != -1){
702     conf->sendBufferSize = sendBuf;
703   }
704   if(recvBuf != -1){
705     conf->maxReceiveSize = recvBuf;
706   }
707 
708   ndbout << "\tSendBufferSize:    " << conf->sendBufferSize << endl;
709   ndbout << "\tReceiveBufferSize: " << conf->maxReceiveSize << endl;
710 
711   conf->localNodeId    = localNodeId;
712   conf->localHostName  = localHostName;
713   conf->remoteNodeId   = remoteNodeId;
714   conf->remoteHostName = remoteHostName;
715   conf->port           = port;
716   bool res = tReg->createTransporter(conf);
717   if(res)
718     ndbout << "... -- Success " << endl;
719   else
720     ndbout << "... -- Failure " << endl;
721 }
722