1 /*
2    Copyright (C) 2003-2006 MySQL AB
3     All rights reserved. Use is subject to license terms.
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License, version 2.0,
7    as published by the Free Software Foundation.
8 
9    This program is also distributed with certain software (including
10    but not limited to OpenSSL) that is licensed under separate terms,
11    as designated in a particular file or component or in included license
12    documentation.  The authors of MySQL hereby grant you an additional
13    permission to link the program and your derivative works with the
14    separately licensed software that they have included with MySQL.
15 
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License, version 2.0, for more details.
20 
21    You should have received a copy of the GNU General Public License
22    along with this program; if not, write to the Free Software
23    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 */
25 
26 #include <ndb_global.h>
27 
28 #include "TransporterRegistry.hpp"
29 #include "TransporterDefinitions.hpp"
30 #include "TransporterCallback.hpp"
31 #include <RefConvert.hpp>
32 
33 #include "prioTransporterTest.hpp"
34 
35 #include <NdbTick.h>
36 #include <NdbMain.h>
37 #include <NdbOut.hpp>
38 #include <NdbSleep.h>
39 
40 int basePortTCP = 17000;
41 
42 SCI_TransporterConfiguration sciTemplate = {
43   2000,
44        // Packet size
45   2000000,      // Buffer size
46   2,           // number of adapters
47   1,           // remote node id SCI
48   2,           // Remote node Id SCI
49   0,           // local ndb node id (server)
50   0,           // remote ndb node id (client)
51   0,              // byteOrder;
52   false,          // compression;
53   true,          // checksum;
54   true            // signalId;
55 };
56 
57 
58 SHM_TransporterConfiguration shmTemplate = {
59   100000, // shmSize
60   0,      // shmKey
61   1,           // local ndb node id (server)
62   2,           // remote ndb node id (client)
63   0,              // byteOrder;
64   false,          // compression;
65   true,           // checksum;
66   true            // signalId;
67 };
68 
69 TCP_TransporterConfiguration tcpTemplate = {
70   17000,          // port;
71   "",             // remoteHostName;
72   "",             // localhostname
73   2,              // remoteNodeId;
74   1,              // localNodeId;
75   2000000,          // sendBufferSize - Size of SendBuffer of priority B
76   2000,           // maxReceiveSize - Maximum no of bytes to receive
77   0,              // byteOrder;
78   false,          // compression;
79   true,           // checksum;
80   true            // signalId;
81 };
82 
83 TransporterRegistry *tReg = 0;
84 
85 #include <signal.h>
86 
87 extern "C"
88 void
signalHandler(int signo)89 signalHandler(int signo){
90   ::signal(13, signalHandler);
91   char buf[255];
92   sprintf(buf,"Signal: %d\n", signo);
93   ndbout << buf << endl;
94 }
95 
96 void
usage(const char * progName)97 usage(const char * progName){
98   ndbout << "Usage: " << progName << " localNodeId localHostName"
99 	 << " remoteHostName"
100 	 << " [<loop count>] [<send buf size>] [<recv buf size>]" << endl;
101   ndbout << "  localNodeId - {1,2}" << endl;
102 }
103 
104 typedef void (* CreateTransporterFunc)(void * conf,
105 				       NodeId localNodeId,
106 				       NodeId remoteNodeId,
107 				       const char * localHostName,
108 				       const char * remoteHostName,
109 				       int sendBuf,
110 				       int recvBuf);
111 
112 void
createSCITransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName,int sendbuf,int recvbuf)113 createSCITransporter(void * _conf,
114 		     NodeId localNodeId,
115 		     NodeId remoteNodeId,
116 		     const char * localHostName,
117 		     const char * remoteHostName,
118 		     int sendbuf,
119 		     int recvbuf) {
120 
121 
122   ndbout << "Creating SCI transporter from node "
123 	 << localNodeId << "(" << localHostName << ") to "
124 	 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
125 
126 
127   SCI_TransporterConfiguration * conf = (SCI_TransporterConfiguration*)_conf;
128 
129   conf->remoteSciNodeId0= (Uint16)atoi(localHostName);
130   conf->remoteSciNodeId1= (Uint16)atoi(remoteHostName);
131 
132 
133   conf->localNodeId    = localNodeId;
134   conf->remoteNodeId   = remoteNodeId;
135 
136   bool res = tReg->createTransporter(conf);
137   if(res)
138     ndbout << "... -- Success " << endl;
139   else
140     ndbout << "... -- Failure " << endl;
141 }
142 
143 void
createSHMTransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName,int sendbuf,int recvbuf)144 createSHMTransporter(void * _conf,
145 		     NodeId localNodeId,
146 		     NodeId remoteNodeId,
147 		     const char * localHostName,
148 		     const char * remoteHostName,
149 		     int sendbuf,
150 		     int recvbuf) {
151 
152 
153   ndbout << "Creating SHM transporter from node "
154 	 << localNodeId << "(" << localHostName << ") to "
155 	 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
156 
157 
158   SHM_TransporterConfiguration * conf = (SHM_TransporterConfiguration*)_conf;
159 
160 
161   conf->localNodeId    = localNodeId;
162   conf->remoteNodeId   = remoteNodeId;
163 
164   bool res = tReg->createTransporter(conf);
165   if(res)
166     ndbout << "... -- Success " << endl;
167   else
168     ndbout << "... -- Failure " << endl;
169 }
170 
171 
172 void
createTCPTransporter(void * _conf,NodeId localNodeId,NodeId remoteNodeId,const char * localHostName,const char * remoteHostName,int sendBuf,int recvBuf)173 createTCPTransporter(void * _conf,
174 		     NodeId localNodeId,
175 		     NodeId remoteNodeId,
176 		     const char * localHostName,
177 		     const char * remoteHostName,
178 		     int sendBuf,
179 		     int recvBuf){
180   ndbout << "Creating TCP transporter from node "
181 	 << localNodeId << "(" << localHostName << ") to "
182 	 << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
183 
184   TCP_TransporterConfiguration * conf = (TCP_TransporterConfiguration*)_conf;
185 
186   int port;
187   if(localNodeId == 1 && remoteNodeId == 2) port = basePortTCP + 0;
188   if(localNodeId == 1 && remoteNodeId == 3) port = basePortTCP + 1;
189   if(localNodeId == 2 && remoteNodeId == 1) port = basePortTCP + 0;
190   if(localNodeId == 2 && remoteNodeId == 3) port = basePortTCP + 2;
191   if(localNodeId == 3 && remoteNodeId == 1) port = basePortTCP + 1;
192   if(localNodeId == 3 && remoteNodeId == 2) port = basePortTCP + 2;
193 
194   if(sendBuf != -1){
195     conf->sendBufferSize = sendBuf;
196   }
197   if(recvBuf != -1){
198     conf->maxReceiveSize = recvBuf;
199   }
200 
201   ndbout << "\tSendBufferSize:    " << conf->sendBufferSize << endl;
202   ndbout << "\tReceiveBufferSize: " << conf->maxReceiveSize << endl;
203 
204   conf->localNodeId    = localNodeId;
205   conf->localHostName  = localHostName;
206   conf->remoteNodeId   = remoteNodeId;
207   conf->remoteHostName = remoteHostName;
208   conf->port           = port;
209   bool res = tReg->createTransporter(conf);
210   if(res)
211     ndbout << "... -- Success " << endl;
212   else
213     ndbout << "... -- Failure " << endl;
214 }
215 
216 struct TestPhase {
217   int signalSize;
218   int noOfSignals;
219   int noOfSignalSent;
220   int noOfSignalReceived;
221   NDB_TICKS startTime;
222   NDB_TICKS stopTime;
223 
224   NDB_TICKS startTimePrioA;
225   NDB_TICKS stopTimePrioA;
226   NDB_TICKS totTimePrioA;
227   int bytesSentBeforePrioA;
228   NDB_TICKS accTime;
229   int loopCount;
230   Uint64 sendLenBytes, sendCount;
231   Uint64 recvLenBytes, recvCount;
232 };
233 
234 TestPhase testSpec[] = {
235    {  1,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of size 1  word
236   ,{  1,   10000, 0,0, 0,0,0,0,0,0,0 } //   100 signals of size 1  word
237   ,{  1,  10000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of size 1  word
238   ,{  1, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1  word
239 
240   ,{  8,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of size 1  word
241   ,{  8,   10000, 0,0, 0,0,0,0,0,0,0 } //   100 signals of size 1  word
242   ,{  8,  10000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of size 1  word
243   ,{  8, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1  word
244 
245   ,{ 16,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of size 1  word
246   ,{ 16,   100, 0,0, 0,0,0,0,0,0,0 } //   100 signals of size 1  word
247   ,{ 16,  1000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of size 1  word
248   ,{ 16, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1  word
249 
250   ,{ 24,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of size 1  word
251   ,{ 24,   100, 0,0, 0,0,0,0,0,0,0 } //   100 signals of size 1  word
252   ,{ 24,  1000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of size 1  word
253   ,{ 24, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1  word
254 
255   ,{  0,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of random size
256   ,{  0,   100, 0,0, 0,0,0,0,0,0,0 } //   100 signals of random size
257   ,{  0,  1000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of random size
258   ,{  0, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of random size
259 };
260 
261 const int noOfTests = sizeof(testSpec)/sizeof(TestPhase);
262 
263 SendStatus
sendSignalTo(NodeId nodeId,int signalSize,int prio)264 sendSignalTo(NodeId nodeId, int signalSize, int prio){
265   if(signalSize == 0)
266     signalSize = (rand() % 25) + 1;
267 
268   SignalHeader sh;
269   sh.theLength               = signalSize;
270   sh.theVerId_signalNumber   = rand();
271   sh.theReceiversBlockNumber = rand();
272   sh.theSendersBlockRef      = rand();
273   sh.theSendersSignalId      = rand();
274   sh.theSignalId             = rand();
275   sh.theTrace                = rand();
276 
277   Uint32 theData[25];
278   for(int i = 0; i<signalSize; i++)
279     theData[i] = (i+1) * (Uint32)(&theData[i]);
280 
281   return tReg->prepareSend(&sh, prio, theData, nodeId);
282 }
283 
284 void
reportHeader()285 reportHeader(){
286   ndbout << "#Sigs\tSz\tPayload\tTime\tSig/sec\tBps\t"
287 	 << "s len\tr len\tprioAtime\tbytesb4pA" << endl;
288 }
289 
290 void
printReport(TestPhase & p)291 printReport(TestPhase & p){
292   if(p.accTime > 0) {
293     Uint32 secs = (p.accTime/p.loopCount)/1000;
294     Uint32 mill = (p.accTime/p.loopCount)%1000;
295     char st[255];
296     if(secs > 0){
297       sprintf(st, "%d.%.2ds", secs, (mill/10));
298     } else {
299       sprintf(st, "%dms", mill);
300     }
301 
302     Uint32 sps = (1000*p.noOfSignals*p.loopCount)/p.accTime;
303     Uint32 bps = ((4000*p.noOfSignals)/p.accTime)*(p.loopCount*(p.signalSize+3));
304     if(p.signalSize == 0)
305       ((4000*p.noOfSignals)/p.accTime)*(p.loopCount*(13+3));
306 
307     char ssps[255];
308     if(sps > 1000000){
309       sps /= 1000000;
310       sprintf(ssps, "%dM", (int)sps);
311     } else if(sps > 1000){
312     sps /= 1000;
313     sprintf(ssps, "%dk", (int)sps);
314     } else {
315       sprintf(ssps, "%d", (int)sps);
316     }
317 
318     char sbps[255];
319     if(bps > 1000000){
320       bps /= 1000000;
321       sprintf(sbps, "%dM", bps);
322     } else if(bps>1000){
323       bps /= 1000;
324       sprintf(sbps, "%dk", bps);
325     } else {
326       sprintf(sbps, "%d", bps);
327   }
328 
329     char buf[255];
330     if(p.signalSize != 0){
331       BaseString::snprintf(buf, 255,
332 	       "%d\t%d\t%d\t%s\t%s\t%s\t%d\t%d\t%d\t%d",
333 	       p.noOfSignals,
334 	       p.signalSize,
335 	       (4*p.signalSize),
336 	       st,
337 	       ssps,
338 	       sbps,
339 	       (int)(p.sendLenBytes / (p.sendCount == 0 ? 1 : p.sendCount)),
340 	       (int)(p.recvLenBytes / (p.recvCount == 0 ? 1 : p.recvCount)),
341 	       (int)(p.totTimePrioA / p.loopCount),
342 	       (int)(p.bytesSentBeforePrioA));
343     } else {
344       BaseString::snprintf(buf, 255,
345 	       "%d\trand\t4*rand\t%s\t%s\t%s\t%d\t%d\t%d\t%d",
346 	       p.noOfSignals,
347 	       st,
348 	       ssps,
349 	       sbps,
350 	       (int)(p.sendLenBytes / (p.sendCount == 0 ? 1 : p.sendCount)),
351 	       (int)(p.recvLenBytes / (p.recvCount == 0 ? 1 : p.recvCount)),
352 	       (int)(p.totTimePrioA / p.loopCount),
353 	       (int)(p.bytesSentBeforePrioA));
354 
355     }
356     ndbout << buf << endl;
357   }
358 }
359 
360 int loopCount = 1;
361 int sendBufSz = -1;
362 int recvBufSz = -1;
363 
364 NDB_TICKS startSec=0;
365 NDB_TICKS stopSec=0;
366 Uint32 startMicro=0;
367 Uint32 stopMicro=0;
368 int timerStarted;
369 int timerStopped;
370 
371 bool      isClient     = false;
372 bool      isConnected  = false;
373 bool      isStarted    = false;
374 int       currentPhase = 0;
375 TestPhase allPhases[noOfTests];
376 Uint32    signalToEcho;
377 NDB_TICKS startTime, stopTime;
378 
379 void
client(NodeId remoteNodeId)380 client(NodeId remoteNodeId){
381   isClient = true;
382 
383   currentPhase = 0;
384   memcpy(allPhases, testSpec, sizeof(testSpec));
385 
386   int counter = 0;
387 
388   while(true){
389     TestPhase * current = &allPhases[currentPhase];
390     if(current->noOfSignals == current->noOfSignalSent &&
391        current->noOfSignals == current->noOfSignalReceived){
392 
393       /**
394        * Test phase done
395        */
396       current->stopTime  = NdbTick_CurrentMillisecond();
397       current->accTime  += (current->stopTime - current->startTime);
398 
399       NdbSleep_MilliSleep(500 / loopCount);
400 
401       current->startTime = NdbTick_CurrentMillisecond();
402 
403       current->noOfSignalSent     = 0;
404       current->noOfSignalReceived = 0;
405 
406       current->loopCount ++;
407       if(current->loopCount == loopCount){
408 
409 	printReport(allPhases[currentPhase]);
410 
411 	currentPhase ++;
412 	if(currentPhase == noOfTests){
413 	  /**
414 	   * Now we are done
415 	   */
416 	  break;
417 	}
418 	NdbSleep_MilliSleep(500);
419 	current = &allPhases[currentPhase];
420 	current->startTime = NdbTick_CurrentMillisecond();
421       }
422     }
423     int signalsLeft = current->noOfSignals - current->noOfSignalSent;
424     if(signalsLeft > 0){
425       for(; signalsLeft > 1; signalsLeft--){
426 	if(sendSignalTo(remoteNodeId, current->signalSize, 1) == SEND_OK) {
427 	  current->noOfSignalSent++;
428 	  //	  ndbout << "sent prio b" << endl;
429 	  current->bytesSentBeforePrioA += (current->signalSize << 2);
430 	}
431 	else {
432 	  tReg->external_IO(10);
433 	  break;
434 	}
435       }
436       //prio A
437       if(signalsLeft==1) {
438 	NDB_TICKS sec = 0;
439 	Uint32 micro=0;
440 	int ret = NdbTick_CurrentMicrosecond(&sec,&micro);
441 	if(ret==0)
442 	  current->startTimePrioA  = micro + sec*1000000;
443 	if(sendSignalTo(remoteNodeId, current->signalSize, 0) == SEND_OK) {
444 	  current->noOfSignalSent++;
445 	  signalsLeft--;
446 	}
447 	else {
448 	  tReg->external_IO(10);
449 	  break;
450 	}
451       }
452     }
453 
454     if(counter % 10 == 0)
455       tReg->checkConnections();
456     tReg->external_IO(0);
457     counter++;
458   }
459 }
460 
461 void
server()462 server(){
463   isClient = false;
464 
465   signalToEcho = 0;
466   for(int i = 0; i<noOfTests; i++)
467     signalToEcho += testSpec[i].noOfSignals;
468 
469   signalToEcho *= loopCount;
470 
471   while(signalToEcho > 0){
472     tReg->checkConnections();
473     for(int i = 0; i<10; i++)
474       tReg->external_IO(10);
475   }
476 }
477 
478 int
prioTransporterTest(TestType tt,const char * progName,int argc,const char ** argv)479 prioTransporterTest(TestType tt, const char * progName,
480 		    int argc, const char **argv){
481 
482   loopCount = 100;
483   sendBufSz = -1;
484   recvBufSz = -1;
485 
486   isClient     = false;
487   isConnected  = false;
488   isStarted    = false;
489   currentPhase = 0;
490 
491   signalHandler(0);
492 
493   if(argc < 4){
494     usage(progName);
495     return 0;
496   }
497 
498   const NodeId localNodeId   = atoi(argv[1]);
499   const char * localHostName = argv[2];
500   const char * remoteHost1   = argv[3];
501 
502   if(argc >= 5)
503     loopCount = atoi(argv[4]);
504   if(argc >= 6)
505     sendBufSz = atoi(argv[5]);
506   if(argc >= 7)
507     recvBufSz = atoi(argv[6]);
508 
509   if(localNodeId < 1 || localNodeId > 2){
510     ndbout << "localNodeId = " << localNodeId << endl << endl;
511     usage(progName);
512     return 0;
513   }
514 
515   if(localNodeId == 1)
516     ndbout << "-- ECHO CLIENT --" << endl;
517   else
518     ndbout << "-- ECHO SERVER --" << endl;
519 
520   ndbout << "localNodeId:           " << localNodeId << endl;
521   ndbout << "localHostName:         " << localHostName << endl;
522   ndbout << "remoteHost1 (node " << (localNodeId == 1?2:1) << "): "
523 	 << remoteHost1 << endl;
524   ndbout << "Loop count: " << loopCount << endl;
525   ndbout << "-----------------" << endl;
526 
527   void * confTemplate = 0;
528   CreateTransporterFunc func = 0;
529   switch(tt){
530   case TestTCP:
531     func = createTCPTransporter;
532     confTemplate = &tcpTemplate;
533     break;
534   case TestSCI:
535     func = createSCITransporter;
536     confTemplate = &sciTemplate;
537     break;
538   case TestSHM:
539     func = createSHMTransporter;
540     confTemplate = &shmTemplate;
541     break;
542   default:
543     ndbout << "Unsupported transporter type" << endl;
544     return 0;
545   }
546 
547   ndbout << "Creating transporter registry" << endl;
548   tReg = new TransporterRegistry;
549   tReg->init(localNodeId);
550 
551   switch(localNodeId){
552   case 1:
553     (* func)(confTemplate, 1, 2, localHostName, remoteHost1,
554 	     sendBufSz, recvBufSz);
555     break;
556   case 2:
557     (* func)(confTemplate, 2, 1, localHostName, remoteHost1,
558 	     sendBufSz, recvBufSz);
559     break;
560   }
561 
562   ndbout << "Doing startSending/startReceiving" << endl;
563   tReg->startSending();
564   tReg->startReceiving();
565 
566   ndbout << "Connecting" << endl;
567   tReg->setPerformState(PerformConnect);
568   tReg->checkConnections();
569 
570   if(localNodeId == 1)
571     client(2);
572   else
573     server();
574 
575   isStarted = false;
576 
577   ndbout << "Sleep 3 secs" << endl;
578   NdbSleep_SecSleep(3);
579 
580   ndbout << "Doing setPerformState(Disconnect)" << endl;
581   tReg->setPerformState(PerformDisconnect);
582 
583   ndbout << "Doing checkConnections()" << endl;
584   tReg->checkConnections();
585 
586   ndbout << "Deleting transporter registry" << endl;
587   delete tReg; tReg = 0;
588 
589   return 0;
590 }
591 
operator <<(NdbOut & out,SignalHeader & sh)592 NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
593   out << "-- Signal Header --" << endl;
594   out << "theLength:    " << sh.theLength << endl;
595   out << "gsn:          " << sh.theVerId_signalNumber << endl;
596   out << "recBlockNo:   " << sh.theReceiversBlockNumber << endl;
597   out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
598   out << "sendersSig:   " << sh.theSendersSignalId << endl;
599   out << "theSignalId:  " << sh.theSignalId << endl;
600   out << "trace:        " << (int)sh.theTrace << endl;
601   return out;
602 }
603 
604 void
execute(SignalHeader * const header,Uint8 prio,Uint32 * const theData)605 execute(SignalHeader * const header, Uint8 prio, Uint32 * const theData){
606   const NodeId nodeId = refToNode(header->theSendersBlockRef);
607   NDB_TICKS sec = 0;
608   Uint32 micro=0;
609   int ret = NdbTick_CurrentMicrosecond(&sec,&micro);
610   if(prio == 0 && isClient && ret == 0) {
611     allPhases[currentPhase].stopTimePrioA  = micro + sec*1000000;
612     allPhases[currentPhase].totTimePrioA +=
613       allPhases[currentPhase].stopTimePrioA -
614       allPhases[currentPhase].startTimePrioA;
615   }
616   if(ret!=0)
617     allPhases[currentPhase].totTimePrioA = -1;
618 
619   if(isClient){
620     allPhases[currentPhase].noOfSignalReceived++;
621   } else {
622     int sleepTime = 10;
623     while(tReg->prepareSend(header, prio, theData, nodeId) != SEND_OK){
624       ndbout << "Failed to echo" << sleepTime << endl;
625       NdbSleep_MilliSleep(sleepTime);
626       // sleepTime += 10;
627     }
628 
629     signalToEcho--;
630   }
631 }
632 
633 void
reportError(NodeId nodeId,TransporterError errorCode)634 reportError(NodeId nodeId, TransporterError errorCode){
635   char buf[255];
636   sprintf(buf, "reportError (%d, %x) in perfTest", nodeId, errorCode);
637   ndbout << buf << endl;
638   if(errorCode & 0x8000){
639     tReg->setPerformState(nodeId, PerformDisconnect);
640   }
641 }
642 
643 /**
644  * Report average send theLength in bytes (4096 last sends)
645  */
646 void
reportSendLen(NodeId nodeId,Uint32 count,Uint64 bytes)647 reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes){
648   allPhases[currentPhase].sendCount    += count;
649   allPhases[currentPhase].sendLenBytes += bytes;
650 
651   if(!isClient){
652     ndbout << "reportSendLen(" << nodeId << ", "
653 	   << (bytes/count) << ")" << endl;
654   }
655 }
656 
657 /**
658  * Report average receive theLength in bytes (4096 last receives)
659  */
660 void
reportReceiveLen(NodeId nodeId,Uint32 count,Uint64 bytes)661 reportReceiveLen(NodeId nodeId, Uint32 count, Uint64 bytes){
662   allPhases[currentPhase].recvCount    += count;
663   allPhases[currentPhase].recvLenBytes += bytes;
664 
665   if(!isClient){
666     ndbout << "reportReceiveLen(" << nodeId << ", "
667 	   << (bytes/count) << ")" << endl;
668   }
669 }
670 
671 /**
672  * Report connection established
673  */
674 void
reportConnect(NodeId nodeId)675 reportConnect(NodeId nodeId){
676   char buf[255];
677   sprintf(buf, "reportConnect(%d)", nodeId);
678   ndbout << buf << endl;
679   tReg->setPerformState(nodeId, PerformIO);
680 
681   if(!isStarted){
682     isStarted = true;
683     startTime = NdbTick_CurrentMillisecond();
684     if(isClient){
685       reportHeader();
686       allPhases[0].startTime = startTime;
687     }
688   }
689   else{
690     // Resend signals that were lost when connection failed
691     TestPhase * current = &allPhases[currentPhase];
692     current->noOfSignalSent = current->noOfSignalReceived;
693   }
694 }
695 
696 /**
697  * Report connection broken
698  */
699 void
reportDisconnect(NodeId nodeId,Uint32 errNo)700 reportDisconnect(NodeId nodeId, Uint32 errNo){
701   char buf[255];
702   sprintf(buf, "reportDisconnect(%d)", nodeId);
703   ndbout << buf << endl;
704 
705   if(isStarted)
706     tReg->setPerformState(nodeId, PerformConnect);
707 }
708 
709 
/**
 * Transporter callback meant to check whether the job buffers are getting
 * full. This test has no job buffers, so it is a stub.
 *
 * @return  always 0
 */
int
checkJobBuffer() {
  const int nothingToDo = 0;
  return nothingToDo;
}
718