1 /*
2 Copyright (C) 2003-2006 MySQL AB
3 All rights reserved. Use is subject to license terms.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24 */
25
26 #include <ndb_global.h>
27
28 #include "TransporterRegistry.hpp"
29 #include "TransporterDefinitions.hpp"
30 #include "TransporterCallback.hpp"
31 #include <RefConvert.hpp>
32
33 #include "prioTransporterTest.hpp"
34
35 #include <NdbTick.h>
36 #include <NdbMain.h>
37 #include <NdbOut.hpp>
38 #include <NdbSleep.h>
39
40 int basePortTCP = 17000;
41
42 SCI_TransporterConfiguration sciTemplate = {
43 2000,
44 // Packet size
45 2000000, // Buffer size
46 2, // number of adapters
47 1, // remote node id SCI
48 2, // Remote node Id SCI
49 0, // local ndb node id (server)
50 0, // remote ndb node id (client)
51 0, // byteOrder;
52 false, // compression;
53 true, // checksum;
54 true // signalId;
55 };
56
57
58 SHM_TransporterConfiguration shmTemplate = {
59 100000, // shmSize
60 0, // shmKey
61 1, // local ndb node id (server)
62 2, // remote ndb node id (client)
63 0, // byteOrder;
64 false, // compression;
65 true, // checksum;
66 true // signalId;
67 };
68
69 TCP_TransporterConfiguration tcpTemplate = {
70 17000, // port;
71 "", // remoteHostName;
72 "", // localhostname
73 2, // remoteNodeId;
74 1, // localNodeId;
75 2000000, // sendBufferSize - Size of SendBuffer of priority B
76 2000, // maxReceiveSize - Maximum no of bytes to receive
77 0, // byteOrder;
78 false, // compression;
79 true, // checksum;
80 true // signalId;
81 };
82
83 TransporterRegistry *tReg = 0;
84
85 #include <signal.h>
86
87 extern "C"
88 void
signalHandler(int signo)89 signalHandler(int signo){
90 ::signal(13, signalHandler);
91 char buf[255];
92 sprintf(buf,"Signal: %d\n", signo);
93 ndbout << buf << endl;
94 }
95
96 void
usage(const char * progName)97 usage(const char * progName){
98 ndbout << "Usage: " << progName << " localNodeId localHostName"
99 << " remoteHostName"
100 << " [<loop count>] [<send buf size>] [<recv buf size>]" << endl;
101 ndbout << " localNodeId - {1,2}" << endl;
102 }
103
104 typedef void (* CreateTransporterFunc)(void * conf,
105 NodeId localNodeId,
106 NodeId remoteNodeId,
107 const char * localHostName,
108 const char * remoteHostName,
109 int sendBuf,
110 int recvBuf);
111
112 void
createSCITransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName,int sendbuf,int recvbuf)113 createSCITransporter(void * _conf,
114 NodeId localNodeId,
115 NodeId remoteNodeId,
116 const char * localHostName,
117 const char * remoteHostName,
118 int sendbuf,
119 int recvbuf) {
120
121
122 ndbout << "Creating SCI transporter from node "
123 << localNodeId << "(" << localHostName << ") to "
124 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
125
126
127 SCI_TransporterConfiguration * conf = (SCI_TransporterConfiguration*)_conf;
128
129 conf->remoteSciNodeId0= (Uint16)atoi(localHostName);
130 conf->remoteSciNodeId1= (Uint16)atoi(remoteHostName);
131
132
133 conf->localNodeId = localNodeId;
134 conf->remoteNodeId = remoteNodeId;
135
136 bool res = tReg->createTransporter(conf);
137 if(res)
138 ndbout << "... -- Success " << endl;
139 else
140 ndbout << "... -- Failure " << endl;
141 }
142
143 void
createSHMTransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName,int sendbuf,int recvbuf)144 createSHMTransporter(void * _conf,
145 NodeId localNodeId,
146 NodeId remoteNodeId,
147 const char * localHostName,
148 const char * remoteHostName,
149 int sendbuf,
150 int recvbuf) {
151
152
153 ndbout << "Creating SHM transporter from node "
154 << localNodeId << "(" << localHostName << ") to "
155 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
156
157
158 SHM_TransporterConfiguration * conf = (SHM_TransporterConfiguration*)_conf;
159
160
161 conf->localNodeId = localNodeId;
162 conf->remoteNodeId = remoteNodeId;
163
164 bool res = tReg->createTransporter(conf);
165 if(res)
166 ndbout << "... -- Success " << endl;
167 else
168 ndbout << "... -- Failure " << endl;
169 }
170
171
172 void
createTCPTransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName,int sendBuf,int recvBuf)173 createTCPTransporter(void * _conf,
174 NodeId localNodeId,
175 NodeId remoteNodeId,
176 const char * localHostName,
177 const char * remoteHostName,
178 int sendBuf,
179 int recvBuf){
180 ndbout << "Creating TCP transporter from node "
181 << localNodeId << "(" << localHostName << ") to "
182 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
183
184 TCP_TransporterConfiguration * conf = (TCP_TransporterConfiguration*)_conf;
185
186 int port;
187 if(localNodeId == 1 && remoteNodeId == 2) port = basePortTCP + 0;
188 if(localNodeId == 1 && remoteNodeId == 3) port = basePortTCP + 1;
189 if(localNodeId == 2 && remoteNodeId == 1) port = basePortTCP + 0;
190 if(localNodeId == 2 && remoteNodeId == 3) port = basePortTCP + 2;
191 if(localNodeId == 3 && remoteNodeId == 1) port = basePortTCP + 1;
192 if(localNodeId == 3 && remoteNodeId == 2) port = basePortTCP + 2;
193
194 if(sendBuf != -1){
195 conf->sendBufferSize = sendBuf;
196 }
197 if(recvBuf != -1){
198 conf->maxReceiveSize = recvBuf;
199 }
200
201 ndbout << "\tSendBufferSize: " << conf->sendBufferSize << endl;
202 ndbout << "\tReceiveBufferSize: " << conf->maxReceiveSize << endl;
203
204 conf->localNodeId = localNodeId;
205 conf->localHostName = localHostName;
206 conf->remoteNodeId = remoteNodeId;
207 conf->remoteHostName = remoteHostName;
208 conf->port = port;
209 bool res = tReg->createTransporter(conf);
210 if(res)
211 ndbout << "... -- Success " << endl;
212 else
213 ndbout << "... -- Failure " << endl;
214 }
215
216 struct TestPhase {
217 int signalSize;
218 int noOfSignals;
219 int noOfSignalSent;
220 int noOfSignalReceived;
221 NDB_TICKS startTime;
222 NDB_TICKS stopTime;
223
224 NDB_TICKS startTimePrioA;
225 NDB_TICKS stopTimePrioA;
226 NDB_TICKS totTimePrioA;
227 int bytesSentBeforePrioA;
228 NDB_TICKS accTime;
229 int loopCount;
230 Uint64 sendLenBytes, sendCount;
231 Uint64 recvLenBytes, recvCount;
232 };
233
234 TestPhase testSpec[] = {
235 { 1, 10, 0,0, 0,0,0,0,0,0,0 } // 10 signals of size 1 word
236 ,{ 1, 10000, 0,0, 0,0,0,0,0,0,0 } // 100 signals of size 1 word
237 ,{ 1, 10000, 0,0, 0,0,0,0,0,0,0 } // 1000 signals of size 1 word
238 ,{ 1, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1 word
239
240 ,{ 8, 10, 0,0, 0,0,0,0,0,0,0 } // 10 signals of size 1 word
241 ,{ 8, 10000, 0,0, 0,0,0,0,0,0,0 } // 100 signals of size 1 word
242 ,{ 8, 10000, 0,0, 0,0,0,0,0,0,0 } // 1000 signals of size 1 word
243 ,{ 8, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1 word
244
245 ,{ 16, 10, 0,0, 0,0,0,0,0,0,0 } // 10 signals of size 1 word
246 ,{ 16, 100, 0,0, 0,0,0,0,0,0,0 } // 100 signals of size 1 word
247 ,{ 16, 1000, 0,0, 0,0,0,0,0,0,0 } // 1000 signals of size 1 word
248 ,{ 16, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1 word
249
250 ,{ 24, 10, 0,0, 0,0,0,0,0,0,0 } // 10 signals of size 1 word
251 ,{ 24, 100, 0,0, 0,0,0,0,0,0,0 } // 100 signals of size 1 word
252 ,{ 24, 1000, 0,0, 0,0,0,0,0,0,0 } // 1000 signals of size 1 word
253 ,{ 24, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1 word
254
255 ,{ 0, 10, 0,0, 0,0,0,0,0,0,0 } // 10 signals of random size
256 ,{ 0, 100, 0,0, 0,0,0,0,0,0,0 } // 100 signals of random size
257 ,{ 0, 1000, 0,0, 0,0,0,0,0,0,0 } // 1000 signals of random size
258 ,{ 0, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of random size
259 };
260
261 const int noOfTests = sizeof(testSpec)/sizeof(TestPhase);
262
263 SendStatus
sendSignalTo(NodeId nodeId,int signalSize,int prio)264 sendSignalTo(NodeId nodeId, int signalSize, int prio){
265 if(signalSize == 0)
266 signalSize = (rand() % 25) + 1;
267
268 SignalHeader sh;
269 sh.theLength = signalSize;
270 sh.theVerId_signalNumber = rand();
271 sh.theReceiversBlockNumber = rand();
272 sh.theSendersBlockRef = rand();
273 sh.theSendersSignalId = rand();
274 sh.theSignalId = rand();
275 sh.theTrace = rand();
276
277 Uint32 theData[25];
278 for(int i = 0; i<signalSize; i++)
279 theData[i] = (i+1) * (Uint32)(&theData[i]);
280
281 return tReg->prepareSend(&sh, prio, theData, nodeId);
282 }
283
284 void
reportHeader()285 reportHeader(){
286 ndbout << "#Sigs\tSz\tPayload\tTime\tSig/sec\tBps\t"
287 << "s len\tr len\tprioAtime\tbytesb4pA" << endl;
288 }
289
290 void
printReport(TestPhase & p)291 printReport(TestPhase & p){
292 if(p.accTime > 0) {
293 Uint32 secs = (p.accTime/p.loopCount)/1000;
294 Uint32 mill = (p.accTime/p.loopCount)%1000;
295 char st[255];
296 if(secs > 0){
297 sprintf(st, "%d.%.2ds", secs, (mill/10));
298 } else {
299 sprintf(st, "%dms", mill);
300 }
301
302 Uint32 sps = (1000*p.noOfSignals*p.loopCount)/p.accTime;
303 Uint32 bps = ((4000*p.noOfSignals)/p.accTime)*(p.loopCount*(p.signalSize+3));
304 if(p.signalSize == 0)
305 ((4000*p.noOfSignals)/p.accTime)*(p.loopCount*(13+3));
306
307 char ssps[255];
308 if(sps > 1000000){
309 sps /= 1000000;
310 sprintf(ssps, "%dM", (int)sps);
311 } else if(sps > 1000){
312 sps /= 1000;
313 sprintf(ssps, "%dk", (int)sps);
314 } else {
315 sprintf(ssps, "%d", (int)sps);
316 }
317
318 char sbps[255];
319 if(bps > 1000000){
320 bps /= 1000000;
321 sprintf(sbps, "%dM", bps);
322 } else if(bps>1000){
323 bps /= 1000;
324 sprintf(sbps, "%dk", bps);
325 } else {
326 sprintf(sbps, "%d", bps);
327 }
328
329 char buf[255];
330 if(p.signalSize != 0){
331 BaseString::snprintf(buf, 255,
332 "%d\t%d\t%d\t%s\t%s\t%s\t%d\t%d\t%d\t%d",
333 p.noOfSignals,
334 p.signalSize,
335 (4*p.signalSize),
336 st,
337 ssps,
338 sbps,
339 (int)(p.sendLenBytes / (p.sendCount == 0 ? 1 : p.sendCount)),
340 (int)(p.recvLenBytes / (p.recvCount == 0 ? 1 : p.recvCount)),
341 (int)(p.totTimePrioA / p.loopCount),
342 (int)(p.bytesSentBeforePrioA));
343 } else {
344 BaseString::snprintf(buf, 255,
345 "%d\trand\t4*rand\t%s\t%s\t%s\t%d\t%d\t%d\t%d",
346 p.noOfSignals,
347 st,
348 ssps,
349 sbps,
350 (int)(p.sendLenBytes / (p.sendCount == 0 ? 1 : p.sendCount)),
351 (int)(p.recvLenBytes / (p.recvCount == 0 ? 1 : p.recvCount)),
352 (int)(p.totTimePrioA / p.loopCount),
353 (int)(p.bytesSentBeforePrioA));
354
355 }
356 ndbout << buf << endl;
357 }
358 }
359
360 int loopCount = 1;
361 int sendBufSz = -1;
362 int recvBufSz = -1;
363
364 NDB_TICKS startSec=0;
365 NDB_TICKS stopSec=0;
366 Uint32 startMicro=0;
367 Uint32 stopMicro=0;
368 int timerStarted;
369 int timerStopped;
370
371 bool isClient = false;
372 bool isConnected = false;
373 bool isStarted = false;
374 int currentPhase = 0;
375 TestPhase allPhases[noOfTests];
376 Uint32 signalToEcho;
377 NDB_TICKS startTime, stopTime;
378
379 void
client(NodeId remoteNodeId)380 client(NodeId remoteNodeId){
381 isClient = true;
382
383 currentPhase = 0;
384 memcpy(allPhases, testSpec, sizeof(testSpec));
385
386 int counter = 0;
387
388 while(true){
389 TestPhase * current = &allPhases[currentPhase];
390 if(current->noOfSignals == current->noOfSignalSent &&
391 current->noOfSignals == current->noOfSignalReceived){
392
393 /**
394 * Test phase done
395 */
396 current->stopTime = NdbTick_CurrentMillisecond();
397 current->accTime += (current->stopTime - current->startTime);
398
399 NdbSleep_MilliSleep(500 / loopCount);
400
401 current->startTime = NdbTick_CurrentMillisecond();
402
403 current->noOfSignalSent = 0;
404 current->noOfSignalReceived = 0;
405
406 current->loopCount ++;
407 if(current->loopCount == loopCount){
408
409 printReport(allPhases[currentPhase]);
410
411 currentPhase ++;
412 if(currentPhase == noOfTests){
413 /**
414 * Now we are done
415 */
416 break;
417 }
418 NdbSleep_MilliSleep(500);
419 current = &allPhases[currentPhase];
420 current->startTime = NdbTick_CurrentMillisecond();
421 }
422 }
423 int signalsLeft = current->noOfSignals - current->noOfSignalSent;
424 if(signalsLeft > 0){
425 for(; signalsLeft > 1; signalsLeft--){
426 if(sendSignalTo(remoteNodeId, current->signalSize, 1) == SEND_OK) {
427 current->noOfSignalSent++;
428 // ndbout << "sent prio b" << endl;
429 current->bytesSentBeforePrioA += (current->signalSize << 2);
430 }
431 else {
432 tReg->external_IO(10);
433 break;
434 }
435 }
436 //prio A
437 if(signalsLeft==1) {
438 NDB_TICKS sec = 0;
439 Uint32 micro=0;
440 int ret = NdbTick_CurrentMicrosecond(&sec,µ);
441 if(ret==0)
442 current->startTimePrioA = micro + sec*1000000;
443 if(sendSignalTo(remoteNodeId, current->signalSize, 0) == SEND_OK) {
444 current->noOfSignalSent++;
445 signalsLeft--;
446 }
447 else {
448 tReg->external_IO(10);
449 break;
450 }
451 }
452 }
453
454 if(counter % 10 == 0)
455 tReg->checkConnections();
456 tReg->external_IO(0);
457 counter++;
458 }
459 }
460
461 void
server()462 server(){
463 isClient = false;
464
465 signalToEcho = 0;
466 for(int i = 0; i<noOfTests; i++)
467 signalToEcho += testSpec[i].noOfSignals;
468
469 signalToEcho *= loopCount;
470
471 while(signalToEcho > 0){
472 tReg->checkConnections();
473 for(int i = 0; i<10; i++)
474 tReg->external_IO(10);
475 }
476 }
477
478 int
prioTransporterTest(TestType tt,const char * progName,int argc,const char ** argv)479 prioTransporterTest(TestType tt, const char * progName,
480 int argc, const char **argv){
481
482 loopCount = 100;
483 sendBufSz = -1;
484 recvBufSz = -1;
485
486 isClient = false;
487 isConnected = false;
488 isStarted = false;
489 currentPhase = 0;
490
491 signalHandler(0);
492
493 if(argc < 4){
494 usage(progName);
495 return 0;
496 }
497
498 const NodeId localNodeId = atoi(argv[1]);
499 const char * localHostName = argv[2];
500 const char * remoteHost1 = argv[3];
501
502 if(argc >= 5)
503 loopCount = atoi(argv[4]);
504 if(argc >= 6)
505 sendBufSz = atoi(argv[5]);
506 if(argc >= 7)
507 recvBufSz = atoi(argv[6]);
508
509 if(localNodeId < 1 || localNodeId > 2){
510 ndbout << "localNodeId = " << localNodeId << endl << endl;
511 usage(progName);
512 return 0;
513 }
514
515 if(localNodeId == 1)
516 ndbout << "-- ECHO CLIENT --" << endl;
517 else
518 ndbout << "-- ECHO SERVER --" << endl;
519
520 ndbout << "localNodeId: " << localNodeId << endl;
521 ndbout << "localHostName: " << localHostName << endl;
522 ndbout << "remoteHost1 (node " << (localNodeId == 1?2:1) << "): "
523 << remoteHost1 << endl;
524 ndbout << "Loop count: " << loopCount << endl;
525 ndbout << "-----------------" << endl;
526
527 void * confTemplate = 0;
528 CreateTransporterFunc func = 0;
529 switch(tt){
530 case TestTCP:
531 func = createTCPTransporter;
532 confTemplate = &tcpTemplate;
533 break;
534 case TestSCI:
535 func = createSCITransporter;
536 confTemplate = &sciTemplate;
537 break;
538 case TestSHM:
539 func = createSHMTransporter;
540 confTemplate = &shmTemplate;
541 break;
542 default:
543 ndbout << "Unsupported transporter type" << endl;
544 return 0;
545 }
546
547 ndbout << "Creating transporter registry" << endl;
548 tReg = new TransporterRegistry;
549 tReg->init(localNodeId);
550
551 switch(localNodeId){
552 case 1:
553 (* func)(confTemplate, 1, 2, localHostName, remoteHost1,
554 sendBufSz, recvBufSz);
555 break;
556 case 2:
557 (* func)(confTemplate, 2, 1, localHostName, remoteHost1,
558 sendBufSz, recvBufSz);
559 break;
560 }
561
562 ndbout << "Doing startSending/startReceiving" << endl;
563 tReg->startSending();
564 tReg->startReceiving();
565
566 ndbout << "Connecting" << endl;
567 tReg->setPerformState(PerformConnect);
568 tReg->checkConnections();
569
570 if(localNodeId == 1)
571 client(2);
572 else
573 server();
574
575 isStarted = false;
576
577 ndbout << "Sleep 3 secs" << endl;
578 NdbSleep_SecSleep(3);
579
580 ndbout << "Doing setPerformState(Disconnect)" << endl;
581 tReg->setPerformState(PerformDisconnect);
582
583 ndbout << "Doing checkConnections()" << endl;
584 tReg->checkConnections();
585
586 ndbout << "Deleting transporter registry" << endl;
587 delete tReg; tReg = 0;
588
589 return 0;
590 }
591
operator <<(NdbOut & out,SignalHeader & sh)592 NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
593 out << "-- Signal Header --" << endl;
594 out << "theLength: " << sh.theLength << endl;
595 out << "gsn: " << sh.theVerId_signalNumber << endl;
596 out << "recBlockNo: " << sh.theReceiversBlockNumber << endl;
597 out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
598 out << "sendersSig: " << sh.theSendersSignalId << endl;
599 out << "theSignalId: " << sh.theSignalId << endl;
600 out << "trace: " << (int)sh.theTrace << endl;
601 return out;
602 }
603
604 void
execute(SignalHeader * const header,Uint8 prio,Uint32 * const theData)605 execute(SignalHeader * const header, Uint8 prio, Uint32 * const theData){
606 const NodeId nodeId = refToNode(header->theSendersBlockRef);
607 NDB_TICKS sec = 0;
608 Uint32 micro=0;
609 int ret = NdbTick_CurrentMicrosecond(&sec,µ);
610 if(prio == 0 && isClient && ret == 0) {
611 allPhases[currentPhase].stopTimePrioA = micro + sec*1000000;
612 allPhases[currentPhase].totTimePrioA +=
613 allPhases[currentPhase].stopTimePrioA -
614 allPhases[currentPhase].startTimePrioA;
615 }
616 if(ret!=0)
617 allPhases[currentPhase].totTimePrioA = -1;
618
619 if(isClient){
620 allPhases[currentPhase].noOfSignalReceived++;
621 } else {
622 int sleepTime = 10;
623 while(tReg->prepareSend(header, prio, theData, nodeId) != SEND_OK){
624 ndbout << "Failed to echo" << sleepTime << endl;
625 NdbSleep_MilliSleep(sleepTime);
626 // sleepTime += 10;
627 }
628
629 signalToEcho--;
630 }
631 }
632
633 void
reportError(NodeId nodeId,TransporterError errorCode)634 reportError(NodeId nodeId, TransporterError errorCode){
635 char buf[255];
636 sprintf(buf, "reportError (%d, %x) in perfTest", nodeId, errorCode);
637 ndbout << buf << endl;
638 if(errorCode & 0x8000){
639 tReg->setPerformState(nodeId, PerformDisconnect);
640 }
641 }
642
643 /**
644 * Report average send theLength in bytes (4096 last sends)
645 */
646 void
reportSendLen(NodeId nodeId,Uint32 count,Uint64 bytes)647 reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes){
648 allPhases[currentPhase].sendCount += count;
649 allPhases[currentPhase].sendLenBytes += bytes;
650
651 if(!isClient){
652 ndbout << "reportSendLen(" << nodeId << ", "
653 << (bytes/count) << ")" << endl;
654 }
655 }
656
657 /**
658 * Report average receive theLength in bytes (4096 last receives)
659 */
660 void
reportReceiveLen(NodeId nodeId,Uint32 count,Uint64 bytes)661 reportReceiveLen(NodeId nodeId, Uint32 count, Uint64 bytes){
662 allPhases[currentPhase].recvCount += count;
663 allPhases[currentPhase].recvLenBytes += bytes;
664
665 if(!isClient){
666 ndbout << "reportReceiveLen(" << nodeId << ", "
667 << (bytes/count) << ")" << endl;
668 }
669 }
670
671 /**
672 * Report connection established
673 */
674 void
reportConnect(NodeId nodeId)675 reportConnect(NodeId nodeId){
676 char buf[255];
677 sprintf(buf, "reportConnect(%d)", nodeId);
678 ndbout << buf << endl;
679 tReg->setPerformState(nodeId, PerformIO);
680
681 if(!isStarted){
682 isStarted = true;
683 startTime = NdbTick_CurrentMillisecond();
684 if(isClient){
685 reportHeader();
686 allPhases[0].startTime = startTime;
687 }
688 }
689 else{
690 // Resend signals that were lost when connection failed
691 TestPhase * current = &allPhases[currentPhase];
692 current->noOfSignalSent = current->noOfSignalReceived;
693 }
694 }
695
696 /**
697 * Report connection broken
698 */
699 void
reportDisconnect(NodeId nodeId,Uint32 errNo)700 reportDisconnect(NodeId nodeId, Uint32 errNo){
701 char buf[255];
702 sprintf(buf, "reportDisconnect(%d)", nodeId);
703 ndbout << buf << endl;
704
705 if(isStarted)
706 tReg->setPerformState(nodeId, PerformConnect);
707 }
708
709
710 int
checkJobBuffer()711 checkJobBuffer() {
712 /**
713 * Check to see if jobbbuffers are starting to get full
714 * and if so call doJob
715 */
716 return 0;
717 }
718