1 /*
2    Copyright (C) 2003-2006 MySQL AB
3     All rights reserved. Use is subject to license terms.
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License, version 2.0,
7    as published by the Free Software Foundation.
8 
9    This program is also distributed with certain software (including
10    but not limited to OpenSSL) that is licensed under separate terms,
11    as designated in a particular file or component or in included license
12    documentation.  The authors of MySQL hereby grant you an additional
13    permission to link the program and your derivative works with the
14    separately licensed software that they have included with MySQL.
15 
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License, version 2.0, for more details.
20 
21    You should have received a copy of the GNU General Public License
22    along with this program; if not, write to the Free Software
23    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 */
25 
26 
27 #include <ndb_global.h>
28 
29 extern "C" {
30 #include <dba.h>
31 }
32 
33 #include <NdbOut.hpp>
34 #include <NdbSleep.h>
35 #include <NdbTimer.hpp>
36 #include <NDBT_Stats.hpp>
37 #include <NDBT_ReturnCodes.h>
38 #include <NdbMain.h>
39 #include <time.h>
40 
41 #undef min
42 #undef max
43 
44 static const int NP_Insert      = 0;
45 static const int NP_Update      = 1;
46 static const int NP_WriteUpdate = 2;
47 static const int NP_WriteInsert = 3;
48 static const int NP_Delete      = 4;
49 static const int NP_BulkRead    = 5;
50 static const int NP_MAX         = 5;
51 
52 static const char * Operations[] = {
53   "Insert  ",
54   "Update  ",
55   "WriteUpd",
56   "WriteIns",
57   "Delete  ",
58   "BulkRead"
59 };
60 
61 /**
62  * Configuration variables
63  */
64 static int NoOfTransactions         = 10000;
65 static int ParallellTransactions    = 1000;
66 static int OperationsPerTransaction = 10;
67 static int NoOfColumns              = 20;
68 static int BytesPerInsert           = 300;
69 static int BytesPerUpdate           = 200;
70 static int LoopCount                = 10;
71 
72 /**
73  * Global variables
74  */
75 static char TableName[255];
76 static DBA_ColumnDesc_t    * ColumnDescriptions;
77 static DBA_ColumnBinding_t * InsertBindings;
78 static DBA_ColumnBinding_t * UpdateBindings; static int UpdateBindingColumns;
79 static DBA_ColumnBinding_t * DeleteBindings;
80 
81 static char * TestData;
82 static DBA_Binding_t * InsertB;
83 static DBA_Binding_t * UpdateB;
84 static DBA_Binding_t * DeleteB;
85 
86 /**
87  * Function prototypes
88  */
89 static void sequence(int loops);
90 
getPtr(int rowNo)91 inline void * getPtr(int rowNo) { return TestData+rowNo*BytesPerInsert;}
setPK(int rowNo,int pk)92 inline void   setPK(int rowNo, int pk){ * (int *)getPtr(rowNo) = pk; }
93 
94 static void SetupTestData();
95 static void CleanupTestData();
96 
97 static bool CreateTable();
98 static bool CleanTable();
99 static bool CreateBindings();
100 
101 static void usage();
102 
103 static
104 void
usage()105 usage(){
106   int ForceSend, Interval;
107   DBA_GetParameter(0, &Interval);
108   DBA_GetParameter(3, &ForceSend);
109 
110   ndbout << "newtonPerf" << endl
111 	 << "   -n Transactions per loop and operation ("
112 	 << NoOfTransactions << ")" << endl
113 	 << "   -p parallell transactions (" << ParallellTransactions << ")"
114 	 << endl
115 	 << "   -o operations per transaction (" << OperationsPerTransaction
116 	 << ")" << endl
117 	 << "   -a no of columns (" << NoOfColumns << ")" << endl
118 	 << "   -b Table size in bytes (" << BytesPerInsert << ")" << endl
119 	 << "   -u Bytes per update (" << BytesPerUpdate << ")" << endl
120 	 << "   -l Loop count (" << LoopCount << ")" << endl
121 	 << "   -i Interval (" << Interval << "ms)" << endl
122 	 << "   -f Force send algorithm (" << ForceSend << ")" << endl
123 	 << "   -h Help" << endl;
124 
125 }
126 
127 static
128 bool
parseArgs(int argc,const char ** argv)129 parseArgs(int argc, const char **argv){
130   bool a = false, b = false, u = false;
131 
132   for(int i = 1; i<argc; i++){
133     if(argv[i][0] != '-'){
134       ndbout << "Invalid argument: " << argv[i] << endl;
135       return false;
136     }
137 
138     if(argv[i][1] == 'h')
139       return false;
140 
141     if(i == argc-1){
142       ndbout << "Expecting argument to " << argv[i] << endl;
143       return false;
144     }
145 
146     switch(argv[i][1]){
147     case 'n':
148       NoOfTransactions = atoi(argv[i+1]);
149       break;
150     case 'p':
151       ParallellTransactions = atoi(argv[i+1]);
152       break;
153     case 'o':
154       OperationsPerTransaction = atoi(argv[i+1]);
155       break;
156     case 'a':
157       NoOfColumns = atoi(argv[i+1]);
158       a = true;
159       break;
160     case 'b':
161       BytesPerInsert = atoi(argv[i+1]);
162       b = true;
163       break;
164     case 'u':
165       BytesPerUpdate = atoi(argv[i+1]);
166       u = true;
167       break;
168     case 'l':
169       LoopCount = atoi(argv[i+1]);
170       break;
171     case 'f':
172       {
173 	const int val = atoi(argv[i+1]);
174 	if(DBA_SetParameter(3, val) != DBA_NO_ERROR){
175 	  ndbout << "Invalid force send algorithm: "
176 		 << DBA_GetLatestErrorMsg()
177 		 << "(" << DBA_GetLatestError() << ")" << endl;
178 	  return false;
179 	}
180       }
181       break;
182     case 'i':
183       {
184 	const int val = atoi(argv[i+1]);
185 	if(DBA_SetParameter(0, val) != DBA_NO_ERROR){
186 	  ndbout << "Invalid NBP interval: "
187 		 << DBA_GetLatestErrorMsg()
188 		 << "(" << DBA_GetLatestError() << ")" << endl;
189 	  return false;
190 	}
191       }
192       break;
193     default:
194       ndbout << "Invalid option: " << argv[i] << endl;
195       return false;
196     }
197     i++;
198   }
199   if(a && !b) BytesPerInsert = 15 * NoOfColumns;
200   if(!a && b) NoOfColumns = ((BytesPerInsert + 14) / 15)+1;
201 
202   if(!u)
203     BytesPerUpdate = (2 * BytesPerInsert) / 3;
204 
205   bool t = true;
206   if(NoOfColumns < 2) t = false;
207   if(BytesPerInsert < 8) t = false;
208   if(BytesPerUpdate < 8) t = false;
209 
210   if(!t){
211     ndbout << "Invalid arguments combination of -a -b -u not working out"
212 	   << endl;
213     return false;
214   }
215   return true;
216 }
217 
218 NDB_COMMAND(newton_perf, "newton_perf",
219 	    "newton_perf", "newton_perf", 65535){
220 
221   if(!parseArgs(argc, argv)){
222     usage();
223     return NDBT_ProgramExit(NDBT_WRONGARGS);
224   }
225 
226   ndbout << "-----------" << endl;
227   usage();
228   ndbout << endl;
229 
230   SetupTestData();
231 
232   DBA_Open();
233 
234   if(!CreateTable()){
235     DBA_Close();
236     CleanupTestData();
237     return 0;
238   }
239 
240   if(!CreateBindings()){
241     DBA_Close();
242     CleanupTestData();
243     return 0;
244   }
245 
246   CleanTable();
247 
248   sequence(LoopCount);
249 
250   DBA_Close();
251   CleanupTestData();
252 
253   DBA_DestroyBinding(InsertB);
254   DBA_DestroyBinding(UpdateB);
255   DBA_DestroyBinding(DeleteB);
256 }
257 
258 static
259 void
ErrorMsg(const char * s)260 ErrorMsg(const char * s){
261   ndbout << s
262 	 << ": " << DBA_GetLatestError() << "-" << DBA_GetLatestErrorMsg()
263 	 << ", " << DBA_GetLatestNdbError()
264 	 << endl;
265 }
266 
267 static
268 int
m4(int i)269 m4(int i){
270   const int j = i - (i & 3);
271   return j;
272 }
273 
274 static
275 void
SetupTestData()276 SetupTestData(){
277   ndbout << "Creating testdata" << endl;
278 
279   ColumnDescriptions = new DBA_ColumnDesc_t[NoOfColumns];
280   InsertBindings = new DBA_ColumnBinding_t[NoOfColumns];
281 
282   const int sz = m4((BytesPerInsert - ((NoOfColumns+1)/2)*4)/(NoOfColumns/2));
283   int sum = 0;
284   UpdateBindingColumns = 0;
285   for(int i = 0; i<NoOfColumns; i++){
286     char tmp[16];
287     if((i % 2) == 0){
288       sprintf(tmp, "I%d", i);
289       ColumnDescriptions[i].DataType = DBA_INT;
290       ColumnDescriptions[i].Size = 4;
291       sum += 4;
292     } else {
293       sprintf(tmp, "S%d", i);
294       ColumnDescriptions[i].DataType = DBA_CHAR;
295       ColumnDescriptions[i].Size = sz;
296       sum += sz;
297     }
298     ColumnDescriptions[i].IsKey = 0;
299     ColumnDescriptions[i].Name  = strdup(tmp);
300 
301     InsertBindings[i].Name     = strdup(tmp);
302     InsertBindings[i].DataType = ColumnDescriptions[i].DataType;
303     InsertBindings[i].Size     = ColumnDescriptions[i].Size;
304     InsertBindings[i].Offset   = sum - ColumnDescriptions[i].Size;
305     InsertBindings[i].Ptr      = 0;
306 
307     if(sum <= BytesPerUpdate)
308       UpdateBindingColumns++;
309   }
310   if(UpdateBindingColumns == 1)
311     UpdateBindingColumns++;
312 
313   ColumnDescriptions[0].IsKey = 1;
314 
315   assert(sum <= BytesPerInsert);
316   sprintf(TableName, "NEWTON_%d_%d", sum, NoOfColumns);
317 
318   UpdateBindings = new DBA_ColumnBinding_t[UpdateBindingColumns];
319   memcpy(UpdateBindings, InsertBindings,
320 	 UpdateBindingColumns*sizeof(DBA_ColumnBinding_t));
321 
322   DeleteBindings = new DBA_ColumnBinding_t[1];
323   memcpy(DeleteBindings, InsertBindings,
324 	 1*sizeof(DBA_ColumnBinding_t));
325 
326   TestData = (char *)malloc(NoOfTransactions *
327 			    OperationsPerTransaction * BytesPerInsert);
328 
329   assert(TestData != 0);
330   for(int i = 0; i<NoOfTransactions; i++)
331     for(int j = 0; j<OperationsPerTransaction; j++){
332       const int pk = i * OperationsPerTransaction + j;
333       setPK(pk, pk);
334     }
335 }
336 
337 static
338 void
CleanupTestData()339 CleanupTestData(){
340   free(TestData);
341   for(int i = 0; i<NoOfColumns; i++){
342     free((char*)ColumnDescriptions[i].Name);
343     free((char*)InsertBindings[i].Name);
344   }
345   delete [] ColumnDescriptions;
346   delete [] InsertBindings;
347   delete [] UpdateBindings;
348   delete [] DeleteBindings;
349 }
350 
351 
352 static bool CleanReturnValue = true;
353 static int  CleanCallbacks = 0;
354 static int  CleanRows = 0;
355 
356 extern "C"
357 void
CleanCallback(DBA_ReqId_t reqId,DBA_Error_t error,DBA_ErrorCode_t ec)358 CleanCallback(DBA_ReqId_t reqId, DBA_Error_t error, DBA_ErrorCode_t ec){
359   CleanCallbacks++;
360   if(error == DBA_NO_ERROR)
361     CleanRows++;
362 }
363 
364 static
365 bool
CleanTable()366 CleanTable(){
367   ndbout << "Cleaning table..." << flush;
368   CleanReturnValue = true;
369   CleanCallbacks = 0;
370   CleanRows = 0;
371   for(int i = 0; i<NoOfTransactions * OperationsPerTransaction; i++){
372     DBA_ArrayDeleteRows(DeleteB,
373 			getPtr(i), 1,
374 			CleanCallback);
375     while((i-CleanCallbacks)>ParallellTransactions)
376       NdbSleep_MilliSleep(100);
377   }
378   while(CleanCallbacks != (NoOfTransactions * OperationsPerTransaction))
379     NdbSleep_SecSleep(1);
380 
381   ndbout << CleanRows << " rows deleted" << endl;
382 
383   return CleanReturnValue;
384 }
385 
386 static
387 bool
CreateBindings()388 CreateBindings(){
389   ndbout << "Creating bindings" << endl;
390   InsertB = UpdateB = DeleteB = 0;
391 
392   InsertB = DBA_CreateBinding(TableName, NoOfColumns,
393 			       InsertBindings, BytesPerInsert);
394   if(InsertB == 0){
395     ErrorMsg("Failed to create insert bindings");
396     return false;
397   }
398 
399   UpdateB = DBA_CreateBinding(TableName, UpdateBindingColumns,
400 			       UpdateBindings, BytesPerInsert);
401   if(UpdateB == 0){
402     ErrorMsg("Failed to create update bindings");
403     DBA_DestroyBinding(InsertB);
404     return false;
405   }
406 
407   DeleteB = DBA_CreateBinding(TableName, 1,
408 			       DeleteBindings, BytesPerInsert);
409   if(DeleteB == 0){
410     ErrorMsg("Failed to create delete bindings");
411     DBA_DestroyBinding(InsertB);
412     DBA_DestroyBinding(UpdateB);
413     return false;
414   }
415   return true;
416 }
417 
418 static
419 bool
CreateTable()420 CreateTable(){
421   ndbout << "Creating " << TableName << endl;
422   return DBA_CreateTable( TableName,
423 			  NoOfColumns,
424 			  ColumnDescriptions ) == DBA_NO_ERROR;
425 }
426 
427 /**
428  *
429  */
430 static NdbTimer SequenceTimer;
431 
432 static int CurrentOp    = NP_Insert;
433 static int SequenceSent = 0;
434 static int SequenceRecv = 0;
435 static NDBT_Stats SequenceStats[NP_MAX][4];
436 static NDBT_Stats SequenceLatency[NP_MAX];
437 
438 static int           HashMax;
439 static DBA_ReqId_t * ReqHash;    // ReqId - Latency/Row
440 static int         * ReqHashPos; // (row in StartTime)
441 
442 static int SequenceLatencyPos;
443 static NDB_TICKS   * StartTime;
444 
445 static
446 inline
447 int
computeHashMax(int elements)448 computeHashMax(int elements){
449   HashMax = 1;
450   while(HashMax < elements)
451     HashMax *= 2;
452 
453   if(HashMax < 1024)
454     HashMax = 1024;
455 
456   return HashMax;
457 }
458 
459 static
460 inline
461 int
hash(DBA_ReqId_t request)462 hash(DBA_ReqId_t request){
463   int r = (request >> 2) & (HashMax-1);
464   return r;
465 }
466 
467 static
468 inline
469 void
addRequest(DBA_ReqId_t request,int pos)470 addRequest(DBA_ReqId_t request, int pos){
471 
472   int i = hash(request);
473 
474   while(ReqHash[i] != 0)
475     i = ((i + 1) & (HashMax-1));
476 
477   ReqHash[i] = request;
478   ReqHashPos[i] = pos;
479 }
480 
481 static
482 inline
483 int
getRequest(DBA_ReqId_t request)484 getRequest(DBA_ReqId_t request){
485 
486   int i = hash(request);
487 
488   while(ReqHash[i] != request)
489     i = ((i + 1) & (HashMax-1));
490 
491   ReqHash[i] = 0;
492 
493   return ReqHashPos[i];
494 }
495 
496 extern "C"
497 void
SequenceCallback(DBA_ReqId_t reqId,DBA_Error_t error,DBA_ErrorCode_t ec)498 SequenceCallback(DBA_ReqId_t reqId, DBA_Error_t error, DBA_ErrorCode_t ec){
499   int p = getRequest(reqId) - 1;
500 
501   if(error != DBA_NO_ERROR){
502     ndbout << "p = " << p << endl;
503     ndbout << "DBA_GetErrorMsg(" << error << ") = "
504 	   << DBA_GetErrorMsg(error) << endl;
505     ndbout << "DBA_GetNdbErrorMsg(" << ec << ") = "
506 	   << DBA_GetNdbErrorMsg(ec) << endl;
507 
508     assert(error == DBA_NO_ERROR);
509   }
510 
511   SequenceRecv++;
512   if(SequenceRecv == NoOfTransactions){
513     SequenceTimer.doStop();
514   }
515 
516   if((p & 127) == 127){
517     NDB_TICKS t = NdbTick_CurrentMillisecond() - StartTime[p];
518     SequenceLatency[CurrentOp].addObservation(t);
519   }
520 }
521 
522 typedef DBA_ReqId_t (* DBA_ArrayFunction)( const DBA_Binding_t* pBindings,
523 					   const void * pData,
524 					   int NbRows,
525 					   DBA_AsyncCallbackFn_t CbFunc );
526 
527 inline
528 int
min(int a,int b)529 min(int a, int b){
530   return a > b ? b : a;
531 }
532 
533 static
534 void
SequenceOp(DBA_ArrayFunction func,const DBA_Binding_t * pBindings,int op)535 SequenceOp(DBA_ArrayFunction func, const DBA_Binding_t* pBindings, int op){
536   SequenceSent = 0;
537   SequenceRecv = 0;
538   SequenceLatencyPos = 1;
539   CurrentOp = op;
540 
541   SequenceTimer.doStart();
542   for(int i = 0; i<NoOfTransactions; ){
543     const int l1 = ParallellTransactions - (SequenceSent - SequenceRecv);
544     const int l2 = min(NoOfTransactions - i, l1);
545     for(int j = 0; j<l2; j++){
546       const DBA_ReqId_t r = func(pBindings,
547 				 getPtr(i*OperationsPerTransaction),
548 				 OperationsPerTransaction,
549 				 SequenceCallback);
550       assert(r != 0);
551       SequenceSent++;
552       addRequest(r, i + 1);
553       i++;
554 
555       if((SequenceSent & 127) == 127){
556 	NDB_TICKS t = NdbTick_CurrentMillisecond();
557 	StartTime[i] = t;
558       }
559     }
560     if(l2 == 0)
561       NdbSleep_MilliSleep(10);
562   }
563 
564   while(SequenceRecv != SequenceSent)
565     NdbSleep_SecSleep(1);
566 
567   ndbout << "Performed " << NoOfTransactions << " " << Operations[op]
568 	 << " in ";
569 
570   double p = NoOfTransactions * 1000;
571   double t = SequenceTimer.elapsedTime();
572   double o = p * OperationsPerTransaction;
573 
574   p /= t;
575   o /= t;
576 
577   int _p = p;
578   int _o = o;
579 
580   double b = 0;
581 
582   switch(op){
583   case NP_Insert:
584   case NP_WriteInsert:
585   case NP_WriteUpdate:
586   case NP_BulkRead:
587     b = BytesPerInsert;
588     break;
589   case NP_Update:
590     b = BytesPerUpdate;
591     break;
592   case NP_Delete:
593     b = 4;
594     break;
595   default:
596     b = 0;
597   }
598   b *= NoOfTransactions * OperationsPerTransaction;
599   b /= t;
600   int _b = b;
601 
602   SequenceStats[op][0].addObservation(t);
603   SequenceStats[op][1].addObservation(p);
604   SequenceStats[op][2].addObservation(o);
605   SequenceStats[op][3].addObservation(b);
606 
607   int t2 = SequenceStats[op][0].getMean();
608   int p2 = SequenceStats[op][1].getMean();
609   int o2 = SequenceStats[op][2].getMean();
610   int b2 = SequenceStats[op][3].getMean();
611 
612   ndbout << SequenceTimer.elapsedTime() << "(" << t2 << ")ms";
613   ndbout << " -> " << _p << "(" << p2 << ") T/s - " << _o
614 	 << "(" << o2 << ") O/s - " << _b << "(" << b2 << ") Kb/s" << endl;
615 
616   ndbout << "  Latency (ms) Avg: " << (int)SequenceLatency[op].getMean()
617 	 << " min: " << (int)SequenceLatency[op].getMin()
618 	 << " max: " << (int)SequenceLatency[op].getMax()
619 	 << " stddev: " << (int)SequenceLatency[op].getStddev()
620 	 << " n: " << SequenceLatency[op].getCount() << endl;
621 }
622 
623 /**
624  * Sequence
625  */
626 static
627 void
sequence(int loops)628 sequence(int loops){
629   computeHashMax(ParallellTransactions);
630   ReqHash    = new DBA_ReqId_t[HashMax];
631   ReqHashPos = new int[HashMax];
632   StartTime  = new NDB_TICKS[NoOfTransactions];
633 
634   for(int i = 0; i<NP_MAX; i++){
635     SequenceLatency[i].reset();
636     for(int j = 0; j<4; j++)
637       SequenceStats[i][j].reset();
638   }
639   for(int i = 0; i<loops; i++){
640     ndbout << "Loop #" << (i+1) << endl;
641     SequenceOp(DBA_ArrayInsertRows, InsertB, NP_Insert);
642 
643     // BulkRead
644 
645     SequenceOp(DBA_ArrayUpdateRows, UpdateB, NP_Update);
646     SequenceOp(DBA_ArrayWriteRows,  InsertB, NP_WriteUpdate);
647     SequenceOp(DBA_ArrayDeleteRows, DeleteB, NP_Delete);
648     SequenceOp(DBA_ArrayWriteRows,  InsertB, NP_WriteInsert);
649     SequenceOp(DBA_ArrayDeleteRows, DeleteB, NP_Delete);
650     ndbout << "-------------------" << endl << endl;
651   }
652 
653   delete [] ReqHash;
654   delete [] ReqHashPos;
655   delete [] StartTime;
656 }
657