1 /*
2 Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <ndb_global.h>
26
27 #include <NdbApi.hpp>
28 #include <NdbSchemaCon.hpp>
29 #include <NdbCondition.h>
30 #include <NdbMutex.h>
31 #include <NdbSleep.h>
32 #include <NdbThread.h>
33 #include <NdbTick.h>
34 #include <NdbOut.hpp>
35
// Database name used when main() constructs its Ndb object.
const char* const c_szDatabaseName = "TEST_DB";

// Names of the two possible call context tables. main() selects the
// "stored" table by default and the "temp" table when -f is given.
const char* const c_szTableNameStored = "CCStored";
const char* const c_szTableNameTemp = "CCTemp";

// Column names of the call context table (created in CreateCallContextTable).
const char* const c_szContextId = "ContextId";
const char* const c_szVersion = "Version";
const char* const c_szLockFlag = "LockFlag";
const char* const c_szLockTime = "LockTime";
const char* const c_szLockTimeUSec = "LockTimeUSec";
const char* const c_szContextData = "ContextData";

// Table that all transactions in this file operate on; assigned in main().
const char* g_szTableName = c_szTableNameStored;


#ifdef NDB_WIN32
// Event set by ConsoleCtrlHandler on Ctrl-C; polled by RuntimeCallContext.
HANDLE hShutdownEvent = 0;
#else
// Flag set by CtrlCHandler on SIGINT; polled by RuntimeCallContext.
bool bShutdownEvent = false;
#endif
// -b: number of context ids each thread cycles through / inserts / verifies.
long g_nMaxContextIdPerThread = 5000;
// -t: number of worker threads started by main().
long g_nNumThreads = 0;
// -c: call rate limit for the whole process; throttling only applies when > 0.
long g_nMaxCallsPerSecond = 0;
// Upper bound on retries in the Retry*Transaction functions.
long g_nMaxRetry = 50;
// -w: InsertTransaction uses writeTuple() instead of insertTuple().
bool g_bWriteTuple = false;
// -i: threads insert the initial records and then exit.
bool g_bInsertInitial = false;
// -v: threads verify the initial records and then exit.
bool g_bVerifyInitial = false;

// Cluster connection created in main() and shared by every Ndb object.
Ndb_cluster_connection* theConnection = 0;
// Serializes printf output coming from several threads.
NdbMutex* g_pNdbMutexPrintf = 0;
// Protects the shared counters updated via InterlockedIncrement*().
NdbMutex* g_pNdbMutexIncrement = 0;
// Total number of completed calls across all threads.
long g_nNumCallsProcessed = 0;
// Start/end timestamps (msec) of the current 1000-call reporting interval,
// maintained by InterlockedIncrementAndReport().
Uint64 g_tStartTime = 0;
Uint64 g_tEndTime = 0;

// Number of initial records successfully inserted / verified by all threads.
long g_nNumberOfInitialInsert = 0;
long g_nNumberOfInitialVerify = 0;

// Response time histograms (only allocated when -r is given): element i
// counts the calls / transactions that took i milliseconds. The c_nMax*
// constants are the array sizes, i.e. exclusive upper bounds on the latency.
const long c_nMaxMillisecForAllCall = 5000;
long* g_plCountMillisecForCall = 0;
const long c_nMaxMillisecForAllTrans = 5000;
long* g_plCountMillisecForTrans = 0;
// -r: collect and print response time statistics.
bool g_bReport = false;
// -r+: additionally print the full response time distribution.
bool g_bReportPlus = false;
80
81
// data for CALL_CONTEXT and GROUP_RESOURCE
//
// Fixed payload written to the ContextData column by the insert and update
// transactions, and compared against by VerifyInitialRecords().
static char STATUS_DATA[]=
"000102030405060708090A0B0C0D0E0F000102030405060708090A0B0C0D0E0F"
"101112131415161718191A1B1C1D1E1F000102030405060708090A0B0C0D0E0F"
"202122232425262728292A2B2C2D2E2F000102030405060708090A0B0C0D0E0F"
"303132333435363738393A3B3C3D3E3F000102030405060708090A0B0C0D0E0F"
"404142434445464748494A4B4C4D4E4F000102030405060708090A0B0C0D0E0F"
"505152535455565758595A5B5C5D5E5F000102030405060708090A0B0C0D0E0F"
"606162636465666768696A6B6C6D6E6F000102030405060708090A0B0C0D0E0F"
"707172737475767778797A7B7C7D7E7F000102030405060708090A0B0C0D0E0F"
"808182838485868788898A8B8C8D8E8F000102030405060708090A0B0C0D0E0F"
"909192939495969798999A9B9C9D9E9F000102030405060708090A0B0C0D0E0F"
"10010110210310410510610710810910A000102030405060708090A0B0C0D0EF"
"10B10C10D10E10F110111112113114115000102030405060708090A0B0C0D0EF"
"11611711811911A11B11C11D11E11F120000102030405060708090A0B0C0D0EF"
"12112212312412512612712812912A12B000102030405060708090A0B0C0D0EF"
"12C12D12E12F130131132134135136137000102030405060708090A0B0C0D0EF"
"13813913A13B13C13D13E13F140141142000102030405060708090A0B0C0D0EF"
"14314414514614714814914A14B14C14D000102030405060708090A0B0C0D0EF"
"14E14F150151152153154155156157158000102030405060708090A0B0C0D0EF"
"15915A15B15C15D15E15F160161162163000102030405060708090A0B0C0D0EF"
"16416516616716816916A16B16C16D16E000102030405060708090A0B0C0D0EF"
"16F170171172173174175176177178179000102030405060708090A0B0C0D0EF"
"17A17B17C17D17E17F180181182183184000102030405060708090A0B0C0D0EF"
"18518618718818918A18B18C18D18E18F000102030405060708090A0B0C0D0EF"
"19019119219319419519619719819919A000102030405060708090A0B0C0D0EF"
"19B19C19D19E19F200201202203204205000102030405060708090A0B0C0D0EF"
"20620720820920A20B20C20D20F210211000102030405060708090A0B0C0D0EF"
"21221321421521621721821921A21B21C000102030405060708090A0B0C0D0EF"
"21D21E21F220221222223224225226227000102030405060708090A0B0C0D0EF"
"22822922A22B22C22D22E22F230231232000102030405060708090A0B0C0D0EF"
"23323423523623723823923A23B23C23D000102030405060708090A0B0C0D0EF"
"23E23F240241242243244245246247248000102030405060708090A0B0C0D0EF"
"24924A24B24C24D24E24F250251252253000102030405060708090A0B0C0D0EF"
"101112131415161718191A1B1C1D1E1F000102030405060708090A0B0C0D0E0F"
"202122232425262728292A2B2C2D2E2F000102030405060708090A0B0C0D0E0F"
"303132333435363738393A3B3C3D3E3F000102030405060708090A0B0C0D0E0F"
"404142434445464748494A4B4C4D4E4F000102030405060708090A0B0C0D0E0F"
"505152535455565758595A5B5C5D5E5F000102030405060708090A0B0C0D0E0F"
"606162636465666768696A6B6C6D6E6F000102030405060708090A0B0C0D0E0F"
"707172737475767778797A7B7C7D7E7F000102030405060708090A0B0C0D0E0F"
"808182838485868788898A8B8C8D8E8F000102030405060708090A0B0C0D0E0F"
"909192939495969798999A9B9C9D9E9F000102030405060708090A0B0C0D0E0F"
"10010110210310410510610710810910A000102030405060708090A0B0C0D0EF"
"10B10C10D10E10F110111112113114115000102030405060708090A0B0C0D0EF"
"11611711811911A11B11C11D11E11F120000102030405060708090A0B0C0D0EF"
"12112212312412512612712812912A12B000102030405060708090A0B0C0D0EF"
"12C12D12E12F130131132134135136137000102030405060708090A0B0C0D0EF"
"13813913A13B13C13D13E13F140141142000102030405060708090A0B0C0D0EF"
"14314414514614714814914A14B14C14D000102030405060708090A0B0C0D0EF"
"14E14F150151152153154155156157158000102030405060708090A0B0C0D0EF"
"15915A15B15C15D15E15F160161162163000102030405060708090A0B0C0D0EF"
"16416516616716816916A16B16C16D16E000102030405060708090A0B0C0D0EF"
"16F170171172173174175176177178179000102030405060708090A0B0C0D0EF"
"17A17B17C17D17E17F180181182183184000102030405060708090A0B0C0D0EF"
"18518618718818918A18B18C18D18E18F000102030405060708090A0B0C0D0EF"
"19019119219319419519619719819919A000102030405060708090A0B0C0D0EF"
"19B19C19D19E19F200201202203204205000102030405060708090A0B0C0D0EF"
"20620720820920A20B20C20D20F210211000102030405060708090A0B0C0D0EF"
"21221321421521621721821921A21B21C000102030405060708090A0B0C0D0EF"
"21D21E21F220221222223224225226227000102030405060708090A0B0C0D0EF"
"22822922A22B22C22D22E22F230231232000102030405060708090A0B0C0D0EF"
"23323423523623723823923A23B23C23D000102030405060708090A0B0C0D0EF"
"2366890FE1438751097E7F6325DC0E6326F"
"25425525625725825925A25B25C25D25E25F000102030405060708090A0B0C0F";

// Number of bytes of STATUS_DATA that are stored / compared. Defaults to the
// whole array (sizeof, so the terminating NUL is included); main() lets -m
// lower it but caps it at sizeof(STATUS_DATA).
long g_nStatusDataSize = sizeof(STATUS_DATA);
149
150
151 // Thread function for Call Context Inserts
152
153
154 #ifdef NDB_WIN32
155
ConsoleCtrlHandler(DWORD dwCtrlType)156 BOOL WINAPI ConsoleCtrlHandler(DWORD dwCtrlType)
157 {
158 if(CTRL_C_EVENT == dwCtrlType)
159 {
160 SetEvent(hShutdownEvent);
161 return TRUE;
162 }
163 return FALSE;
164 }
165
166 #else
167
CtrlCHandler(int)168 void CtrlCHandler(int)
169 {
170 bShutdownEvent = true;
171 }
172
173 #endif
174
175
176
ReportNdbError(const char * szMsg,const NdbError & err)177 void ReportNdbError(const char* szMsg, const NdbError& err)
178 {
179 NdbMutex_Lock(g_pNdbMutexPrintf);
180 printf("%s: %d: %s\n", szMsg, err.code, (err.message ? err.message : ""));
181 NdbMutex_Unlock(g_pNdbMutexPrintf);
182 }
183
184
185 void
ReportCallsPerSecond(long nNumCallsProcessed,Uint64 tStartTime,Uint64 tEndTime)186 ReportCallsPerSecond(long nNumCallsProcessed,
187 Uint64 tStartTime,
188 Uint64 tEndTime)
189 {
190 Uint64 tElapsed = tEndTime - tStartTime;
191 long lCallsPerSec;
192 if(tElapsed>0)
193 lCallsPerSec = (long)((1000*nNumCallsProcessed)/tElapsed);
194 else
195 lCallsPerSec = 0;
196
197 NdbMutex_Lock(g_pNdbMutexPrintf);
198 printf("Time Taken for %ld Calls is %ld msec (= %ld calls/sec)\n",
199 nNumCallsProcessed, (long)tElapsed, lCallsPerSec);
200 NdbMutex_Unlock(g_pNdbMutexPrintf);
201 }
202
203
204 #ifndef NDB_WIN32
InterlockedIncrement(long * lp)205 void InterlockedIncrement(long* lp) // expensive
206 {
207 NdbMutex_Lock(g_pNdbMutexIncrement);
208 (*lp)++;
209 NdbMutex_Unlock(g_pNdbMutexIncrement);
210 }
211 #endif
212
213
InterlockedIncrementAndReport(void)214 void InterlockedIncrementAndReport(void)
215 {
216 NdbMutex_Lock(g_pNdbMutexIncrement);
217 ++g_nNumCallsProcessed;
218 if((g_nNumCallsProcessed%1000)==0)
219 {
220 g_tEndTime = NdbTick_CurrentMillisecond();
221 if(g_tStartTime)
222 ReportCallsPerSecond(1000, g_tStartTime, g_tEndTime);
223
224 g_tStartTime = g_tEndTime;
225 }
226 NdbMutex_Unlock(g_pNdbMutexIncrement);
227 }
228
229
SleepOneCall(void)230 void SleepOneCall(void)
231 {
232 int iMillisecToSleep;
233 if(g_nMaxCallsPerSecond>0)
234 iMillisecToSleep = (1000*g_nNumThreads)/g_nMaxCallsPerSecond;
235 else
236 iMillisecToSleep = 50;
237
238 if(iMillisecToSleep>0)
239 NdbSleep_MilliSleep(iMillisecToSleep);
240
241 }
242
243
244
QueryTransaction(Ndb * pNdb,long iContextId,long * piVersion,long * piLockFlag,long * piLockTime,long * piLockTimeUSec,char * pchContextData,NdbError & err)245 int QueryTransaction(Ndb* pNdb,
246 long iContextId,
247 long* piVersion,
248 long* piLockFlag,
249 long* piLockTime,
250 long* piLockTimeUSec,
251 char* pchContextData,
252 NdbError& err)
253 {
254 int iRes = -1;
255 NdbConnection* pNdbConnection = pNdb->startTransaction();
256 //0, (const char*)&iContextId, 4);
257 if(pNdbConnection)
258 {
259 NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
260 if(pNdbOperation)
261 {
262 NdbRecAttr* pNdbRecAttrVersion;
263 NdbRecAttr* pNdbRecAttrLockFlag;
264 NdbRecAttr* pNdbRecAttrLockTime;
265 NdbRecAttr* pNdbRecAttrLockTimeUSec;
266 NdbRecAttr* pNdbRecAttrContextData;
267 if(!pNdbOperation->readTuple()
268 && !pNdbOperation->equal(c_szContextId, (Int32)iContextId)
269 && (pNdbRecAttrVersion=pNdbOperation->getValue(c_szVersion, (char*)piVersion))
270 && (pNdbRecAttrLockFlag=pNdbOperation->getValue(c_szLockFlag, (char*)piLockFlag))
271 && (pNdbRecAttrLockTime=pNdbOperation->getValue(c_szLockTime, (char*)piLockTime))
272 && (pNdbRecAttrLockTimeUSec=pNdbOperation->getValue(c_szLockTimeUSec, (char*)piLockTimeUSec))
273 && (pNdbRecAttrContextData=pNdbOperation->getValue(c_szContextData, pchContextData)))
274 {
275 if(!pNdbConnection->execute(Commit))
276 iRes = 0;
277 else
278 err = pNdbConnection->getNdbError();
279 }
280 else
281 err = pNdbOperation->getNdbError();
282 }
283 else
284 err = pNdbConnection->getNdbError();
285
286 pNdb->closeTransaction(pNdbConnection);
287 }
288 else
289 err = pNdb->getNdbError();
290
291 return iRes;
292 }
293
294
RetryQueryTransaction(Ndb * pNdb,long iContextId,long * piVersion,long * piLockFlag,long * piLockTime,long * piLockTimeUSec,char * pchContextData,NdbError & err,int & nRetry)295 int RetryQueryTransaction(Ndb* pNdb,
296 long iContextId,
297 long* piVersion,
298 long* piLockFlag,
299 long* piLockTime,
300 long* piLockTimeUSec,
301 char* pchContextData,
302 NdbError& err,
303 int& nRetry)
304 {
305 int iRes = -1;
306 nRetry = 0;
307 bool bRetry = true;
308 while(bRetry && nRetry<g_nMaxRetry)
309 {
310 if(!QueryTransaction(pNdb, iContextId, piVersion, piLockFlag,
311 piLockTime, piLockTimeUSec, pchContextData, err))
312 {
313 iRes = 0;
314 bRetry = false;
315 }
316 else
317 {
318 switch(err.status)
319 {
320 case NdbError::TemporaryError:
321 case NdbError::UnknownResult:
322 SleepOneCall();
323 ++nRetry;
324 break;
325
326 case NdbError::PermanentError:
327 default:
328 bRetry = false;
329 break;
330 }
331 }
332 }
333 return iRes;
334 }
335
336
DeleteTransaction(Ndb * pNdb,long iContextId,NdbError & err)337 int DeleteTransaction(Ndb* pNdb, long iContextId, NdbError& err)
338 {
339 int iRes = -1;
340 NdbConnection* pNdbConnection = pNdb->startTransaction();
341 //0, (const char*)&iContextId, 4);
342 if(pNdbConnection)
343 {
344 NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
345 if(pNdbOperation)
346 {
347 if(!pNdbOperation->deleteTuple()
348 && !pNdbOperation->equal(c_szContextId, (Int32)iContextId))
349 {
350 if(pNdbConnection->execute(Commit) == 0)
351 iRes = 0;
352 else
353 err = pNdbConnection->getNdbError();
354 }
355 else
356 err = pNdbOperation->getNdbError();
357 }
358 else
359 err = pNdbConnection->getNdbError();
360
361 pNdb->closeTransaction(pNdbConnection);
362 }
363 else
364 err = pNdb->getNdbError();
365
366 return iRes;
367 }
368
369
370
RetryDeleteTransaction(Ndb * pNdb,long iContextId,NdbError & err,int & nRetry)371 int RetryDeleteTransaction(Ndb* pNdb, long iContextId, NdbError& err, int& nRetry)
372 {
373 int iRes = -1;
374 nRetry = 0;
375 bool bRetry = true;
376 bool bUnknown = false;
377 while(bRetry && nRetry<g_nMaxRetry)
378 {
379 if(!DeleteTransaction(pNdb, iContextId, err))
380 {
381 iRes = 0;
382 bRetry = false;
383 }
384 else
385 {
386 switch(err.status)
387 {
388 case NdbError::UnknownResult:
389 bUnknown = true;
390 ++nRetry;
391 break;
392
393 case NdbError::TemporaryError:
394 bUnknown = false;
395 SleepOneCall();
396 ++nRetry;
397 break;
398
399 case NdbError::PermanentError:
400 if(err.code==626 && bUnknown)
401 iRes = 0;
402 bRetry = false;
403 break;
404
405 default:
406 bRetry = false;
407 break;
408 }
409 }
410 }
411 return iRes;
412 }
413
414
415
InsertTransaction(Ndb * pNdb,long iContextID,long iVersion,long iLockFlag,long iLockTime,long iLockTimeUSec,const char * pchContextData,NdbError & err)416 int InsertTransaction(Ndb* pNdb,
417 long iContextID,
418 long iVersion,
419 long iLockFlag,
420 long iLockTime,
421 long iLockTimeUSec,
422 const char* pchContextData,
423 NdbError& err)
424 {
425 int iRes = -1;
426 NdbConnection* pNdbConnection = pNdb->startTransaction();
427 //0, (const char*)&iContextID, 4);
428 if(pNdbConnection)
429 {
430 NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
431 if(pNdbOperation)
432 {
433 if(!(g_bWriteTuple ? pNdbOperation->writeTuple() : pNdbOperation->insertTuple())
434 && !pNdbOperation->equal(c_szContextId, (Int32)iContextID)
435 && !pNdbOperation->setValue(c_szVersion, (Int32)iVersion)
436 && !pNdbOperation->setValue(c_szLockFlag, (Int32)iLockFlag)
437 && !pNdbOperation->setValue(c_szLockTime, (Int32)iLockTime)
438 && !pNdbOperation->setValue(c_szLockTimeUSec, (Int32)iLockTimeUSec)
439 && !pNdbOperation->setValue(c_szContextData, pchContextData, g_nStatusDataSize))
440 {
441 if(!pNdbConnection->execute(Commit))
442 iRes = 0;
443 else
444 err = pNdbConnection->getNdbError();
445 }
446 else
447 err = pNdbOperation->getNdbError();
448 }
449 else
450 err = pNdbConnection->getNdbError();
451
452 pNdb->closeTransaction(pNdbConnection);
453 }
454 else
455 err = pNdb->getNdbError();
456
457 return iRes;
458 }
459
460
461
RetryInsertTransaction(Ndb * pNdb,long iContextId,long iVersion,long iLockFlag,long iLockTime,long iLockTimeUSec,const char * pchContextData,NdbError & err,int & nRetry)462 int RetryInsertTransaction(Ndb* pNdb,
463 long iContextId,
464 long iVersion,
465 long iLockFlag,
466 long iLockTime,
467 long iLockTimeUSec,
468 const char* pchContextData,
469 NdbError& err, int& nRetry)
470 {
471 int iRes = -1;
472 nRetry = 0;
473 bool bRetry = true;
474 bool bUnknown = false;
475 while(bRetry && nRetry<g_nMaxRetry)
476 {
477 if(!InsertTransaction(pNdb, iContextId, iVersion, iLockFlag,
478 iLockTime, iLockTimeUSec, pchContextData, err))
479 {
480 iRes = 0;
481 bRetry = false;
482 }
483 else
484 {
485 switch(err.status)
486 {
487 case NdbError::UnknownResult:
488 bUnknown = true;
489 ++nRetry;
490 break;
491
492 case NdbError::TemporaryError:
493 bUnknown = false;
494 SleepOneCall();
495 ++nRetry;
496 break;
497
498 case NdbError::PermanentError:
499 if(err.code==630 && bUnknown)
500 iRes = 0;
501 bRetry = false;
502 break;
503
504 default:
505 bRetry = false;
506 break;
507 }
508 }
509 }
510 return iRes;
511 }
512
513
UpdateTransaction(Ndb * pNdb,long iContextId,NdbError & err)514 int UpdateTransaction(Ndb* pNdb, long iContextId, NdbError& err)
515 {
516 int iRes = -1;
517 NdbConnection* pNdbConnection = pNdb->startTransaction();
518 //0, (const char*)&iContextId, 4);
519 if(pNdbConnection)
520 {
521 NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
522 if(pNdbOperation)
523 {
524 if(!pNdbOperation->updateTuple()
525 && !pNdbOperation->equal(c_szContextId, (Int32)iContextId)
526 && !pNdbOperation->setValue(c_szContextData, STATUS_DATA, g_nStatusDataSize))
527 {
528 if(!pNdbConnection->execute(Commit))
529 iRes = 0;
530 else
531 err = pNdbConnection->getNdbError();
532 }
533 else
534 err = pNdbOperation->getNdbError();
535 }
536 else
537 err = pNdbConnection->getNdbError();
538
539 pNdb->closeTransaction(pNdbConnection);
540 }
541 else
542 err = pNdb->getNdbError();
543
544 return iRes;
545 }
546
547
RetryUpdateTransaction(Ndb * pNdb,long iContextId,NdbError & err,int & nRetry)548 int RetryUpdateTransaction(Ndb* pNdb, long iContextId, NdbError& err, int& nRetry)
549 {
550 int iRes = -1;
551 nRetry = 0;
552 bool bRetry = true;
553 while(bRetry && nRetry<g_nMaxRetry)
554 {
555 if(!UpdateTransaction(pNdb, iContextId, err))
556 {
557 iRes = 0;
558 bRetry = false;
559 }
560 else
561 {
562 switch(err.status)
563 {
564 case NdbError::TemporaryError:
565 case NdbError::UnknownResult:
566 SleepOneCall();
567 ++nRetry;
568 break;
569
570 case NdbError::PermanentError:
571 default:
572 bRetry = false;
573 break;
574 }
575 }
576 }
577 return iRes;
578 }
579
580
581
InsertInitialRecords(Ndb * pNdb,long nInsert,long nSeed)582 int InsertInitialRecords(Ndb* pNdb, long nInsert, long nSeed)
583 {
584 int iRes = -1;
585 char szMsg[100];
586 for(long i=0; i<nInsert; ++i)
587 {
588 int iContextID = i+nSeed;
589 int nRetry = 0;
590 NdbError err;
591 memset(&err, 0, sizeof(err));
592 Uint64 tStartTrans = NdbTick_CurrentMillisecond();
593 iRes = RetryInsertTransaction(pNdb, iContextID, nSeed, iContextID,
594 (long)(tStartTrans/1000), (long)((tStartTrans%1000)*1000),
595 STATUS_DATA, err, nRetry);
596 Uint64 tEndTrans = NdbTick_CurrentMillisecond();
597 long lMillisecForThisTrans = (long)(tEndTrans-tStartTrans);
598 if(nRetry>0)
599 {
600 sprintf(szMsg, "insert retried %d times, time %ld msec.",
601 nRetry, lMillisecForThisTrans);
602 ReportNdbError(szMsg, err);
603 }
604 if(iRes)
605 {
606 ReportNdbError("Insert initial record failed", err);
607 return iRes;
608 }
609 InterlockedIncrement(&g_nNumberOfInitialInsert);
610 }
611 return iRes;
612 }
613
614
615
VerifyInitialRecords(Ndb * pNdb,long nVerify,long nSeed)616 int VerifyInitialRecords(Ndb* pNdb, long nVerify, long nSeed)
617 {
618 int iRes = -1;
619 char* pchContextData = new char[g_nStatusDataSize];
620 char szMsg[100];
621 long iPrevLockTime = -1;
622 long iPrevLockTimeUSec = -1;
623 for(long i=0; i<nVerify; ++i)
624 {
625 int iContextID = i+nSeed;
626 long iVersion = 0;
627 long iLockFlag = 0;
628 long iLockTime = 0;
629 long iLockTimeUSec = 0;
630 int nRetry = 0;
631 NdbError err;
632 memset(&err, 0, sizeof(err));
633 Uint64 tStartTrans = NdbTick_CurrentMillisecond();
634 iRes = RetryQueryTransaction(pNdb, iContextID, &iVersion, &iLockFlag,
635 &iLockTime, &iLockTimeUSec, pchContextData, err, nRetry);
636 Uint64 tEndTrans = NdbTick_CurrentMillisecond();
637 long lMillisecForThisTrans = (long)(tEndTrans-tStartTrans);
638 if(nRetry>0)
639 {
640 sprintf(szMsg, "verify retried %d times, time %ld msec.",
641 nRetry, lMillisecForThisTrans);
642 ReportNdbError(szMsg, err);
643 }
644 if(iRes)
645 {
646 ReportNdbError("Read initial record failed", err);
647 delete[] pchContextData;
648 return iRes;
649 }
650 if(memcmp(pchContextData, STATUS_DATA, g_nStatusDataSize))
651 {
652 sprintf(szMsg, "wrong context data in tuple %d", iContextID);
653 ReportNdbError(szMsg, err);
654 delete[] pchContextData;
655 return -1;
656 }
657 if(iVersion!=nSeed
658 || iLockFlag!=iContextID
659 || iLockTime<iPrevLockTime
660 || (iLockTime==iPrevLockTime && iLockTimeUSec<iPrevLockTimeUSec))
661 {
662 sprintf(szMsg, "wrong call data in tuple %d", iContextID);
663 ReportNdbError(szMsg, err);
664 delete[] pchContextData;
665 return -1;
666 }
667 iPrevLockTime = iLockTime;
668 iPrevLockTimeUSec = iLockTimeUSec;
669 InterlockedIncrement(&g_nNumberOfInitialVerify);
670 }
671 delete[] pchContextData;
672 return iRes;
673 }
674
675
676
677
678
RuntimeCallContext(void * lpParam)679 void* RuntimeCallContext(void* lpParam)
680 {
681 long nNumCallsProcessed = 0;
682 int nStartingRecordID = *(int*)lpParam;
683
684 Ndb* pNdb;
685 char* pchContextData = new char[g_nStatusDataSize];
686 char szMsg[100];
687
688 int iRes;
689 const char* szOp;
690 long iVersion;
691 long iLockFlag;
692 long iLockTime;
693 long iLockTimeUSec;
694
695 pNdb = new Ndb(theConnection, "TEST_DB");
696 if(!pNdb)
697 {
698 NdbMutex_Lock(g_pNdbMutexPrintf);
699 printf("new Ndb failed\n");
700 NdbMutex_Unlock(g_pNdbMutexPrintf);
701 delete[] pchContextData;
702 return 0;
703 }
704
705 if(pNdb->init(1) || pNdb->waitUntilReady())
706 {
707 ReportNdbError("init of Ndb failed", pNdb->getNdbError());
708 delete pNdb;
709 delete[] pchContextData;
710 return 0;
711 }
712
713 if(g_bInsertInitial)
714 {
715 if(InsertInitialRecords(pNdb, g_nMaxContextIdPerThread, -nStartingRecordID-g_nMaxContextIdPerThread))
716 {
717 delete pNdb;
718 delete[] pchContextData;
719 return 0;
720 }
721 }
722
723 if(g_bVerifyInitial)
724 {
725 NdbError err;
726 memset(&err, 0, sizeof(err));
727 if(VerifyInitialRecords(pNdb, g_nMaxContextIdPerThread, -nStartingRecordID-g_nMaxContextIdPerThread))
728 {
729 delete pNdb;
730 delete[] pchContextData;
731 return 0;
732 }
733 }
734 if(g_bInsertInitial || g_bVerifyInitial)
735 {
736 delete[] pchContextData;
737 return 0;
738 }
739
740 long nContextID = nStartingRecordID;
741 #ifdef NDB_WIN32
742 while(WaitForSingleObject(hShutdownEvent,0) != WAIT_OBJECT_0)
743 #else
744 while(!bShutdownEvent)
745 #endif
746 {
747 ++nContextID;
748 nContextID %= g_nMaxContextIdPerThread;
749 nContextID += nStartingRecordID;
750
751 bool bTimeLatency = (nContextID==100);
752
753 Uint64 tStartCall = NdbTick_CurrentMillisecond();
754 for (int i=0; i < 20; i++)
755 {
756 int nRetry = 0;
757 NdbError err;
758 memset(&err, 0, sizeof(err));
759 Uint64 tStartTrans = NdbTick_CurrentMillisecond();
760 switch(i)
761 {
762 case 3:
763 case 6:
764 case 9:
765 case 11:
766 case 12:
767 case 15:
768 case 18: // Query Record
769 szOp = "Read";
770 iRes = RetryQueryTransaction(pNdb, nContextID, &iVersion, &iLockFlag,
771 &iLockTime, &iLockTimeUSec, pchContextData, err, nRetry);
772 break;
773
774 case 19: // Delete Record
775 szOp = "Delete";
776 iRes = RetryDeleteTransaction(pNdb, nContextID, err, nRetry);
777 break;
778
779 case 0: // Insert Record
780 szOp = "Insert";
781 iRes = RetryInsertTransaction(pNdb, nContextID, 1, 1, 1, 1, STATUS_DATA, err, nRetry);
782 break;
783
784 default: // Update Record
785 szOp = "Update";
786 iRes = RetryUpdateTransaction(pNdb, nContextID, err, nRetry);
787 break;
788 }
789 Uint64 tEndTrans = NdbTick_CurrentMillisecond();
790 long lMillisecForThisTrans = (long)(tEndTrans-tStartTrans);
791
792 if(g_bReport)
793 {
794 require(lMillisecForThisTrans>=0 && lMillisecForThisTrans<c_nMaxMillisecForAllTrans);
795 InterlockedIncrement(g_plCountMillisecForTrans+lMillisecForThisTrans);
796 }
797
798 if(nRetry>0)
799 {
800 sprintf(szMsg, "%s retried %d times, time %ld msec.",
801 szOp, nRetry, lMillisecForThisTrans);
802 ReportNdbError(szMsg, err);
803 }
804 else if(bTimeLatency)
805 {
806 NdbMutex_Lock(g_pNdbMutexPrintf);
807 printf("%s = %ld msec.\n", szOp, lMillisecForThisTrans);
808 NdbMutex_Unlock(g_pNdbMutexPrintf);
809 }
810
811 if(iRes)
812 {
813 sprintf(szMsg, "%s failed after %ld calls, terminating thread",
814 szOp, nNumCallsProcessed);
815 ReportNdbError(szMsg, err);
816 delete pNdb;
817 delete[] pchContextData;
818 return 0;
819 }
820 }
821 Uint64 tEndCall = NdbTick_CurrentMillisecond();
822 long lMillisecForThisCall = (long)(tEndCall-tStartCall);
823
824 if(g_bReport)
825 {
826 require(lMillisecForThisCall>=0 && lMillisecForThisCall<c_nMaxMillisecForAllCall);
827 InterlockedIncrement(g_plCountMillisecForCall+lMillisecForThisCall);
828 }
829
830 if(bTimeLatency)
831 {
832 NdbMutex_Lock(g_pNdbMutexPrintf);
833 printf("Total time for call is %ld msec.\n", (long)lMillisecForThisCall);
834 NdbMutex_Unlock(g_pNdbMutexPrintf);
835 }
836
837 nNumCallsProcessed++;
838 InterlockedIncrementAndReport();
839 if(g_nMaxCallsPerSecond>0)
840 {
841 int iMillisecToSleep = (1000*g_nNumThreads)/g_nMaxCallsPerSecond;
842 iMillisecToSleep -= lMillisecForThisCall;
843 if(iMillisecToSleep>0)
844 {
845 NdbSleep_MilliSleep(iMillisecToSleep);
846 }
847 }
848 }
849
850 NdbMutex_Lock(g_pNdbMutexPrintf);
851 printf("Terminating thread after %ld calls\n", nNumCallsProcessed);
852 NdbMutex_Unlock(g_pNdbMutexPrintf);
853
854 delete pNdb;
855 delete[] pchContextData;
856 return 0;
857 }
858
859
CreateCallContextTable(Ndb * pNdb,const char * szTableName,bool bStored)860 int CreateCallContextTable(Ndb* pNdb, const char* szTableName, bool bStored)
861 {
862 int iRes = -1;
863 NdbError err;
864 memset(&err, 0, sizeof(err));
865
866 NdbSchemaCon* pNdbSchemaCon = NdbSchemaCon::startSchemaTrans(pNdb);
867 if(pNdbSchemaCon)
868 {
869 NdbSchemaOp* pNdbSchemaOp = pNdbSchemaCon->getNdbSchemaOp();
870 if(pNdbSchemaOp)
871 {
872 if(!pNdbSchemaOp->createTable(szTableName, 8, TupleKey, 2,
873 All, 6, 78, 80, 1, bStored)
874 && !pNdbSchemaOp->createAttribute(c_szContextId, TupleKey, 32, 1, Signed)
875 && !pNdbSchemaOp->createAttribute(c_szVersion, NoKey, 32, 1, Signed)
876 && !pNdbSchemaOp->createAttribute(c_szLockFlag, NoKey, 32, 1, Signed)
877 && !pNdbSchemaOp->createAttribute(c_szLockTime, NoKey, 32, 1, Signed)
878 && !pNdbSchemaOp->createAttribute(c_szLockTimeUSec, NoKey, 32, 1, Signed)
879 && !pNdbSchemaOp->createAttribute(c_szContextData, NoKey, 8, g_nStatusDataSize, String))
880 {
881 if(!pNdbSchemaCon->execute())
882 iRes = 0;
883 else
884 err = pNdbSchemaCon->getNdbError();
885 }
886 else
887 err = pNdbSchemaOp->getNdbError();
888 }
889 else
890 err = pNdbSchemaCon->getNdbError();
891
892 NdbSchemaCon::closeSchemaTrans(pNdbSchemaCon);
893 }
894 else
895 err = pNdb->getNdbError();
896
897 if(iRes)
898 {
899 ReportNdbError("create call context table failed", err);
900 }
901 return iRes;
902 }
903
904
905
ReportResponseTimeStatistics(const char * szStat,long * plCount,const long lSize)906 void ReportResponseTimeStatistics(const char* szStat, long* plCount, const long lSize)
907 {
908 long lCount = 0;
909 Int64 llSum = 0;
910 Int64 llSum2 = 0;
911 long lMin = -1;
912 long lMax = -1;
913
914 for(long l=0; l<lSize; ++l)
915 {
916 if(plCount[l]>0)
917 {
918 lCount += plCount[l];
919 llSum += (Int64)l*(Int64)plCount[l];
920 llSum2 += (Int64)l*(Int64)l*(Int64)plCount[l];
921 if(lMin==-1 || l<lMin)
922 {
923 lMin = l;
924 }
925 if(lMax==-1 || l>lMax)
926 {
927 lMax = l;
928 }
929 }
930 }
931
932 long lAvg = long(llSum/lCount);
933 double dblVar = ((double)lCount*(double)llSum2 - (double)llSum*(double)llSum)/((double)lCount*(double)(lCount-1));
934 long lStd = long(sqrt(dblVar));
935
936 long lMed = -1;
937 long l95 = -1;
938 long lSel = -1;
939 for(long l=lMin; l<=lMax; ++l)
940 {
941 if(plCount[l]>0)
942 {
943 lSel += plCount[l];
944 if(lMed==-1 && lSel>=(lCount/2))
945 {
946 lMed = l;
947 }
948 if(l95==-1 && lSel>=((lCount*95)/100))
949 {
950 l95 = l;
951 }
952 if(g_bReportPlus)
953 {
954 printf("%ld\t%ld\n", l, plCount[l]);
955 }
956 }
957 }
958
959 printf("%s: Count=%ld, Min=%ld, Max=%ld, Avg=%ld, Std=%ld, Med=%ld, 95%%=%ld\n",
960 szStat, lCount, lMin, lMax, lAvg, lStd, lMed, l95);
961 }
962
963
964
// Prints the command line synopsis for szCmd followed by one line of
// explanation per option.
void ShowHelp(const char* szCmd)
{
  static const char* const aszOptionHelp[] =
  {
    "-d\t\tcreate the table",
    "-i\t\tinsert initial records",
    "-v\t\tverify initial records",
    "-t<threads>\tnumber of threads making calls",
    "-s<seed>\toffset for primary key",
    "-b<batch>\tbatch size per thread",
    "-c<maxcps>\tmax number of calls per second for this process",
    "-m<size>\tsize of context data",
    "-f\t\tno checkpointing and no logging",
    "-w\t\tuse writeTuple instead of insertTuple",
    "-r\t\treport response time statistics",
    "-r+\t\treport response time distribution",
    "-?\t\thelp"
  };
  const size_t nLines = sizeof(aszOptionHelp)/sizeof(aszOptionHelp[0]);

  printf("%s -t<threads> [-s<seed>] [-b<batch>] [-c<maxcps>] [-m<size>] [-d] [-i] [-v] [-f] [-w] [-r[+]]\n", szCmd);
  printf("%s -?\n", szCmd);
  for(size_t iLine=0; iLine<nLines; ++iLine)
  {
    puts(aszOptionHelp[iLine]);
  }
}
983
984
main(int argc,char * argv[])985 int main(int argc, char* argv[])
986 {
987 ndb_init();
988 g_nNumThreads = 0;
989 g_nMaxCallsPerSecond = 0;
990 long nSeed = 0;
991 bool bStoredTable = true;
992 bool bCreateTable = false;
993 g_bWriteTuple = false;
994 g_bReport = false;
995 g_bReportPlus = false;
996
997 for(int i=1; i<argc; ++i)
998 {
999 if(argv[i][0]=='-' || argv[i][0]=='/')
1000 {
1001 switch(argv[i][1])
1002 {
1003 case 't':
1004 g_nNumThreads = atol(argv[i]+2);
1005 break;
1006 case 's':
1007 nSeed = atol(argv[i]+2);
1008 break;
1009 case 'b':
1010 g_nMaxContextIdPerThread = atol(argv[i]+2);
1011 break;
1012 case 'm':
1013 g_nStatusDataSize = atol(argv[i]+2);
1014 if(g_nStatusDataSize> (int) sizeof(STATUS_DATA))
1015 {
1016 g_nStatusDataSize = sizeof(STATUS_DATA);
1017 }
1018 break;
1019 case 'i':
1020 g_bInsertInitial = true;
1021 break;
1022 case 'v':
1023 g_bVerifyInitial = true;
1024 break;
1025 case 'd':
1026 bCreateTable = true;
1027 break;
1028 case 'f':
1029 bStoredTable = false;
1030 break;
1031 case 'w':
1032 g_bWriteTuple = true;
1033 break;
1034 case 'r':
1035 g_bReport = true;
1036 if(argv[i][2]=='+')
1037 {
1038 g_bReportPlus = true;
1039 }
1040 break;
1041 case 'c':
1042 g_nMaxCallsPerSecond = atol(argv[i]+2);
1043 break;
1044 case '?':
1045 default:
1046 ShowHelp(argv[0]);
1047 return -1;
1048 }
1049 }
1050 else
1051 {
1052 ShowHelp(argv[0]);
1053 return -1;
1054 }
1055 }
1056 if(bCreateTable)
1057 puts("-d\tcreate the table");
1058 if(g_bInsertInitial)
1059 printf("-i\tinsert initial records\n");
1060 if(g_bVerifyInitial)
1061 printf("-v\tverify initial records\n");
1062 if(g_nNumThreads>0)
1063 printf("-t%ld\tnumber of threads making calls\n", g_nNumThreads);
1064 if(g_nNumThreads>0)
1065 {
1066 printf("-s%ld\toffset for primary key\n", nSeed);
1067 printf("-b%ld\tbatch size per thread\n", g_nMaxContextIdPerThread);
1068 }
1069 if(g_nMaxCallsPerSecond>0)
1070 printf("-c%ld\tmax number of calls per second for this process\n", g_nMaxCallsPerSecond);
1071 if(!bStoredTable)
1072 puts("-f\tno checkpointing and no logging to disk");
1073 if(g_bWriteTuple)
1074 puts("-w\tuse writeTuple instead of insertTuple");
1075 if(g_bReport)
1076 puts("-r\treport response time statistics");
1077 if(g_bReportPlus)
1078 puts("-r+\treport response time distribution");
1079
1080 if(!bCreateTable && g_nNumThreads<=0)
1081 {
1082 ShowHelp(argv[0]);
1083 return -1;
1084 }
1085 printf("-m%ld\tsize of context data\n", g_nStatusDataSize);
1086
1087 g_szTableName = (bStoredTable ? c_szTableNameStored : c_szTableNameTemp);
1088
1089 #ifdef NDB_WIN32
1090 SetConsoleCtrlHandler(ConsoleCtrlHandler, true);
1091 #else
1092 signal(SIGINT, CtrlCHandler);
1093 #endif
1094
1095 if(g_bReport)
1096 {
1097 g_plCountMillisecForCall = new long[c_nMaxMillisecForAllCall];
1098 memset(g_plCountMillisecForCall, 0, c_nMaxMillisecForAllCall*sizeof(long));
1099 g_plCountMillisecForTrans = new long[c_nMaxMillisecForAllTrans];
1100 memset(g_plCountMillisecForTrans, 0, c_nMaxMillisecForAllTrans*sizeof(long));
1101 }
1102
1103 g_pNdbMutexIncrement = NdbMutex_Create();
1104 g_pNdbMutexPrintf = NdbMutex_Create();
1105 #ifdef NDB_WIN32
1106 hShutdownEvent = CreateEvent(NULL,TRUE,FALSE,NULL);
1107 #endif
1108
1109 theConnection= new Ndb_cluster_connection();
1110 if (theConnection->connect(12, 5, 1) != 0)
1111 {
1112 ndbout << "Unable to connect to managment server." << endl;
1113 return -1;
1114 }
1115 if (theConnection->wait_until_ready(30,0) < 0)
1116 {
1117 ndbout << "Cluster nodes not ready in 30 seconds." << endl;
1118 return -1;
1119 }
1120
1121 Ndb* pNdb = new Ndb(theConnection, c_szDatabaseName);
1122 if(!pNdb)
1123 {
1124 printf("could not construct ndb\n");
1125 return 1;
1126 }
1127
1128 if(pNdb->init(1) || pNdb->waitUntilReady())
1129 {
1130 ReportNdbError("could not initialize ndb\n", pNdb->getNdbError());
1131 delete pNdb;
1132 return 2;
1133 }
1134
1135 if(bCreateTable)
1136 {
1137 printf("Create CallContext table\n");
1138 if (bStoredTable)
1139 {
1140 if (CreateCallContextTable(pNdb, c_szTableNameStored, true))
1141 {
1142 printf("Create table failed\n");
1143 delete pNdb;
1144 return 3;
1145 }
1146 }
1147 else
1148 {
1149 if (CreateCallContextTable(pNdb, c_szTableNameTemp, false))
1150 {
1151 printf("Create table failed\n");
1152 delete pNdb;
1153 return 3;
1154 }
1155 }
1156 }
1157
1158 if(g_nNumThreads>0)
1159 {
1160 printf("creating %d threads\n", (int)g_nNumThreads);
1161 if(g_bInsertInitial)
1162 {
1163 printf("each thread will insert %ld initial records, total %ld inserts\n",
1164 g_nMaxContextIdPerThread, g_nNumThreads*g_nMaxContextIdPerThread);
1165 }
1166 if(g_bVerifyInitial)
1167 {
1168 printf("each thread will verify %ld initial records, total %ld reads\n",
1169 g_nMaxContextIdPerThread, g_nNumThreads*g_nMaxContextIdPerThread);
1170 }
1171
1172 g_nNumberOfInitialInsert = 0;
1173 g_nNumberOfInitialVerify = 0;
1174
1175 Uint64 tStartTime = NdbTick_CurrentMillisecond();
1176 NdbThread* pThreads[256];
1177 int pnStartingRecordNum[256];
1178 int ij;
1179 for(ij=0;ij<g_nNumThreads;ij++)
1180 {
1181 pnStartingRecordNum[ij] = (ij*g_nMaxContextIdPerThread) + nSeed;
1182 }
1183
1184 for(ij=0;ij<g_nNumThreads;ij++)
1185 {
1186 pThreads[ij] = NdbThread_Create(RuntimeCallContext,
1187 (void**)(pnStartingRecordNum+ij),
1188 0, "RuntimeCallContext", NDB_THREAD_PRIO_LOW);
1189 }
1190
1191 //Wait for the threads to finish
1192 for(ij=0;ij<g_nNumThreads;ij++)
1193 {
1194 void* status;
1195 NdbThread_WaitFor(pThreads[ij], &status);
1196 }
1197 Uint64 tEndTime = NdbTick_CurrentMillisecond();
1198
1199 //Print time taken
1200 printf("Time Taken for %ld Calls is %ld msec (= %ld calls/sec)\n",
1201 g_nNumCallsProcessed,
1202 (long)(tEndTime-tStartTime),
1203 (long)((1000*g_nNumCallsProcessed)/(tEndTime-tStartTime)));
1204
1205 if(g_bInsertInitial)
1206 printf("successfully inserted %ld tuples\n", g_nNumberOfInitialInsert);
1207 if(g_bVerifyInitial)
1208 printf("successfully verified %ld tuples\n", g_nNumberOfInitialVerify);
1209 }
1210
1211 delete pNdb;
1212
1213 #ifdef NDB_WIN32
1214 CloseHandle(hShutdownEvent);
1215 #endif
1216 NdbMutex_Destroy(g_pNdbMutexIncrement);
1217 NdbMutex_Destroy(g_pNdbMutexPrintf);
1218
1219 if(g_bReport)
1220 {
1221 ReportResponseTimeStatistics("Calls", g_plCountMillisecForCall, c_nMaxMillisecForAllCall);
1222 ReportResponseTimeStatistics("Transactions", g_plCountMillisecForTrans, c_nMaxMillisecForAllTrans);
1223
1224 delete[] g_plCountMillisecForCall;
1225 delete[] g_plCountMillisecForTrans;
1226 }
1227
1228 return 0;
1229 }
1230
1231