1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3     All rights reserved. Use is subject to license terms.
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License, version 2.0,
7    as published by the Free Software Foundation.
8 
9    This program is also distributed with certain software (including
10    but not limited to OpenSSL) that is licensed under separate terms,
11    as designated in a particular file or component or in included license
12    documentation.  The authors of MySQL hereby grant you an additional
13    permission to link the program and your derivative works with the
14    separately licensed software that they have included with MySQL.
15 
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License, version 2.0, for more details.
20 
21    You should have received a copy of the GNU General Public License
22    along with this program; if not, write to the Free Software
23    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 */
25 
26 #include <ndb_global.h>
27 
28 #include <NdbHost.h>
29 #include <NdbSleep.h>
30 #include <NdbThread.h>
31 #include <NdbMain.h>
32 #include <NdbOut.hpp>
33 #include <NdbEnv.h>
34 #include <NdbTest.hpp>
35 
36 #include "userInterface.h"
37 #include "dbGenerator.h"
38 
39 static int   numProcesses;
40 static int   numSeconds;
41 static int   numWarmSeconds;
42 static int   parallellism;
43 static int   millisSendPoll;
44 static int   minEventSendPoll;
45 static int   forceSendPoll;
46 
47 static ThreadData *data;
48 
usage(const char * prog)49 static void usage(const char *prog)
50 {
51   const char  *progname;
52 
53    /*--------------------------------------------*/
54    /* Get the name of the program (without path) */
55    /*--------------------------------------------*/
56    progname = strrchr(prog, '/');
57 
58    if (progname == 0)
59      progname = prog;
60    else
61      ++progname;
62 
63    ndbout_c(
64            "Usage: %s [-proc <num>] [-warm <num>] [-time <num>] [ -p <num>] "
65 	   "[-t <num> ] [ -e <num> ] [ -f <num>] \n"
66            "  -proc <num>    Specifies that <num> is the number of\n"
67            "                 threads. The default is 1.\n"
68            "  -time <num>    Specifies that the test will run for <num> sec.\n"
69            "                 The default is 10 sec\n"
70            "  -warm <num>    Specifies the warm-up/cooldown period of <num> "
71 	   "sec.\n"
72            "                 The default is 10 sec\n"
73 	   "  -p <num>       The no of parallell transactions started by "
74 	   "one thread\n"
75 	   "  -e <num>       Minimum no of events before wake up in call to "
76 	   "sendPoll\n"
77 	   "                 Default is 1\n"
78 	   "  -f <num>       force parameter to sendPoll\n"
79 	   "                 Default is 0\n",
80            progname);
81 }
82 
83 static
84 int
parse_args(int argc,const char ** argv)85 parse_args(int argc, const char **argv)
86 {
87    int i;
88 
89    numProcesses     = 1;
90    numSeconds       = 10;
91    numWarmSeconds   = 10;
92    parallellism     = 1;
93    millisSendPoll   = 10000;
94    minEventSendPoll = 1;
95    forceSendPoll    = 0;
96 
97 
98    i = 1;
99    while (i < argc){
100      if (strcmp("-proc",argv[i]) == 0) {
101        if (i + 1 >= argc) {
102 	 return 1;
103        }
104        if (sscanf(argv[i+1], "%d", &numProcesses) == -1 ||
105 	   numProcesses <= 0 || numProcesses > 127) {
106 	 ndbout_c("-proc flag requires a positive integer argument [1..127]");
107 	 return 1;
108        }
109        i += 2;
110      } else if (strcmp("-p", argv[i]) == 0){
111        if(i + 1 >= argc){
112 	 usage(argv[0]);
113 	 return 1;
114        }
115        if (sscanf(argv[i+1], "%d", &parallellism) == -1 ||
116 	   parallellism <= 0){
117 	 ndbout_c("-p flag requires a positive integer argument");
118 	 return 1;
119        }
120        i += 2;
121      }
122      else if (strcmp("-time",argv[i]) == 0) {
123        if (i + 1 >= argc) {
124 	 return 1;
125        }
126        if (sscanf(argv[i+1], "%d", &numSeconds) == -1 ||
127 	   numSeconds < 0) {
128 	 ndbout_c("-time flag requires a positive integer argument");
129 	 return 1;
130        }
131        i += 2;
132      }
133      else if (strcmp("-warm",argv[i]) == 0) {
134        if (i + 1 >= argc) {
135 	 return 1;
136        }
137        if (sscanf(argv[i+1], "%d", &numWarmSeconds) == -1 ||
138 	   numWarmSeconds < 0) {
139 	 ndbout_c("-warm flag requires a positive integer argument");
140 	 return 1;
141        }
142        i += 2;
143      }
144      else if (strcmp("-e",argv[i]) == 0) {
145        if (i + 1 >= argc) {
146 	 return 1;
147        }
148        if (sscanf(argv[i+1], "%d", &minEventSendPoll) == -1 ||
149 	   minEventSendPoll < 0) {
150 	 ndbout_c("-e flag requires a positive integer argument");
151 	 return 1;
152        }
153        i += 2;
154      }
155      else if (strcmp("-f",argv[i]) == 0) {
156        if (i + 1 >= argc) {
157 	 usage(argv[0]);
158 	 return 1;
159        }
160        if (sscanf(argv[i+1], "%d", &forceSendPoll) == -1 ||
161 	   forceSendPoll < 0) {
162 	 ndbout_c("-f flag requires a positive integer argument");
163 	 return 1;
164        }
165        i += 2;
166      }
167      else {
168        return 1;
169      }
170    }
171 
172    if(minEventSendPoll > parallellism){
173      ndbout_c("minEventSendPoll(%d) > parallellism(%d)",
174 	     minEventSendPoll, parallellism);
175      ndbout_c("not very good...");
176      ndbout_c("very bad...");
177      ndbout_c("exiting...");
178      return 1;
179    }
180    return 0;
181 }
182 
183 static
184 void
print_transaction(const char * header,unsigned long totalCount,TransactionDefinition * trans,unsigned int printBranch,unsigned int printRollback)185 print_transaction(const char            *header,
186 		  unsigned long          totalCount,
187 		  TransactionDefinition *trans,
188 		  unsigned int           printBranch,
189 		  unsigned int           printRollback)
190 {
191   double f;
192 
193   ndbout_c("  %s: %d (%.2f%%) "
194 	   "Latency(ms) avg: %d min: %d max: %d std: %d n: %d",
195 	   header,
196 	   trans->count,
197 	   (double)trans->count / (double)totalCount * 100.0,
198 	   (int)trans->latency.getMean(),
199 	   (int)trans->latency.getMin(),
200 	   (int)trans->latency.getMax(),
201 	   (int)trans->latency.getStddev(),
202 	   (int)trans->latency.getCount()
203 	   );
204 
205   if( printBranch ){
206     if( trans->count == 0 )
207       f = 0.0;
208     else
209       f = (double)trans->branchExecuted / (double)trans->count * 100.0;
210     ndbout_c("      Branches Executed: %d (%.2f%%)", trans->branchExecuted, f);
211   }
212 
213   if( printRollback ){
214     if( trans->count == 0 )
215       f = 0.0;
216     else
217       f = (double)trans->rollbackExecuted / (double)trans->count * 100.0;
218     ndbout_c("      Rollback Executed: %d (%.2f%%)",trans->rollbackExecuted,f);
219   }
220 }
221 
222 void
print_stats(const char * title,unsigned int length,unsigned int transactionFlag,GeneratorStatistics * gen,int numProc,int parallellism)223 print_stats(const char       *title,
224 	    unsigned int      length,
225 	    unsigned int      transactionFlag,
226 	    GeneratorStatistics *gen,
227 	    int numProc, int parallellism)
228 {
229   int    i;
230   char buf[10];
231   char name[MAXHOSTNAMELEN];
232 
233   name[0] = 0;
234   NdbHost_GetHostName(name);
235 
236   ndbout_c("\n------ %s ------",title);
237   ndbout_c("Length        : %d %s",
238 	 length,
239 	 transactionFlag ? "Transactions" : "sec");
240   ndbout_c("Processor     : %s", name);
241   ndbout_c("Number of Proc: %d",numProc);
242   ndbout_c("Parallellism  : %d", parallellism);
243   ndbout_c("\n");
244 
245   if( gen->totalTransactions == 0 ) {
246     ndbout_c("   No Transactions for this test");
247   }
248   else {
249     for(i = 0; i < 5; i++) {
250       sprintf(buf, "T%d",i+1);
251       print_transaction(buf,
252 			gen->totalTransactions,
253 			&gen->transactions[i],
254 			i >= 2,
255 			i >= 3 );
256     }
257 
258     ndbout_c("\n");
259     ndbout_c("  Overall Statistics:");
260     ndbout_c("     Transactions: %d", gen->totalTransactions);
261     ndbout_c("     Outer       : %.0f TPS",gen->outerTps);
262     ndbout_c("\n");
263   }
264 }
265 
266 static
267 void *
threadRoutine(void * arg)268 threadRoutine(void *arg)
269 {
270   int i;
271   ThreadData *data = (ThreadData *)arg;
272   Ndb * pNDB;
273 
274   pNDB = asyncDbConnect(parallellism);
275   /* NdbSleep_MilliSleep(rand() % 10); */
276 
277   for(i = 0; i<parallellism; i++){
278     data[i].pNDB = pNDB;
279   }
280   millisSendPoll = 30000;
281   asyncGenerator(data, parallellism,
282 		 millisSendPoll, minEventSendPoll, forceSendPoll);
283 
284   asyncDbDisconnect(pNDB);
285 
286   return NULL;
287 }
288 
289 NDB_COMMAND(DbAsyncGenerator, "DbAsyncGenerator",
290 	    "DbAsyncGenerator", "DbAsyncGenerator", 65535)
291 {
292   ndb_init();
293   int i;
294   int j;
295   int k;
296   struct NdbThread* pThread = NULL;
297   GeneratorStatistics  stats;
298   GeneratorStatistics *p;
299   char threadName[32];
300   int rc = NDBT_OK;
301   void* tmp = NULL;
302   if(parse_args(argc,argv) != 0){
303     usage(argv[0]);
304     return NDBT_ProgramExit(NDBT_WRONGARGS);
305   }
306 
307 
308   ndbout_c("\nStarting Test with %d process(es) for %d %s parallellism %d",
309 	   numProcesses,
310 	   numSeconds,
311 	   "sec",
312 	   parallellism);
313 
314   ndbout_c("   WarmUp/coolDown = %d sec", numWarmSeconds);
315 
316   data = (ThreadData*)malloc((numProcesses*parallellism)*sizeof(ThreadData));
317 
318   for(i = 0; i < numProcesses; i++) {
319     for(j = 0; j<parallellism; j++){
320       data[i*parallellism+j].warmUpSeconds   = numWarmSeconds;
321       data[i*parallellism+j].testSeconds     = numSeconds;
322       data[i*parallellism+j].coolDownSeconds = numWarmSeconds;
323       data[i*parallellism+j].randomSeed      =
324 	NdbTick_CurrentMillisecond()+i+j;
325       data[i*parallellism+j].changedTime     = 0;
326       data[i*parallellism+j].runState        = Runnable;
327     }
328     sprintf(threadName, "AsyncThread[%d]", i);
329     pThread = NdbThread_Create(threadRoutine,
330 			      (void**)&data[i*parallellism],
331 			      65535,
332 			      threadName,
333                               NDB_THREAD_PRIO_LOW);
334     if(pThread != 0 && pThread != NULL){
335       (&data[i*parallellism])->pThread = pThread;
336     } else {
337       perror("Failed to create thread");
338       rc = NDBT_FAILED;
339     }
340   }
341 
342   showTime();
343 
344   /*--------------------------------*/
345   /* Wait for all processes to exit */
346   /*--------------------------------*/
347   for(i = 0; i < numProcesses; i++) {
348     NdbThread_WaitFor(data[i*parallellism].pThread, &tmp);
349     NdbThread_Destroy(&data[i*parallellism].pThread);
350   }
351 
352   ndbout_c("All threads have finished");
353 
354   /*-------------------------------------------*/
355   /* Clear all structures for total statistics */
356   /*-------------------------------------------*/
357   stats.totalTransactions = 0;
358   stats.outerTps          = 0.0;
359 
360   for(i = 0; i < NUM_TRANSACTION_TYPES; i++ ) {
361     stats.transactions[i].count            = 0;
362     stats.transactions[i].branchExecuted   = 0;
363     stats.transactions[i].rollbackExecuted = 0;
364     stats.transactions[i].latency.reset();
365   }
366 
367   /*--------------------------------*/
368   /* Add the values for all Threads */
369   /*--------------------------------*/
370   for(i = 0; i < numProcesses; i++) {
371     for(k = 0; k<parallellism; k++){
372       p = &data[i*parallellism+k].generator;
373 
374       stats.totalTransactions += p->totalTransactions;
375       stats.outerTps          += p->outerTps;
376 
377       for(j = 0; j < NUM_TRANSACTION_TYPES; j++ ) {
378 	stats.transactions[j].count +=
379 	  p->transactions[j].count;
380 	stats.transactions[j].branchExecuted +=
381 	  p->transactions[j].branchExecuted;
382 	stats.transactions[j].rollbackExecuted +=
383 	  p->transactions[j].rollbackExecuted;
384 	stats.transactions[j].latency +=
385 	  p->transactions[j].latency;
386       }
387     }
388   }
389 
390   print_stats("Test Results",
391 	      numSeconds,
392 	      0,
393 	      &stats,
394 	      numProcesses,
395 	      parallellism);
396 
397   free(data);
398 
399   NDBT_ProgramExit(rc);
400 }
401