1 /*
2    Copyright (C) 2003-2006 MySQL AB
3     All rights reserved. Use is subject to license terms.
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License, version 2.0,
7    as published by the Free Software Foundation.
8 
9    This program is also distributed with certain software (including
10    but not limited to OpenSSL) that is licensed under separate terms,
11    as designated in a particular file or component or in included license
12    documentation.  The authors of MySQL hereby grant you an additional
13    permission to link the program and your derivative works with the
14    separately licensed software that they have included with MySQL.
15 
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License, version 2.0, for more details.
20 
21    You should have received a copy of the GNU General Public License
22    along with this program; if not, write to the Free Software
23    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 */
25 
26 #include <ndb_global.h>
27 
28 #include "TransporterRegistry.hpp"
29 #include "TransporterDefinitions.hpp"
30 #include "TransporterCallback.hpp"
31 #include <RefConvert.hpp>
32 
33 #include <NdbTick.h>
34 #include <NdbMain.h>
35 #include <NdbOut.hpp>
36 #include <NdbSleep.h>
37 
38 int basePortTCP = 17000;
39 
40 SCI_TransporterConfiguration sciTemplate = {
41   8000,
42        // Packet size
43   2500000,      // Buffer size
44   2,           // number of adapters
45   1,           // remote node id SCI
46   2,           // Remote node Id SCI
47   0,           // local ndb node id (server)
48   0,           // remote ndb node id (client)
49   0,              // byteOrder;
50   false,          // compression;
51   true,           // checksum;
52   true            // signalId;
53 };
54 
55 TCP_TransporterConfiguration tcpTemplate = {
56   17000,          // port;
57   "",             // remoteHostName;
58   "",             // localhostname
59   2,              // remoteNodeId;
60   1,              // localNodeId;
61   10000,          // sendBufferSize - Size of SendBuffer of priority B
62   10000,          // maxReceiveSize - Maximum no of bytes to receive
63   0,              // byteOrder;
64   false,          // compression;
65   true,           // checksum;
66   true            // signalId;
67 };
68 
69 SHM_TransporterConfiguration shmTemplate = {
70   0,      //remoteNodeId
71   0,      //localNodeId;
72   false,  //compression
73   true,   //checksum;
74   true,   //signalId;
75   0,      //byteOrder;
76   123,    //shmKey;
77   2500000 //shmSize;
78 };
79 
80 TransporterRegistry *tReg = 0;
81 
82 #include <signal.h>
83 
84 extern "C"
85 void
signalHandler(int signo)86 signalHandler(int signo){
87   ::signal(13, signalHandler);
88   char buf[255];
89   sprintf(buf,"Signal: %d\n", signo);
90   ndbout << buf << endl;
91 }
92 
93 void
usage(const char * progName)94 usage(const char * progName){
95   ndbout << "Usage: " << progName << " <type> localNodeId localHostName"
96 	 << " remoteHostName1 remoteHostName2" << endl;
97   ndbout << "  type = shm tcp ose sci" << endl;
98   ndbout << "  localNodeId - 1 to 3" << endl;
99 }
100 
101 typedef void (* CreateTransporterFunc)(void * conf,
102 				       NodeId localNodeId,
103 				       NodeId remoteNodeId,
104 				       const char * localHostName,
105 				       const char * remoteHostName);
106 
107 void createSCITransporter(void *, NodeId, NodeId, const char *, const char *);
108 void createTCPTransporter(void *, NodeId, NodeId, const char *, const char *);
109 void createSHMTransporter(void *, NodeId, NodeId, const char *, const char *);
110 
111 int signalReceived[4];
112 
113 int
main(int argc,const char ** argv)114 main(int argc, const char **argv){
115 
116   signalHandler(0);
117 
118   for(int i = 0; i<4; i++)
119     signalReceived[i] = 0;
120 
121   if(argc < 5){
122     usage(argv[0]);
123     return 0;
124   }
125 
126   Uint32 noOfConnections     = 0;
127   const char * progName      = argv[0];
128   const char * type          = argv[1];
129   const NodeId localNodeId   = atoi(argv[2]);
130   const char * localHostName = argv[3];
131   const char * remoteHost1   = argv[4];
132   const char * remoteHost2   = NULL;
133 
134   if(argc == 5)
135     noOfConnections = 1;
136   else {
137     noOfConnections = 2;
138     remoteHost2 = argv[5];
139   }
140 
141   if(localNodeId < 1 || localNodeId > 3){
142     ndbout << "localNodeId = " << localNodeId << endl << endl;
143     usage(progName);
144     return 0;
145   }
146 
147   ndbout << "-----------------" << endl;
148   ndbout << "localNodeId:           " << localNodeId << endl;
149   ndbout << "localHostName:         " << localHostName << endl;
150   ndbout << "remoteHost1 (node " << (localNodeId == 1?2:1) << "): "
151 	 << remoteHost1 << endl;
152   if(noOfConnections == 2){
153     ndbout << "remoteHost2 (node " << (localNodeId == 3?2:3) << "): "
154 	   << remoteHost2 << endl;
155   }
156   ndbout << "-----------------" << endl;
157 
158   void * confTemplate = 0;
159   CreateTransporterFunc func = 0;
160 
161   if(strcasecmp(type, "tcp") == 0){
162     func = createTCPTransporter;
163     confTemplate = &tcpTemplate;
164   } else if(strcasecmp(type, "sci") == 0){
165     func = createSCITransporter;
166     confTemplate = &sciTemplate;
167   } else if(strcasecmp(type, "shm") == 0){
168     func = createSHMTransporter;
169     confTemplate = &shmTemplate;
170   } else {
171     ndbout << "Unsupported transporter type" << endl;
172     return 0;
173   }
174 
175   ndbout << "Creating transporter registry" << endl;
176   tReg = new TransporterRegistry;
177   tReg->init(localNodeId);
178 
179   switch(localNodeId){
180   case 1:
181     (* func)(confTemplate, 1, 2, localHostName, remoteHost1);
182     if(noOfConnections == 2)
183       (* func)(confTemplate, 1, 3, localHostName, remoteHost2);
184     break;
185   case 2:
186     (* func)(confTemplate, 2, 1, localHostName, remoteHost1);
187     if(noOfConnections == 2)
188       (* func)(confTemplate, 2, 3, localHostName, remoteHost2);
189     break;
190   case 3:
191     (* func)(confTemplate, 3, 1, localHostName, remoteHost1);
192     if(noOfConnections == 2)
193       (* func)(confTemplate, 3, 2, localHostName, remoteHost2);
194     break;
195   }
196 
197   ndbout << "Doing startSending/startReceiving" << endl;
198   tReg->startSending();
199   tReg->startReceiving();
200 
201   ndbout << "Connecting" << endl;
202   tReg->setPerformState(PerformConnect);
203   tReg->checkConnections();
204 
205   unsigned sum = 0;
206   do {
207     sum = 0;
208     for(int i = 0; i<4; i++)
209       sum += signalReceived[i];
210 
211     tReg->checkConnections();
212 
213     tReg->external_IO(500);
214     NdbSleep_MilliSleep(500);
215 
216     ndbout << "In main loop" << endl;
217   } while(sum != 2*noOfConnections);
218 
219   ndbout << "Doing setPerformState(Disconnect)" << endl;
220   tReg->setPerformState(PerformDisconnect);
221 
222   ndbout << "Doing checkConnections()" << endl;
223   tReg->checkConnections();
224 
225   ndbout << "Sleeping 3 secs" << endl;
226   NdbSleep_SecSleep(3);
227 
228   ndbout << "Deleting transporter registry" << endl;
229   delete tReg; tReg = 0;
230 
231   return 0;
232 }
233 
234 void
checkData(SignalHeader * const header,Uint8 prio,Uint32 * const theData,LinearSectionPtr ptr[3])235 checkData(SignalHeader * const header, Uint8 prio, Uint32 * const theData,
236 	  LinearSectionPtr ptr[3]){
237   Uint32 expectedLength = 0;
238   if(prio == 0)
239     expectedLength = 17;
240   else
241     expectedLength = 19;
242 
243   if(header->theLength != expectedLength){
244     ndbout << "Unexpected signal length: " << header->theLength
245 	   << " expected: " << expectedLength << endl;
246     abort();
247   }
248 
249   if(header->theVerId_signalNumber != expectedLength + 1)
250     abort();
251 
252   if(header->theReceiversBlockNumber != expectedLength + 2)
253     abort();
254 
255   if(refToBlock(header->theSendersBlockRef) != expectedLength + 3)
256     abort();
257 
258   if(header->theSendersSignalId != expectedLength + 5)
259     abort();
260 
261   if(header->theTrace != expectedLength + 6)
262     abort();
263 
264   if(header->m_noOfSections != (prio == 0 ? 0 : 1))
265     abort();
266 
267   if(header->m_fragmentInfo != (prio + 1))
268     abort();
269 
270   Uint32 dataWordStart = header->theLength ;
271   for(unsigned i = 0; i<header->theLength; i++){
272     if(theData[i] != i){ //dataWordStart){
273       ndbout << "data corrupt!\n" << endl;
274       abort();
275     }
276     dataWordStart ^= (~i*i);
277   }
278 
279   if(prio != 0){
280     ndbout_c("Found section");
281     if(ptr[0].sz != header->theLength)
282       abort();
283 
284     if(memcmp(ptr[0].p, theData, (ptr[0].sz * 4)) != 0)
285       abort();
286   }
287 }
288 
289 void
sendSignalTo(NodeId nodeId,int prio)290 sendSignalTo(NodeId nodeId, int prio){
291   SignalHeader sh;
292   sh.theLength = (prio == 0 ? 17 : 19);
293   sh.theVerId_signalNumber   = sh.theLength + 1;
294   sh.theReceiversBlockNumber = sh.theLength + 2;
295   sh.theSendersBlockRef      = sh.theLength + 3;
296   sh.theSendersSignalId      = sh.theLength + 4;
297   sh.theSignalId             = sh.theLength + 5;
298   sh.theTrace                = sh.theLength + 6;
299   sh.m_noOfSections          = (prio == 0 ? 0 : 1);
300   sh.m_fragmentInfo          = prio + 1;
301 
302   Uint32 theData[25];
303 
304   Uint32 dataWordStart = sh.theLength;
305   for(unsigned i = 0; i<sh.theLength; i++){
306     theData[i] = i;
307     dataWordStart ^= (~i*i);
308   }
309   ndbout << "Sending prio " << (int)prio << " signal to node: "
310 	 << nodeId
311 	 << " gsn = " << sh.theVerId_signalNumber << endl;
312 
313   LinearSectionPtr ptr[3];
314   ptr[0].p = &theData[0];
315   ptr[0].sz = sh.theLength;
316 
317   SendStatus s = tReg->prepareSend(&sh, prio, theData, nodeId, ptr);
318   if(s != SEND_OK){
319     ndbout << "Send was not ok. Send was: " << s << endl;
320   }
321 }
322 
323 void
execute(void * callbackObj,SignalHeader * const header,Uint8 prio,Uint32 * const theData,LinearSectionPtr ptr[3])324 execute(void* callbackObj,
325 	SignalHeader * const header, Uint8 prio, Uint32 * const theData,
326 	LinearSectionPtr ptr[3]){
327   const NodeId nodeId = refToNode(header->theSendersBlockRef);
328 
329   ndbout << "Recieved prio " << (int)prio << " signal from node: "
330 	 << nodeId
331 	 << " gsn = " << header->theVerId_signalNumber << endl;
332   checkData(header, prio, theData, ptr);
333   ndbout << " Data is ok!\n" << endl;
334 
335   signalReceived[nodeId]++;
336 
337   if(prio == 0)
338     sendSignalTo(nodeId, 1);
339   else
340     tReg->setPerformState(nodeId, PerformDisconnect);
341 }
342 
343 void
copy(Uint32 * & insertPtr,class SectionSegmentPool & thePool,const SegmentedSectionPtr & _ptr)344 copy(Uint32 * & insertPtr,
345      class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
346   abort();
347 }
348 
349 void
reportError(void * callbackObj,NodeId nodeId,TransporterError errorCode)350 reportError(void* callbackObj, NodeId nodeId, TransporterError errorCode){
351   char buf[255];
352   sprintf(buf, "reportError (%d, %x)", nodeId, errorCode);
353   ndbout << buf << endl;
354   if(errorCode & 0x8000){
355     tReg->setPerformState(nodeId, PerformDisconnect);
356     abort();
357   }
358 }
359 
360 /**
361  * Report average send theLength in bytes (4096 last sends)
362  */
363 void
reportSendLen(void * callbackObj,NodeId nodeId,Uint32 count,Uint64 bytes)364 reportSendLen(void* callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
365   char buf[255];
366   sprintf(buf, "reportSendLen(%d, %d)", nodeId, (Uint32)(bytes/count));
367   ndbout << buf << endl;
368 }
369 
370 /**
371  * Report average receive theLength in bytes (4096 last receives)
372  */
373 void
reportReceiveLen(void * callbackObj,NodeId nodeId,Uint32 count,Uint64 bytes)374 reportReceiveLen(void* callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
375   char buf[255];
376   sprintf(buf, "reportReceiveLen(%d, %d)", nodeId, (Uint32)(bytes/count));
377   ndbout << buf << endl;
378 }
379 
380 /**
381  * Report connection established
382  */
383 void
reportConnect(void * callbackObj,NodeId nodeId)384 reportConnect(void* callbackObj, NodeId nodeId){
385   char buf[255];
386   sprintf(buf, "reportConnect(%d)", nodeId);
387   ndbout << buf << endl;
388   tReg->setPerformState(nodeId, PerformIO);
389 
390   sendSignalTo(nodeId, 0);
391 }
392 
393 /**
394  * Report connection broken
395  */
396 void
reportDisconnect(void * callbackObj,NodeId nodeId,Uint32 errNo)397 reportDisconnect(void* callbackObj, NodeId nodeId, Uint32 errNo){
398   char buf[255];
399   sprintf(buf, "reportDisconnect(%d)", nodeId);
400   ndbout << buf << endl;
401   if(signalReceived[nodeId] < 2)
402     tReg->setPerformState(nodeId, PerformConnect);
403 }
404 
/**
 * Hook meant for checking whether the job buffers are starting to get
 * full and, if so, calling doJob.  This test has nothing to check, so
 * the function does no work.
 *
 * @return Always 0
 */
int
checkJobBuffer()
{
  const int nothingToDo = 0;
  return nothingToDo;
}
413 
414 void
createOSETransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName)415 createOSETransporter(void * _conf,
416 		     NodeId localNodeId,
417 		     NodeId remoteNodeId,
418 		     const char * localHostName,
419 		     const char * remoteHostName){
420   ndbout << "Creating OSE transporter from node "
421 	 << localNodeId << "(" << localHostName << ") to "
422 	 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
423 
424   OSE_TransporterConfiguration * conf = (OSE_TransporterConfiguration*)_conf;
425 
426   conf->localNodeId    = localNodeId;
427   conf->localHostName  = localHostName;
428   conf->remoteNodeId   = remoteNodeId;
429   conf->remoteHostName = remoteHostName;
430   bool res = tReg->createTransporter(conf);
431   if(res)
432     ndbout << "... -- Success " << endl;
433   else
434     ndbout << "... -- Failure " << endl;
435 }
436 
437 void
createTCPTransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName)438 createTCPTransporter(void * _conf,
439 		     NodeId localNodeId,
440 		     NodeId remoteNodeId,
441 		     const char * localHostName,
442 		     const char * remoteHostName){
443   ndbout << "Creating TCP transporter from node "
444 	 << localNodeId << "(" << localHostName << ") to "
445 	 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
446 
447   TCP_TransporterConfiguration * conf = (TCP_TransporterConfiguration*)_conf;
448 
449   int port;
450   if(localNodeId == 1 && remoteNodeId == 2) port = basePortTCP + 0;
451   if(localNodeId == 1 && remoteNodeId == 3) port = basePortTCP + 1;
452   if(localNodeId == 2 && remoteNodeId == 1) port = basePortTCP + 0;
453   if(localNodeId == 2 && remoteNodeId == 3) port = basePortTCP + 2;
454   if(localNodeId == 3 && remoteNodeId == 1) port = basePortTCP + 1;
455   if(localNodeId == 3 && remoteNodeId == 2) port = basePortTCP + 2;
456 
457   conf->localNodeId    = localNodeId;
458   conf->localHostName  = localHostName;
459   conf->remoteNodeId   = remoteNodeId;
460   conf->remoteHostName = remoteHostName;
461   conf->port           = port;
462   bool res = tReg->createTransporter(conf);
463   if(res)
464     ndbout << "... -- Success " << endl;
465   else
466     ndbout << "... -- Failure " << endl;
467 }
468 
469 void
createSCITransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName)470 createSCITransporter(void * _conf,
471 		     NodeId localNodeId,
472 		     NodeId remoteNodeId,
473 		     const char * localHostName,
474 		     const char * remoteHostName){
475 
476 
477   ndbout << "Creating SCI transporter from node "
478 	 << localNodeId << "(" << localHostName << ") to "
479 	 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
480 
481 
482   SCI_TransporterConfiguration * conf = (SCI_TransporterConfiguration*)_conf;
483 
484   conf->remoteSciNodeId0= (Uint16)atoi(localHostName);
485   conf->remoteSciNodeId1= (Uint16)atoi(remoteHostName);
486 
487 
488   conf->localNodeId    = localNodeId;
489   conf->remoteNodeId   = remoteNodeId;
490 
491   bool res = tReg->createTransporter(conf);
492   if(res)
493     ndbout << "... -- Success " << endl;
494   else
495     ndbout << "... -- Failure " << endl;
496 }
497 
498 void
createSHMTransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName)499 createSHMTransporter(void * _conf,
500 		     NodeId localNodeId,
501 		     NodeId remoteNodeId,
502 		     const char * localHostName,
503 		     const char * remoteHostName){
504 
505 
506   ndbout << "Creating SHM transporter from node "
507 	 << localNodeId << "(" << localHostName << ") to "
508 	 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
509 
510 
511   SHM_TransporterConfiguration * conf = (SHM_TransporterConfiguration*)_conf;
512 
513   conf->localNodeId    = localNodeId;
514   conf->remoteNodeId   = remoteNodeId;
515 
516   bool res = tReg->createTransporter(conf);
517   if(res)
518     ndbout << "... -- Success " << endl;
519   else
520     ndbout << "... -- Failure " << endl;
521 }
522