1 /*
2 Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 All rights reserved. Use is subject to license terms.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24 */
25
26 #include <ndb_global.h>
27
28 #include <NdbHost.h>
29 #include <NdbSleep.h>
30 #include <NdbThread.h>
31 #include <NdbMain.h>
32 #include <NdbOut.hpp>
33 #include <NdbEnv.h>
34 #include <NdbTest.hpp>
35
36 #include "userInterface.h"
37 #include "dbGenerator.h"
38
39 static int numProcesses;
40 static int numSeconds;
41 static int numWarmSeconds;
42 static int parallellism;
43 static int millisSendPoll;
44 static int minEventSendPoll;
45 static int forceSendPoll;
46
47 static ThreadData *data;
48
usage(const char * prog)49 static void usage(const char *prog)
50 {
51 const char *progname;
52
53 /*--------------------------------------------*/
54 /* Get the name of the program (without path) */
55 /*--------------------------------------------*/
56 progname = strrchr(prog, '/');
57
58 if (progname == 0)
59 progname = prog;
60 else
61 ++progname;
62
63 ndbout_c(
64 "Usage: %s [-proc <num>] [-warm <num>] [-time <num>] [ -p <num>] "
65 "[-t <num> ] [ -e <num> ] [ -f <num>] \n"
66 " -proc <num> Specifies that <num> is the number of\n"
67 " threads. The default is 1.\n"
68 " -time <num> Specifies that the test will run for <num> sec.\n"
69 " The default is 10 sec\n"
70 " -warm <num> Specifies the warm-up/cooldown period of <num> "
71 "sec.\n"
72 " The default is 10 sec\n"
73 " -p <num> The no of parallell transactions started by "
74 "one thread\n"
75 " -e <num> Minimum no of events before wake up in call to "
76 "sendPoll\n"
77 " Default is 1\n"
78 " -f <num> force parameter to sendPoll\n"
79 " Default is 0\n",
80 progname);
81 }
82
83 static
84 int
parse_args(int argc,const char ** argv)85 parse_args(int argc, const char **argv)
86 {
87 int i;
88
89 numProcesses = 1;
90 numSeconds = 10;
91 numWarmSeconds = 10;
92 parallellism = 1;
93 millisSendPoll = 10000;
94 minEventSendPoll = 1;
95 forceSendPoll = 0;
96
97
98 i = 1;
99 while (i < argc){
100 if (strcmp("-proc",argv[i]) == 0) {
101 if (i + 1 >= argc) {
102 return 1;
103 }
104 if (sscanf(argv[i+1], "%d", &numProcesses) == -1 ||
105 numProcesses <= 0 || numProcesses > 127) {
106 ndbout_c("-proc flag requires a positive integer argument [1..127]");
107 return 1;
108 }
109 i += 2;
110 } else if (strcmp("-p", argv[i]) == 0){
111 if(i + 1 >= argc){
112 usage(argv[0]);
113 return 1;
114 }
115 if (sscanf(argv[i+1], "%d", ¶llellism) == -1 ||
116 parallellism <= 0){
117 ndbout_c("-p flag requires a positive integer argument");
118 return 1;
119 }
120 i += 2;
121 }
122 else if (strcmp("-time",argv[i]) == 0) {
123 if (i + 1 >= argc) {
124 return 1;
125 }
126 if (sscanf(argv[i+1], "%d", &numSeconds) == -1 ||
127 numSeconds < 0) {
128 ndbout_c("-time flag requires a positive integer argument");
129 return 1;
130 }
131 i += 2;
132 }
133 else if (strcmp("-warm",argv[i]) == 0) {
134 if (i + 1 >= argc) {
135 return 1;
136 }
137 if (sscanf(argv[i+1], "%d", &numWarmSeconds) == -1 ||
138 numWarmSeconds < 0) {
139 ndbout_c("-warm flag requires a positive integer argument");
140 return 1;
141 }
142 i += 2;
143 }
144 else if (strcmp("-e",argv[i]) == 0) {
145 if (i + 1 >= argc) {
146 return 1;
147 }
148 if (sscanf(argv[i+1], "%d", &minEventSendPoll) == -1 ||
149 minEventSendPoll < 0) {
150 ndbout_c("-e flag requires a positive integer argument");
151 return 1;
152 }
153 i += 2;
154 }
155 else if (strcmp("-f",argv[i]) == 0) {
156 if (i + 1 >= argc) {
157 usage(argv[0]);
158 return 1;
159 }
160 if (sscanf(argv[i+1], "%d", &forceSendPoll) == -1 ||
161 forceSendPoll < 0) {
162 ndbout_c("-f flag requires a positive integer argument");
163 return 1;
164 }
165 i += 2;
166 }
167 else {
168 return 1;
169 }
170 }
171
172 if(minEventSendPoll > parallellism){
173 ndbout_c("minEventSendPoll(%d) > parallellism(%d)",
174 minEventSendPoll, parallellism);
175 ndbout_c("not very good...");
176 ndbout_c("very bad...");
177 ndbout_c("exiting...");
178 return 1;
179 }
180 return 0;
181 }
182
183 static
184 void
print_transaction(const char * header,unsigned long totalCount,TransactionDefinition * trans,unsigned int printBranch,unsigned int printRollback)185 print_transaction(const char *header,
186 unsigned long totalCount,
187 TransactionDefinition *trans,
188 unsigned int printBranch,
189 unsigned int printRollback)
190 {
191 double f;
192
193 ndbout_c(" %s: %d (%.2f%%) "
194 "Latency(ms) avg: %d min: %d max: %d std: %d n: %d",
195 header,
196 trans->count,
197 (double)trans->count / (double)totalCount * 100.0,
198 (int)trans->latency.getMean(),
199 (int)trans->latency.getMin(),
200 (int)trans->latency.getMax(),
201 (int)trans->latency.getStddev(),
202 (int)trans->latency.getCount()
203 );
204
205 if( printBranch ){
206 if( trans->count == 0 )
207 f = 0.0;
208 else
209 f = (double)trans->branchExecuted / (double)trans->count * 100.0;
210 ndbout_c(" Branches Executed: %d (%.2f%%)", trans->branchExecuted, f);
211 }
212
213 if( printRollback ){
214 if( trans->count == 0 )
215 f = 0.0;
216 else
217 f = (double)trans->rollbackExecuted / (double)trans->count * 100.0;
218 ndbout_c(" Rollback Executed: %d (%.2f%%)",trans->rollbackExecuted,f);
219 }
220 }
221
222 void
print_stats(const char * title,unsigned int length,unsigned int transactionFlag,GeneratorStatistics * gen,int numProc,int parallellism)223 print_stats(const char *title,
224 unsigned int length,
225 unsigned int transactionFlag,
226 GeneratorStatistics *gen,
227 int numProc, int parallellism)
228 {
229 int i;
230 char buf[10];
231 char name[MAXHOSTNAMELEN];
232
233 name[0] = 0;
234 NdbHost_GetHostName(name);
235
236 ndbout_c("\n------ %s ------",title);
237 ndbout_c("Length : %d %s",
238 length,
239 transactionFlag ? "Transactions" : "sec");
240 ndbout_c("Processor : %s", name);
241 ndbout_c("Number of Proc: %d",numProc);
242 ndbout_c("Parallellism : %d", parallellism);
243 ndbout_c("\n");
244
245 if( gen->totalTransactions == 0 ) {
246 ndbout_c(" No Transactions for this test");
247 }
248 else {
249 for(i = 0; i < 5; i++) {
250 sprintf(buf, "T%d",i+1);
251 print_transaction(buf,
252 gen->totalTransactions,
253 &gen->transactions[i],
254 i >= 2,
255 i >= 3 );
256 }
257
258 ndbout_c("\n");
259 ndbout_c(" Overall Statistics:");
260 ndbout_c(" Transactions: %d", gen->totalTransactions);
261 ndbout_c(" Outer : %.0f TPS",gen->outerTps);
262 ndbout_c("\n");
263 }
264 }
265
266 static
267 void *
threadRoutine(void * arg)268 threadRoutine(void *arg)
269 {
270 int i;
271 ThreadData *data = (ThreadData *)arg;
272 Ndb * pNDB;
273
274 pNDB = asyncDbConnect(parallellism);
275 /* NdbSleep_MilliSleep(rand() % 10); */
276
277 for(i = 0; i<parallellism; i++){
278 data[i].pNDB = pNDB;
279 }
280 millisSendPoll = 30000;
281 asyncGenerator(data, parallellism,
282 millisSendPoll, minEventSendPoll, forceSendPoll);
283
284 asyncDbDisconnect(pNDB);
285
286 return NULL;
287 }
288
289 NDB_COMMAND(DbAsyncGenerator, "DbAsyncGenerator",
290 "DbAsyncGenerator", "DbAsyncGenerator", 65535)
291 {
292 ndb_init();
293 int i;
294 int j;
295 int k;
296 struct NdbThread* pThread = NULL;
297 GeneratorStatistics stats;
298 GeneratorStatistics *p;
299 char threadName[32];
300 int rc = NDBT_OK;
301 void* tmp = NULL;
302 if(parse_args(argc,argv) != 0){
303 usage(argv[0]);
304 return NDBT_ProgramExit(NDBT_WRONGARGS);
305 }
306
307
308 ndbout_c("\nStarting Test with %d process(es) for %d %s parallellism %d",
309 numProcesses,
310 numSeconds,
311 "sec",
312 parallellism);
313
314 ndbout_c(" WarmUp/coolDown = %d sec", numWarmSeconds);
315
316 data = (ThreadData*)malloc((numProcesses*parallellism)*sizeof(ThreadData));
317
318 for(i = 0; i < numProcesses; i++) {
319 for(j = 0; j<parallellism; j++){
320 data[i*parallellism+j].warmUpSeconds = numWarmSeconds;
321 data[i*parallellism+j].testSeconds = numSeconds;
322 data[i*parallellism+j].coolDownSeconds = numWarmSeconds;
323 data[i*parallellism+j].randomSeed =
324 NdbTick_CurrentMillisecond()+i+j;
325 data[i*parallellism+j].changedTime = 0;
326 data[i*parallellism+j].runState = Runnable;
327 }
328 sprintf(threadName, "AsyncThread[%d]", i);
329 pThread = NdbThread_Create(threadRoutine,
330 (void**)&data[i*parallellism],
331 65535,
332 threadName,
333 NDB_THREAD_PRIO_LOW);
334 if(pThread != 0 && pThread != NULL){
335 (&data[i*parallellism])->pThread = pThread;
336 } else {
337 perror("Failed to create thread");
338 rc = NDBT_FAILED;
339 }
340 }
341
342 showTime();
343
344 /*--------------------------------*/
345 /* Wait for all processes to exit */
346 /*--------------------------------*/
347 for(i = 0; i < numProcesses; i++) {
348 NdbThread_WaitFor(data[i*parallellism].pThread, &tmp);
349 NdbThread_Destroy(&data[i*parallellism].pThread);
350 }
351
352 ndbout_c("All threads have finished");
353
354 /*-------------------------------------------*/
355 /* Clear all structures for total statistics */
356 /*-------------------------------------------*/
357 stats.totalTransactions = 0;
358 stats.outerTps = 0.0;
359
360 for(i = 0; i < NUM_TRANSACTION_TYPES; i++ ) {
361 stats.transactions[i].count = 0;
362 stats.transactions[i].branchExecuted = 0;
363 stats.transactions[i].rollbackExecuted = 0;
364 stats.transactions[i].latency.reset();
365 }
366
367 /*--------------------------------*/
368 /* Add the values for all Threads */
369 /*--------------------------------*/
370 for(i = 0; i < numProcesses; i++) {
371 for(k = 0; k<parallellism; k++){
372 p = &data[i*parallellism+k].generator;
373
374 stats.totalTransactions += p->totalTransactions;
375 stats.outerTps += p->outerTps;
376
377 for(j = 0; j < NUM_TRANSACTION_TYPES; j++ ) {
378 stats.transactions[j].count +=
379 p->transactions[j].count;
380 stats.transactions[j].branchExecuted +=
381 p->transactions[j].branchExecuted;
382 stats.transactions[j].rollbackExecuted +=
383 p->transactions[j].rollbackExecuted;
384 stats.transactions[j].latency +=
385 p->transactions[j].latency;
386 }
387 }
388 }
389
390 print_stats("Test Results",
391 numSeconds,
392 0,
393 &stats,
394 numProcesses,
395 parallellism);
396
397 free(data);
398
399 NDBT_ProgramExit(rc);
400 }
401