1 /*
2 Copyright (C) 2003-2006 MySQL AB
3 All rights reserved. Use is subject to license terms.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24 */
25
26 #include <ndb_global.h>
27
28 #include "TransporterRegistry.hpp"
29 #include "TransporterDefinitions.hpp"
30 #include "TransporterCallback.hpp"
31 #include <RefConvert.hpp>
32
33 #include <NdbTick.h>
34 #include <NdbMain.h>
35 #include <NdbOut.hpp>
36 #include <NdbSleep.h>
37
38 int basePortTCP = 17000;
39
40 SCI_TransporterConfiguration sciTemplate = {
41 8000,
42 // Packet size
43 2500000, // Buffer size
44 2, // number of adapters
45 1, // remote node id SCI
46 2, // Remote node Id SCI
47 0, // local ndb node id (server)
48 0, // remote ndb node id (client)
49 0, // byteOrder;
50 false, // compression;
51 true, // checksum;
52 true // signalId;
53 };
54
55 TCP_TransporterConfiguration tcpTemplate = {
56 17000, // port;
57 "", // remoteHostName;
58 "", // localhostname
59 2, // remoteNodeId;
60 1, // localNodeId;
61 10000, // sendBufferSize - Size of SendBuffer of priority B
62 10000, // maxReceiveSize - Maximum no of bytes to receive
63 0, // byteOrder;
64 false, // compression;
65 true, // checksum;
66 true // signalId;
67 };
68
69 SHM_TransporterConfiguration shmTemplate = {
70 0, //remoteNodeId
71 0, //localNodeId;
72 false, //compression
73 true, //checksum;
74 true, //signalId;
75 0, //byteOrder;
76 123, //shmKey;
77 2500000 //shmSize;
78 };
79
80 TransporterRegistry *tReg = 0;
81
82 #include <signal.h>
83
84 extern "C"
85 void
signalHandler(int signo)86 signalHandler(int signo){
87 ::signal(13, signalHandler);
88 char buf[255];
89 sprintf(buf,"Signal: %d\n", signo);
90 ndbout << buf << endl;
91 }
92
93 void
usage(const char * progName)94 usage(const char * progName){
95 ndbout << "Usage: " << progName << " <type> localNodeId localHostName"
96 << " remoteHostName1 remoteHostName2" << endl;
97 ndbout << " type = shm tcp ose sci" << endl;
98 ndbout << " localNodeId - 1 to 3" << endl;
99 }
100
101 typedef void (* CreateTransporterFunc)(void * conf,
102 NodeId localNodeId,
103 NodeId remoteNodeId,
104 const char * localHostName,
105 const char * remoteHostName);
106
107 void createSCITransporter(void *, NodeId, NodeId, const char *, const char *);
108 void createTCPTransporter(void *, NodeId, NodeId, const char *, const char *);
109 void createSHMTransporter(void *, NodeId, NodeId, const char *, const char *);
110
111 int signalReceived[4];
112
113 int
main(int argc,const char ** argv)114 main(int argc, const char **argv){
115
116 signalHandler(0);
117
118 for(int i = 0; i<4; i++)
119 signalReceived[i] = 0;
120
121 if(argc < 5){
122 usage(argv[0]);
123 return 0;
124 }
125
126 Uint32 noOfConnections = 0;
127 const char * progName = argv[0];
128 const char * type = argv[1];
129 const NodeId localNodeId = atoi(argv[2]);
130 const char * localHostName = argv[3];
131 const char * remoteHost1 = argv[4];
132 const char * remoteHost2 = NULL;
133
134 if(argc == 5)
135 noOfConnections = 1;
136 else {
137 noOfConnections = 2;
138 remoteHost2 = argv[5];
139 }
140
141 if(localNodeId < 1 || localNodeId > 3){
142 ndbout << "localNodeId = " << localNodeId << endl << endl;
143 usage(progName);
144 return 0;
145 }
146
147 ndbout << "-----------------" << endl;
148 ndbout << "localNodeId: " << localNodeId << endl;
149 ndbout << "localHostName: " << localHostName << endl;
150 ndbout << "remoteHost1 (node " << (localNodeId == 1?2:1) << "): "
151 << remoteHost1 << endl;
152 if(noOfConnections == 2){
153 ndbout << "remoteHost2 (node " << (localNodeId == 3?2:3) << "): "
154 << remoteHost2 << endl;
155 }
156 ndbout << "-----------------" << endl;
157
158 void * confTemplate = 0;
159 CreateTransporterFunc func = 0;
160
161 if(strcasecmp(type, "tcp") == 0){
162 func = createTCPTransporter;
163 confTemplate = &tcpTemplate;
164 } else if(strcasecmp(type, "sci") == 0){
165 func = createSCITransporter;
166 confTemplate = &sciTemplate;
167 } else if(strcasecmp(type, "shm") == 0){
168 func = createSHMTransporter;
169 confTemplate = &shmTemplate;
170 } else {
171 ndbout << "Unsupported transporter type" << endl;
172 return 0;
173 }
174
175 ndbout << "Creating transporter registry" << endl;
176 tReg = new TransporterRegistry;
177 tReg->init(localNodeId);
178
179 switch(localNodeId){
180 case 1:
181 (* func)(confTemplate, 1, 2, localHostName, remoteHost1);
182 if(noOfConnections == 2)
183 (* func)(confTemplate, 1, 3, localHostName, remoteHost2);
184 break;
185 case 2:
186 (* func)(confTemplate, 2, 1, localHostName, remoteHost1);
187 if(noOfConnections == 2)
188 (* func)(confTemplate, 2, 3, localHostName, remoteHost2);
189 break;
190 case 3:
191 (* func)(confTemplate, 3, 1, localHostName, remoteHost1);
192 if(noOfConnections == 2)
193 (* func)(confTemplate, 3, 2, localHostName, remoteHost2);
194 break;
195 }
196
197 ndbout << "Doing startSending/startReceiving" << endl;
198 tReg->startSending();
199 tReg->startReceiving();
200
201 ndbout << "Connecting" << endl;
202 tReg->setPerformState(PerformConnect);
203 tReg->checkConnections();
204
205 unsigned sum = 0;
206 do {
207 sum = 0;
208 for(int i = 0; i<4; i++)
209 sum += signalReceived[i];
210
211 tReg->checkConnections();
212
213 tReg->external_IO(500);
214 NdbSleep_MilliSleep(500);
215
216 ndbout << "In main loop" << endl;
217 } while(sum != 2*noOfConnections);
218
219 ndbout << "Doing setPerformState(Disconnect)" << endl;
220 tReg->setPerformState(PerformDisconnect);
221
222 ndbout << "Doing checkConnections()" << endl;
223 tReg->checkConnections();
224
225 ndbout << "Sleeping 3 secs" << endl;
226 NdbSleep_SecSleep(3);
227
228 ndbout << "Deleting transporter registry" << endl;
229 delete tReg; tReg = 0;
230
231 return 0;
232 }
233
234 void
checkData(SignalHeader * const header,Uint8 prio,Uint32 * const theData,LinearSectionPtr ptr[3])235 checkData(SignalHeader * const header, Uint8 prio, Uint32 * const theData,
236 LinearSectionPtr ptr[3]){
237 Uint32 expectedLength = 0;
238 if(prio == 0)
239 expectedLength = 17;
240 else
241 expectedLength = 19;
242
243 if(header->theLength != expectedLength){
244 ndbout << "Unexpected signal length: " << header->theLength
245 << " expected: " << expectedLength << endl;
246 abort();
247 }
248
249 if(header->theVerId_signalNumber != expectedLength + 1)
250 abort();
251
252 if(header->theReceiversBlockNumber != expectedLength + 2)
253 abort();
254
255 if(refToBlock(header->theSendersBlockRef) != expectedLength + 3)
256 abort();
257
258 if(header->theSendersSignalId != expectedLength + 5)
259 abort();
260
261 if(header->theTrace != expectedLength + 6)
262 abort();
263
264 if(header->m_noOfSections != (prio == 0 ? 0 : 1))
265 abort();
266
267 if(header->m_fragmentInfo != (prio + 1))
268 abort();
269
270 Uint32 dataWordStart = header->theLength ;
271 for(unsigned i = 0; i<header->theLength; i++){
272 if(theData[i] != i){ //dataWordStart){
273 ndbout << "data corrupt!\n" << endl;
274 abort();
275 }
276 dataWordStart ^= (~i*i);
277 }
278
279 if(prio != 0){
280 ndbout_c("Found section");
281 if(ptr[0].sz != header->theLength)
282 abort();
283
284 if(memcmp(ptr[0].p, theData, (ptr[0].sz * 4)) != 0)
285 abort();
286 }
287 }
288
289 void
sendSignalTo(NodeId nodeId,int prio)290 sendSignalTo(NodeId nodeId, int prio){
291 SignalHeader sh;
292 sh.theLength = (prio == 0 ? 17 : 19);
293 sh.theVerId_signalNumber = sh.theLength + 1;
294 sh.theReceiversBlockNumber = sh.theLength + 2;
295 sh.theSendersBlockRef = sh.theLength + 3;
296 sh.theSendersSignalId = sh.theLength + 4;
297 sh.theSignalId = sh.theLength + 5;
298 sh.theTrace = sh.theLength + 6;
299 sh.m_noOfSections = (prio == 0 ? 0 : 1);
300 sh.m_fragmentInfo = prio + 1;
301
302 Uint32 theData[25];
303
304 Uint32 dataWordStart = sh.theLength;
305 for(unsigned i = 0; i<sh.theLength; i++){
306 theData[i] = i;
307 dataWordStart ^= (~i*i);
308 }
309 ndbout << "Sending prio " << (int)prio << " signal to node: "
310 << nodeId
311 << " gsn = " << sh.theVerId_signalNumber << endl;
312
313 LinearSectionPtr ptr[3];
314 ptr[0].p = &theData[0];
315 ptr[0].sz = sh.theLength;
316
317 SendStatus s = tReg->prepareSend(&sh, prio, theData, nodeId, ptr);
318 if(s != SEND_OK){
319 ndbout << "Send was not ok. Send was: " << s << endl;
320 }
321 }
322
323 void
execute(void * callbackObj,SignalHeader * const header,Uint8 prio,Uint32 * const theData,LinearSectionPtr ptr[3])324 execute(void* callbackObj,
325 SignalHeader * const header, Uint8 prio, Uint32 * const theData,
326 LinearSectionPtr ptr[3]){
327 const NodeId nodeId = refToNode(header->theSendersBlockRef);
328
329 ndbout << "Recieved prio " << (int)prio << " signal from node: "
330 << nodeId
331 << " gsn = " << header->theVerId_signalNumber << endl;
332 checkData(header, prio, theData, ptr);
333 ndbout << " Data is ok!\n" << endl;
334
335 signalReceived[nodeId]++;
336
337 if(prio == 0)
338 sendSignalTo(nodeId, 1);
339 else
340 tReg->setPerformState(nodeId, PerformDisconnect);
341 }
342
343 void
copy(Uint32 * & insertPtr,class SectionSegmentPool & thePool,const SegmentedSectionPtr & _ptr)344 copy(Uint32 * & insertPtr,
345 class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
346 abort();
347 }
348
349 void
reportError(void * callbackObj,NodeId nodeId,TransporterError errorCode)350 reportError(void* callbackObj, NodeId nodeId, TransporterError errorCode){
351 char buf[255];
352 sprintf(buf, "reportError (%d, %x)", nodeId, errorCode);
353 ndbout << buf << endl;
354 if(errorCode & 0x8000){
355 tReg->setPerformState(nodeId, PerformDisconnect);
356 abort();
357 }
358 }
359
360 /**
361 * Report average send theLength in bytes (4096 last sends)
362 */
363 void
reportSendLen(void * callbackObj,NodeId nodeId,Uint32 count,Uint64 bytes)364 reportSendLen(void* callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
365 char buf[255];
366 sprintf(buf, "reportSendLen(%d, %d)", nodeId, (Uint32)(bytes/count));
367 ndbout << buf << endl;
368 }
369
370 /**
371 * Report average receive theLength in bytes (4096 last receives)
372 */
373 void
reportReceiveLen(void * callbackObj,NodeId nodeId,Uint32 count,Uint64 bytes)374 reportReceiveLen(void* callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
375 char buf[255];
376 sprintf(buf, "reportReceiveLen(%d, %d)", nodeId, (Uint32)(bytes/count));
377 ndbout << buf << endl;
378 }
379
380 /**
381 * Report connection established
382 */
383 void
reportConnect(void * callbackObj,NodeId nodeId)384 reportConnect(void* callbackObj, NodeId nodeId){
385 char buf[255];
386 sprintf(buf, "reportConnect(%d)", nodeId);
387 ndbout << buf << endl;
388 tReg->setPerformState(nodeId, PerformIO);
389
390 sendSignalTo(nodeId, 0);
391 }
392
393 /**
394 * Report connection broken
395 */
396 void
reportDisconnect(void * callbackObj,NodeId nodeId,Uint32 errNo)397 reportDisconnect(void* callbackObj, NodeId nodeId, Uint32 errNo){
398 char buf[255];
399 sprintf(buf, "reportDisconnect(%d)", nodeId);
400 ndbout << buf << endl;
401 if(signalReceived[nodeId] < 2)
402 tReg->setPerformState(nodeId, PerformConnect);
403 }
404
405 int
checkJobBuffer()406 checkJobBuffer() {
407 /**
408 * Check to see if jobbbuffers are starting to get full
409 * and if so call doJob
410 */
411 return 0;
412 }
413
414 void
createOSETransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName)415 createOSETransporter(void * _conf,
416 NodeId localNodeId,
417 NodeId remoteNodeId,
418 const char * localHostName,
419 const char * remoteHostName){
420 ndbout << "Creating OSE transporter from node "
421 << localNodeId << "(" << localHostName << ") to "
422 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
423
424 OSE_TransporterConfiguration * conf = (OSE_TransporterConfiguration*)_conf;
425
426 conf->localNodeId = localNodeId;
427 conf->localHostName = localHostName;
428 conf->remoteNodeId = remoteNodeId;
429 conf->remoteHostName = remoteHostName;
430 bool res = tReg->createTransporter(conf);
431 if(res)
432 ndbout << "... -- Success " << endl;
433 else
434 ndbout << "... -- Failure " << endl;
435 }
436
437 void
createTCPTransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName)438 createTCPTransporter(void * _conf,
439 NodeId localNodeId,
440 NodeId remoteNodeId,
441 const char * localHostName,
442 const char * remoteHostName){
443 ndbout << "Creating TCP transporter from node "
444 << localNodeId << "(" << localHostName << ") to "
445 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
446
447 TCP_TransporterConfiguration * conf = (TCP_TransporterConfiguration*)_conf;
448
449 int port;
450 if(localNodeId == 1 && remoteNodeId == 2) port = basePortTCP + 0;
451 if(localNodeId == 1 && remoteNodeId == 3) port = basePortTCP + 1;
452 if(localNodeId == 2 && remoteNodeId == 1) port = basePortTCP + 0;
453 if(localNodeId == 2 && remoteNodeId == 3) port = basePortTCP + 2;
454 if(localNodeId == 3 && remoteNodeId == 1) port = basePortTCP + 1;
455 if(localNodeId == 3 && remoteNodeId == 2) port = basePortTCP + 2;
456
457 conf->localNodeId = localNodeId;
458 conf->localHostName = localHostName;
459 conf->remoteNodeId = remoteNodeId;
460 conf->remoteHostName = remoteHostName;
461 conf->port = port;
462 bool res = tReg->createTransporter(conf);
463 if(res)
464 ndbout << "... -- Success " << endl;
465 else
466 ndbout << "... -- Failure " << endl;
467 }
468
469 void
createSCITransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName)470 createSCITransporter(void * _conf,
471 NodeId localNodeId,
472 NodeId remoteNodeId,
473 const char * localHostName,
474 const char * remoteHostName){
475
476
477 ndbout << "Creating SCI transporter from node "
478 << localNodeId << "(" << localHostName << ") to "
479 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
480
481
482 SCI_TransporterConfiguration * conf = (SCI_TransporterConfiguration*)_conf;
483
484 conf->remoteSciNodeId0= (Uint16)atoi(localHostName);
485 conf->remoteSciNodeId1= (Uint16)atoi(remoteHostName);
486
487
488 conf->localNodeId = localNodeId;
489 conf->remoteNodeId = remoteNodeId;
490
491 bool res = tReg->createTransporter(conf);
492 if(res)
493 ndbout << "... -- Success " << endl;
494 else
495 ndbout << "... -- Failure " << endl;
496 }
497
498 void
createSHMTransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName)499 createSHMTransporter(void * _conf,
500 NodeId localNodeId,
501 NodeId remoteNodeId,
502 const char * localHostName,
503 const char * remoteHostName){
504
505
506 ndbout << "Creating SHM transporter from node "
507 << localNodeId << "(" << localHostName << ") to "
508 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
509
510
511 SHM_TransporterConfiguration * conf = (SHM_TransporterConfiguration*)_conf;
512
513 conf->localNodeId = localNodeId;
514 conf->remoteNodeId = remoteNodeId;
515
516 bool res = tReg->createTransporter(conf);
517 if(res)
518 ndbout << "... -- Success " << endl;
519 else
520 ndbout << "... -- Failure " << endl;
521 }
522