1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <ndb_global.h>
26 
27 #include <NdbApi.hpp>
28 #include <NdbSchemaCon.hpp>
29 #include <NdbCondition.h>
30 #include <NdbMutex.h>
31 #include <NdbSleep.h>
32 #include <NdbThread.h>
33 #include <NdbTick.h>
34 #include <NdbOut.hpp>
35 
// Database name; has the same value as the literal passed to the Ndb
// constructor in RuntimeCallContext().
const char* const c_szDatabaseName = "TEST_DB";

// The two table names this program knows about; g_szTableName below selects
// which one the transactions operate on.
const char* const c_szTableNameStored = "CCStored";
const char* const c_szTableNameTemp = "CCTemp";

// Column names of the call-context table (see CreateCallContextTable()).
const char* const c_szContextId = "ContextId";          // primary key
const char* const c_szVersion = "Version";
const char* const c_szLockFlag = "LockFlag";
const char* const c_szLockTime = "LockTime";
const char* const c_szLockTimeUSec = "LockTimeUSec";
const char* const c_szContextData = "ContextData";       // STATUS_DATA payload

// Table used by every transaction helper; defaults to the "stored" table.
const char* g_szTableName = c_szTableNameStored;


// Shutdown request, set by the Ctrl-C handler and polled by the worker loop
// in RuntimeCallContext().
#ifdef NDB_WIN32
HANDLE hShutdownEvent = 0;
#else
bool bShutdownEvent = false;
#endif
// Number of context ids each thread cycles through (also the number of
// records inserted/verified per thread in the initial phases).
long g_nMaxContextIdPerThread = 5000;
// Thread count; used together with g_nMaxCallsPerSecond to compute sleeps.
long g_nNumThreads = 0;
// Rate limit; values <= 0 disable throttling in the worker loop.
long g_nMaxCallsPerSecond = 0;
// Upper bound on retries in the Retry*Transaction() helpers.
long g_nMaxRetry = 50;
// When true InsertTransaction() uses writeTuple() instead of insertTuple().
bool g_bWriteTuple = false;
// When true the worker thread inserts / verifies the initial records and then
// returns without running the call loop.
bool g_bInsertInitial = false;
bool g_bVerifyInitial = false;

// Cluster connection handed to every Ndb object created by the workers.
Ndb_cluster_connection* theConnection = 0;
// Serialises printf output across threads.
NdbMutex* g_pNdbMutexPrintf = 0;
// Protects the shared counters below (and InterlockedIncrement() off Windows).
NdbMutex* g_pNdbMutexIncrement = 0;
// Total number of completed calls over all threads.
long g_nNumCallsProcessed = 0;
// Start/end timestamps (milliseconds) of the current 1000-call window.
Uint64 g_tStartTime = 0;
Uint64 g_tEndTime = 0;

// Progress counters for the initial insert / verify phases.
long g_nNumberOfInitialInsert = 0;
long g_nNumberOfInitialVerify = 0;

// Response-time histograms indexed by elapsed milliseconds. The c_nMax*
// constants are the exclusive upper bound enforced before indexing; the
// arrays themselves are allocated outside the code visible here.
const long c_nMaxMillisecForAllCall = 5000;
long* g_plCountMillisecForCall = 0;
const long c_nMaxMillisecForAllTrans = 5000;
long* g_plCountMillisecForTrans = 0;
// Enables histogram collection in the worker loop.
bool g_bReport = false;
// NOTE(review): not referenced in the code visible here - confirm its use.
bool g_bReportPlus = false;
80 
81 
// data for CALL_CONTEXT and GROUP_RESOURCE
//
// Fixed payload written to the ContextData column by the insert and update
// transactions and compared byte-for-byte by VerifyInitialRecords(). The
// content is an arbitrary run of hex-digit characters; only its length and
// exact bytes matter.
static char STATUS_DATA[]=
"000102030405060708090A0B0C0D0E0F000102030405060708090A0B0C0D0E0F"
"101112131415161718191A1B1C1D1E1F000102030405060708090A0B0C0D0E0F"
"202122232425262728292A2B2C2D2E2F000102030405060708090A0B0C0D0E0F"
"303132333435363738393A3B3C3D3E3F000102030405060708090A0B0C0D0E0F"
"404142434445464748494A4B4C4D4E4F000102030405060708090A0B0C0D0E0F"
"505152535455565758595A5B5C5D5E5F000102030405060708090A0B0C0D0E0F"
"606162636465666768696A6B6C6D6E6F000102030405060708090A0B0C0D0E0F"
"707172737475767778797A7B7C7D7E7F000102030405060708090A0B0C0D0E0F"
"808182838485868788898A8B8C8D8E8F000102030405060708090A0B0C0D0E0F"
"909192939495969798999A9B9C9D9E9F000102030405060708090A0B0C0D0E0F"
"10010110210310410510610710810910A000102030405060708090A0B0C0D0EF"
"10B10C10D10E10F110111112113114115000102030405060708090A0B0C0D0EF"
"11611711811911A11B11C11D11E11F120000102030405060708090A0B0C0D0EF"
"12112212312412512612712812912A12B000102030405060708090A0B0C0D0EF"
"12C12D12E12F130131132134135136137000102030405060708090A0B0C0D0EF"
"13813913A13B13C13D13E13F140141142000102030405060708090A0B0C0D0EF"
"14314414514614714814914A14B14C14D000102030405060708090A0B0C0D0EF"
"14E14F150151152153154155156157158000102030405060708090A0B0C0D0EF"
"15915A15B15C15D15E15F160161162163000102030405060708090A0B0C0D0EF"
"16416516616716816916A16B16C16D16E000102030405060708090A0B0C0D0EF"
"16F170171172173174175176177178179000102030405060708090A0B0C0D0EF"
"17A17B17C17D17E17F180181182183184000102030405060708090A0B0C0D0EF"
"18518618718818918A18B18C18D18E18F000102030405060708090A0B0C0D0EF"
"19019119219319419519619719819919A000102030405060708090A0B0C0D0EF"
"19B19C19D19E19F200201202203204205000102030405060708090A0B0C0D0EF"
"20620720820920A20B20C20D20F210211000102030405060708090A0B0C0D0EF"
"21221321421521621721821921A21B21C000102030405060708090A0B0C0D0EF"
"21D21E21F220221222223224225226227000102030405060708090A0B0C0D0EF"
"22822922A22B22C22D22E22F230231232000102030405060708090A0B0C0D0EF"
"23323423523623723823923A23B23C23D000102030405060708090A0B0C0D0EF"
"23E23F240241242243244245246247248000102030405060708090A0B0C0D0EF"
"24924A24B24C24D24E24F250251252253000102030405060708090A0B0C0D0EF"
"101112131415161718191A1B1C1D1E1F000102030405060708090A0B0C0D0E0F"
"202122232425262728292A2B2C2D2E2F000102030405060708090A0B0C0D0E0F"
"303132333435363738393A3B3C3D3E3F000102030405060708090A0B0C0D0E0F"
"404142434445464748494A4B4C4D4E4F000102030405060708090A0B0C0D0E0F"
"505152535455565758595A5B5C5D5E5F000102030405060708090A0B0C0D0E0F"
"606162636465666768696A6B6C6D6E6F000102030405060708090A0B0C0D0E0F"
"707172737475767778797A7B7C7D7E7F000102030405060708090A0B0C0D0E0F"
"808182838485868788898A8B8C8D8E8F000102030405060708090A0B0C0D0E0F"
"909192939495969798999A9B9C9D9E9F000102030405060708090A0B0C0D0E0F"
"10010110210310410510610710810910A000102030405060708090A0B0C0D0EF"
"10B10C10D10E10F110111112113114115000102030405060708090A0B0C0D0EF"
"11611711811911A11B11C11D11E11F120000102030405060708090A0B0C0D0EF"
"12112212312412512612712812912A12B000102030405060708090A0B0C0D0EF"
"12C12D12E12F130131132134135136137000102030405060708090A0B0C0D0EF"
"13813913A13B13C13D13E13F140141142000102030405060708090A0B0C0D0EF"
"14314414514614714814914A14B14C14D000102030405060708090A0B0C0D0EF"
"14E14F150151152153154155156157158000102030405060708090A0B0C0D0EF"
"15915A15B15C15D15E15F160161162163000102030405060708090A0B0C0D0EF"
"16416516616716816916A16B16C16D16E000102030405060708090A0B0C0D0EF"
"16F170171172173174175176177178179000102030405060708090A0B0C0D0EF"
"17A17B17C17D17E17F180181182183184000102030405060708090A0B0C0D0EF"
"18518618718818918A18B18C18D18E18F000102030405060708090A0B0C0D0EF"
"19019119219319419519619719819919A000102030405060708090A0B0C0D0EF"
"19B19C19D19E19F200201202203204205000102030405060708090A0B0C0D0EF"
"20620720820920A20B20C20D20F210211000102030405060708090A0B0C0D0EF"
"21221321421521621721821921A21B21C000102030405060708090A0B0C0D0EF"
"21D21E21F220221222223224225226227000102030405060708090A0B0C0D0EF"
"22822922A22B22C22D22E22F230231232000102030405060708090A0B0C0D0EF"
"23323423523623723823923A23B23C23D000102030405060708090A0B0C0D0EF"
"2366890FE1438751097E7F6325DC0E6326F"
"25425525625725825925A25B25C25D25E25F000102030405060708090A0B0C0F";

// Size in bytes of STATUS_DATA including its terminating NUL; used as the
// ContextData column length, the setValue() length and the read-buffer size.
long g_nStatusDataSize = sizeof(STATUS_DATA);
149 
150 
151 // Thread function for Call Context Inserts
152 
153 
154 #ifdef NDB_WIN32
155 
ConsoleCtrlHandler(DWORD dwCtrlType)156 BOOL WINAPI ConsoleCtrlHandler(DWORD dwCtrlType)
157 {
158     if(CTRL_C_EVENT == dwCtrlType)
159     {
160         SetEvent(hShutdownEvent);
161         return TRUE;
162     }
163     return FALSE;
164 }
165 
166 #else
167 
CtrlCHandler(int)168 void CtrlCHandler(int)
169 {
170     bShutdownEvent = true;
171 }
172 
173 #endif
174 
175 
176 
ReportNdbError(const char * szMsg,const NdbError & err)177 void ReportNdbError(const char* szMsg, const NdbError& err)
178 {
179     NdbMutex_Lock(g_pNdbMutexPrintf);
180     printf("%s: %d: %s\n", szMsg, err.code, (err.message ? err.message : ""));
181     NdbMutex_Unlock(g_pNdbMutexPrintf);
182 }
183 
184 
185 void
ReportCallsPerSecond(long nNumCallsProcessed,Uint64 tStartTime,Uint64 tEndTime)186 ReportCallsPerSecond(long nNumCallsProcessed,
187                      Uint64 tStartTime,
188                      Uint64 tEndTime)
189 {
190     Uint64 tElapsed = tEndTime - tStartTime;
191     long lCallsPerSec;
192     if(tElapsed>0)
193         lCallsPerSec = (long)((1000*nNumCallsProcessed)/tElapsed);
194     else
195         lCallsPerSec = 0;
196 
197     NdbMutex_Lock(g_pNdbMutexPrintf);
198     printf("Time Taken for %ld Calls is %ld msec (= %ld calls/sec)\n",
199         nNumCallsProcessed, (long)tElapsed, lCallsPerSec);
200     NdbMutex_Unlock(g_pNdbMutexPrintf);
201 }
202 
203 
204 #ifndef NDB_WIN32
InterlockedIncrement(long * lp)205 void InterlockedIncrement(long* lp)             // expensive
206 {
207     NdbMutex_Lock(g_pNdbMutexIncrement);
208     (*lp)++;
209     NdbMutex_Unlock(g_pNdbMutexIncrement);
210 }
211 #endif
212 
213 
InterlockedIncrementAndReport(void)214 void InterlockedIncrementAndReport(void)
215 {
216     NdbMutex_Lock(g_pNdbMutexIncrement);
217     ++g_nNumCallsProcessed;
218     if((g_nNumCallsProcessed%1000)==0)
219     {
220         g_tEndTime = NdbTick_CurrentMillisecond();
221         if(g_tStartTime)
222             ReportCallsPerSecond(1000, g_tStartTime, g_tEndTime);
223 
224         g_tStartTime = g_tEndTime;
225     }
226     NdbMutex_Unlock(g_pNdbMutexIncrement);
227 }
228 
229 
SleepOneCall(void)230 void SleepOneCall(void)
231 {
232     int iMillisecToSleep;
233     if(g_nMaxCallsPerSecond>0)
234         iMillisecToSleep = (1000*g_nNumThreads)/g_nMaxCallsPerSecond;
235     else
236         iMillisecToSleep = 50;
237 
238     if(iMillisecToSleep>0)
239         NdbSleep_MilliSleep(iMillisecToSleep);
240 
241 }
242 
243 
244 
QueryTransaction(Ndb * pNdb,long iContextId,long * piVersion,long * piLockFlag,long * piLockTime,long * piLockTimeUSec,char * pchContextData,NdbError & err)245 int QueryTransaction(Ndb* pNdb,
246                      long iContextId,
247                      long* piVersion,
248                      long* piLockFlag,
249                      long* piLockTime,
250                      long* piLockTimeUSec,
251                      char* pchContextData,
252                      NdbError& err)
253 {
254     int iRes = -1;
255     NdbConnection* pNdbConnection = pNdb->startTransaction();
256     //0, (const char*)&iContextId, 4);
257     if(pNdbConnection)
258     {
259         NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
260         if(pNdbOperation)
261         {
262             NdbRecAttr* pNdbRecAttrVersion;
263             NdbRecAttr* pNdbRecAttrLockFlag;
264             NdbRecAttr* pNdbRecAttrLockTime;
265             NdbRecAttr* pNdbRecAttrLockTimeUSec;
266             NdbRecAttr* pNdbRecAttrContextData;
267             if(!pNdbOperation->readTuple()
268             && !pNdbOperation->equal(c_szContextId, (Int32)iContextId)
269             && (pNdbRecAttrVersion=pNdbOperation->getValue(c_szVersion, (char*)piVersion))
270             && (pNdbRecAttrLockFlag=pNdbOperation->getValue(c_szLockFlag, (char*)piLockFlag))
271             && (pNdbRecAttrLockTime=pNdbOperation->getValue(c_szLockTime, (char*)piLockTime))
272             && (pNdbRecAttrLockTimeUSec=pNdbOperation->getValue(c_szLockTimeUSec, (char*)piLockTimeUSec))
273             && (pNdbRecAttrContextData=pNdbOperation->getValue(c_szContextData, pchContextData)))
274             {
275                 if(!pNdbConnection->execute(Commit))
276                     iRes = 0;
277                 else
278                     err = pNdbConnection->getNdbError();
279             }
280             else
281                 err = pNdbOperation->getNdbError();
282         }
283         else
284             err = pNdbConnection->getNdbError();
285 
286         pNdb->closeTransaction(pNdbConnection);
287     }
288     else
289         err = pNdb->getNdbError();
290 
291     return iRes;
292 }
293 
294 
RetryQueryTransaction(Ndb * pNdb,long iContextId,long * piVersion,long * piLockFlag,long * piLockTime,long * piLockTimeUSec,char * pchContextData,NdbError & err,int & nRetry)295 int RetryQueryTransaction(Ndb* pNdb,
296                           long iContextId,
297                           long* piVersion,
298                           long* piLockFlag,
299                           long* piLockTime,
300                           long* piLockTimeUSec,
301                           char* pchContextData,
302                           NdbError& err,
303                           int& nRetry)
304 {
305     int iRes = -1;
306     nRetry = 0;
307     bool bRetry = true;
308     while(bRetry && nRetry<g_nMaxRetry)
309     {
310         if(!QueryTransaction(pNdb, iContextId, piVersion, piLockFlag,
311             piLockTime, piLockTimeUSec, pchContextData, err))
312         {
313             iRes = 0;
314             bRetry = false;
315         }
316         else
317         {
318             switch(err.status)
319             {
320             case NdbError::TemporaryError:
321             case NdbError::UnknownResult:
322                 SleepOneCall();
323                 ++nRetry;
324                 break;
325 
326             case NdbError::PermanentError:
327             default:
328                 bRetry = false;
329                 break;
330             }
331         }
332     }
333     return iRes;
334 }
335 
336 
DeleteTransaction(Ndb * pNdb,long iContextId,NdbError & err)337 int DeleteTransaction(Ndb* pNdb, long iContextId, NdbError& err)
338 {
339     int iRes = -1;
340     NdbConnection* pNdbConnection = pNdb->startTransaction();
341     //0, (const char*)&iContextId, 4);
342     if(pNdbConnection)
343     {
344         NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
345         if(pNdbOperation)
346         {
347             if(!pNdbOperation->deleteTuple()
348             && !pNdbOperation->equal(c_szContextId, (Int32)iContextId))
349             {
350                 if(pNdbConnection->execute(Commit) == 0)
351                     iRes = 0;
352                 else
353                     err = pNdbConnection->getNdbError();
354             }
355             else
356                 err = pNdbOperation->getNdbError();
357         }
358         else
359             err = pNdbConnection->getNdbError();
360 
361         pNdb->closeTransaction(pNdbConnection);
362     }
363     else
364         err = pNdb->getNdbError();
365 
366     return iRes;
367 }
368 
369 
370 
RetryDeleteTransaction(Ndb * pNdb,long iContextId,NdbError & err,int & nRetry)371 int RetryDeleteTransaction(Ndb* pNdb, long iContextId, NdbError& err, int& nRetry)
372 {
373     int iRes = -1;
374     nRetry = 0;
375     bool bRetry = true;
376     bool bUnknown = false;
377     while(bRetry && nRetry<g_nMaxRetry)
378     {
379         if(!DeleteTransaction(pNdb, iContextId, err))
380         {
381             iRes = 0;
382             bRetry = false;
383         }
384         else
385         {
386             switch(err.status)
387             {
388             case NdbError::UnknownResult:
389                 bUnknown = true;
390                 ++nRetry;
391                 break;
392 
393             case NdbError::TemporaryError:
394                 bUnknown = false;
395                 SleepOneCall();
396                 ++nRetry;
397                 break;
398 
399             case NdbError::PermanentError:
400                 if(err.code==626 && bUnknown)
401                     iRes = 0;
402                 bRetry = false;
403                 break;
404 
405             default:
406                 bRetry = false;
407                 break;
408             }
409         }
410     }
411     return iRes;
412 }
413 
414 
415 
InsertTransaction(Ndb * pNdb,long iContextID,long iVersion,long iLockFlag,long iLockTime,long iLockTimeUSec,const char * pchContextData,NdbError & err)416 int InsertTransaction(Ndb* pNdb,
417                       long iContextID,
418                       long iVersion,
419                       long iLockFlag,
420                       long iLockTime,
421                       long iLockTimeUSec,
422                       const char* pchContextData,
423                       NdbError& err)
424 {
425     int iRes = -1;
426     NdbConnection* pNdbConnection = pNdb->startTransaction();
427     //0, (const char*)&iContextID, 4);
428     if(pNdbConnection)
429     {
430         NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
431         if(pNdbOperation)
432         {
433             if(!(g_bWriteTuple ? pNdbOperation->writeTuple() : pNdbOperation->insertTuple())
434                 && !pNdbOperation->equal(c_szContextId, (Int32)iContextID)
435                 && !pNdbOperation->setValue(c_szVersion, (Int32)iVersion)
436                 && !pNdbOperation->setValue(c_szLockFlag, (Int32)iLockFlag)
437                 && !pNdbOperation->setValue(c_szLockTime, (Int32)iLockTime)
438                 && !pNdbOperation->setValue(c_szLockTimeUSec, (Int32)iLockTimeUSec)
439                 && !pNdbOperation->setValue(c_szContextData, pchContextData, g_nStatusDataSize))
440             {
441                 if(!pNdbConnection->execute(Commit))
442                     iRes = 0;
443                 else
444                     err = pNdbConnection->getNdbError();
445             }
446             else
447                 err = pNdbOperation->getNdbError();
448         }
449         else
450             err = pNdbConnection->getNdbError();
451 
452         pNdb->closeTransaction(pNdbConnection);
453     }
454     else
455         err = pNdb->getNdbError();
456 
457     return iRes;
458 }
459 
460 
461 
RetryInsertTransaction(Ndb * pNdb,long iContextId,long iVersion,long iLockFlag,long iLockTime,long iLockTimeUSec,const char * pchContextData,NdbError & err,int & nRetry)462 int RetryInsertTransaction(Ndb* pNdb,
463                            long iContextId,
464                            long iVersion,
465                            long iLockFlag,
466                            long iLockTime,
467                            long iLockTimeUSec,
468                            const char* pchContextData,
469                            NdbError& err, int& nRetry)
470 {
471     int iRes = -1;
472     nRetry = 0;
473     bool bRetry = true;
474     bool bUnknown = false;
475     while(bRetry && nRetry<g_nMaxRetry)
476     {
477         if(!InsertTransaction(pNdb, iContextId, iVersion, iLockFlag,
478             iLockTime, iLockTimeUSec, pchContextData, err))
479         {
480             iRes = 0;
481             bRetry = false;
482         }
483         else
484         {
485             switch(err.status)
486             {
487             case NdbError::UnknownResult:
488                 bUnknown = true;
489                 ++nRetry;
490                 break;
491 
492             case NdbError::TemporaryError:
493                 bUnknown = false;
494                 SleepOneCall();
495                 ++nRetry;
496                 break;
497 
498             case NdbError::PermanentError:
499                 if(err.code==630 && bUnknown)
500                     iRes = 0;
501                 bRetry = false;
502                 break;
503 
504             default:
505                 bRetry = false;
506                 break;
507             }
508         }
509     }
510     return iRes;
511 }
512 
513 
UpdateTransaction(Ndb * pNdb,long iContextId,NdbError & err)514 int UpdateTransaction(Ndb* pNdb, long iContextId, NdbError& err)
515 {
516     int iRes = -1;
517     NdbConnection* pNdbConnection = pNdb->startTransaction();
518     //0, (const char*)&iContextId, 4);
519     if(pNdbConnection)
520     {
521         NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName);
522         if(pNdbOperation)
523         {
524             if(!pNdbOperation->updateTuple()
525             && !pNdbOperation->equal(c_szContextId, (Int32)iContextId)
526             && !pNdbOperation->setValue(c_szContextData, STATUS_DATA, g_nStatusDataSize))
527             {
528                 if(!pNdbConnection->execute(Commit))
529                     iRes = 0;
530                 else
531                     err = pNdbConnection->getNdbError();
532             }
533             else
534                 err = pNdbOperation->getNdbError();
535         }
536         else
537             err = pNdbConnection->getNdbError();
538 
539         pNdb->closeTransaction(pNdbConnection);
540     }
541     else
542         err = pNdb->getNdbError();
543 
544     return iRes;
545 }
546 
547 
RetryUpdateTransaction(Ndb * pNdb,long iContextId,NdbError & err,int & nRetry)548 int RetryUpdateTransaction(Ndb* pNdb, long iContextId, NdbError& err, int& nRetry)
549 {
550     int iRes = -1;
551     nRetry = 0;
552     bool bRetry = true;
553     while(bRetry && nRetry<g_nMaxRetry)
554     {
555         if(!UpdateTransaction(pNdb, iContextId, err))
556         {
557             iRes = 0;
558             bRetry = false;
559         }
560         else
561         {
562             switch(err.status)
563             {
564             case NdbError::TemporaryError:
565             case NdbError::UnknownResult:
566                 SleepOneCall();
567                 ++nRetry;
568                 break;
569 
570             case NdbError::PermanentError:
571             default:
572                 bRetry = false;
573                 break;
574             }
575         }
576     }
577     return iRes;
578 }
579 
580 
581 
InsertInitialRecords(Ndb * pNdb,long nInsert,long nSeed)582 int InsertInitialRecords(Ndb* pNdb, long nInsert, long nSeed)
583 {
584     int iRes = -1;
585     char szMsg[100];
586     for(long i=0; i<nInsert; ++i)
587     {
588         int iContextID = i+nSeed;
589         int nRetry = 0;
590         NdbError err;
591         memset(&err, 0, sizeof(err));
592         Uint64 tStartTrans = NdbTick_CurrentMillisecond();
593         iRes = RetryInsertTransaction(pNdb, iContextID, nSeed, iContextID,
594             (long)(tStartTrans/1000), (long)((tStartTrans%1000)*1000),
595             STATUS_DATA, err, nRetry);
596         Uint64 tEndTrans = NdbTick_CurrentMillisecond();
597         long lMillisecForThisTrans = (long)(tEndTrans-tStartTrans);
598         if(nRetry>0)
599         {
600             sprintf(szMsg, "insert retried %d times, time %ld msec.",
601                 nRetry, lMillisecForThisTrans);
602             ReportNdbError(szMsg, err);
603         }
604         if(iRes)
605         {
606             ReportNdbError("Insert initial record failed", err);
607             return iRes;
608         }
609         InterlockedIncrement(&g_nNumberOfInitialInsert);
610     }
611     return iRes;
612 }
613 
614 
615 
VerifyInitialRecords(Ndb * pNdb,long nVerify,long nSeed)616 int VerifyInitialRecords(Ndb* pNdb, long nVerify, long nSeed)
617 {
618     int iRes = -1;
619     char* pchContextData = new char[g_nStatusDataSize];
620     char szMsg[100];
621     long iPrevLockTime = -1;
622     long iPrevLockTimeUSec = -1;
623     for(long i=0; i<nVerify; ++i)
624     {
625         int iContextID = i+nSeed;
626         long iVersion = 0;
627         long iLockFlag = 0;
628         long iLockTime = 0;
629         long iLockTimeUSec = 0;
630         int nRetry = 0;
631         NdbError err;
632         memset(&err, 0, sizeof(err));
633         Uint64 tStartTrans = NdbTick_CurrentMillisecond();
634         iRes = RetryQueryTransaction(pNdb, iContextID, &iVersion, &iLockFlag,
635                     &iLockTime, &iLockTimeUSec, pchContextData, err, nRetry);
636         Uint64 tEndTrans = NdbTick_CurrentMillisecond();
637         long lMillisecForThisTrans = (long)(tEndTrans-tStartTrans);
638         if(nRetry>0)
639         {
640             sprintf(szMsg, "verify retried %d times, time %ld msec.",
641                 nRetry, lMillisecForThisTrans);
642             ReportNdbError(szMsg, err);
643         }
644         if(iRes)
645         {
646             ReportNdbError("Read initial record failed", err);
647             delete[] pchContextData;
648             return iRes;
649         }
650         if(memcmp(pchContextData, STATUS_DATA, g_nStatusDataSize))
651         {
652             sprintf(szMsg, "wrong context data in tuple %d", iContextID);
653             ReportNdbError(szMsg, err);
654             delete[] pchContextData;
655             return -1;
656         }
657         if(iVersion!=nSeed
658             || iLockFlag!=iContextID
659             || iLockTime<iPrevLockTime
660             || (iLockTime==iPrevLockTime && iLockTimeUSec<iPrevLockTimeUSec))
661         {
662             sprintf(szMsg, "wrong call data in tuple %d", iContextID);
663             ReportNdbError(szMsg, err);
664             delete[] pchContextData;
665             return -1;
666         }
667         iPrevLockTime = iLockTime;
668         iPrevLockTimeUSec = iLockTimeUSec;
669         InterlockedIncrement(&g_nNumberOfInitialVerify);
670     }
671     delete[] pchContextData;
672     return iRes;
673 }
674 
675 
676 
677 
678 
RuntimeCallContext(void * lpParam)679 void* RuntimeCallContext(void* lpParam)
680 {
681     long nNumCallsProcessed = 0;
682     int nStartingRecordID = *(int*)lpParam;
683 
684     Ndb* pNdb;
685     char* pchContextData = new char[g_nStatusDataSize];
686     char szMsg[100];
687 
688     int iRes;
689     const char* szOp;
690     long iVersion;
691     long iLockFlag;
692     long iLockTime;
693     long iLockTimeUSec;
694 
695     pNdb = new Ndb(theConnection, "TEST_DB");
696     if(!pNdb)
697     {
698         NdbMutex_Lock(g_pNdbMutexPrintf);
699         printf("new Ndb failed\n");
700         NdbMutex_Unlock(g_pNdbMutexPrintf);
701         delete[] pchContextData;
702         return 0;
703     }
704 
705     if(pNdb->init(1) || pNdb->waitUntilReady())
706     {
707         ReportNdbError("init of Ndb failed", pNdb->getNdbError());
708         delete pNdb;
709         delete[] pchContextData;
710         return 0;
711     }
712 
713     if(g_bInsertInitial)
714     {
715         if(InsertInitialRecords(pNdb, g_nMaxContextIdPerThread, -nStartingRecordID-g_nMaxContextIdPerThread))
716         {
717             delete pNdb;
718             delete[] pchContextData;
719             return 0;
720         }
721     }
722 
723     if(g_bVerifyInitial)
724     {
725         NdbError err;
726         memset(&err, 0, sizeof(err));
727         if(VerifyInitialRecords(pNdb, g_nMaxContextIdPerThread, -nStartingRecordID-g_nMaxContextIdPerThread))
728         {
729             delete pNdb;
730             delete[] pchContextData;
731             return 0;
732         }
733     }
734     if(g_bInsertInitial || g_bVerifyInitial)
735     {
736         delete[] pchContextData;
737         return 0;
738     }
739 
740     long nContextID = nStartingRecordID;
741 #ifdef NDB_WIN32
742     while(WaitForSingleObject(hShutdownEvent,0) != WAIT_OBJECT_0)
743 #else
744     while(!bShutdownEvent)
745 #endif
746     {
747         ++nContextID;
748         nContextID %= g_nMaxContextIdPerThread;
749         nContextID += nStartingRecordID;
750 
751         bool bTimeLatency = (nContextID==100);
752 
753         Uint64 tStartCall = NdbTick_CurrentMillisecond();
754         for (int i=0; i < 20; i++)
755         {
756             int nRetry = 0;
757             NdbError err;
758             memset(&err, 0, sizeof(err));
759             Uint64 tStartTrans = NdbTick_CurrentMillisecond();
760             switch(i)
761             {
762             case 3:
763             case 6:
764             case 9:
765             case 11:
766             case 12:
767             case 15:
768             case 18:   // Query Record
769                 szOp = "Read";
770                 iRes = RetryQueryTransaction(pNdb, nContextID, &iVersion, &iLockFlag,
771                     &iLockTime, &iLockTimeUSec, pchContextData, err, nRetry);
772                 break;
773 
774             case 19:    // Delete Record
775                 szOp = "Delete";
776                 iRes = RetryDeleteTransaction(pNdb, nContextID, err, nRetry);
777                 break;
778 
779             case 0: // Insert Record
780                 szOp = "Insert";
781                 iRes = RetryInsertTransaction(pNdb, nContextID, 1, 1, 1, 1, STATUS_DATA, err, nRetry);
782                 break;
783 
784             default:    // Update Record
785                 szOp = "Update";
786                 iRes = RetryUpdateTransaction(pNdb, nContextID, err, nRetry);
787                 break;
788             }
789             Uint64 tEndTrans = NdbTick_CurrentMillisecond();
790             long lMillisecForThisTrans = (long)(tEndTrans-tStartTrans);
791 
792             if(g_bReport)
793             {
794               require(lMillisecForThisTrans>=0 && lMillisecForThisTrans<c_nMaxMillisecForAllTrans);
795               InterlockedIncrement(g_plCountMillisecForTrans+lMillisecForThisTrans);
796             }
797 
798             if(nRetry>0)
799             {
800                 sprintf(szMsg, "%s retried %d times, time %ld msec.",
801                     szOp, nRetry, lMillisecForThisTrans);
802                 ReportNdbError(szMsg, err);
803             }
804             else if(bTimeLatency)
805             {
806                 NdbMutex_Lock(g_pNdbMutexPrintf);
807                 printf("%s = %ld msec.\n", szOp, lMillisecForThisTrans);
808                 NdbMutex_Unlock(g_pNdbMutexPrintf);
809             }
810 
811             if(iRes)
812             {
813                 sprintf(szMsg, "%s failed after %ld calls, terminating thread",
814                     szOp, nNumCallsProcessed);
815                 ReportNdbError(szMsg, err);
816                 delete pNdb;
817                 delete[] pchContextData;
818                 return 0;
819             }
820         }
821         Uint64 tEndCall = NdbTick_CurrentMillisecond();
822         long lMillisecForThisCall = (long)(tEndCall-tStartCall);
823 
824         if(g_bReport)
825         {
826           require(lMillisecForThisCall>=0 && lMillisecForThisCall<c_nMaxMillisecForAllCall);
827           InterlockedIncrement(g_plCountMillisecForCall+lMillisecForThisCall);
828         }
829 
830         if(bTimeLatency)
831         {
832             NdbMutex_Lock(g_pNdbMutexPrintf);
833             printf("Total time for call is %ld msec.\n", (long)lMillisecForThisCall);
834             NdbMutex_Unlock(g_pNdbMutexPrintf);
835         }
836 
837         nNumCallsProcessed++;
838         InterlockedIncrementAndReport();
839         if(g_nMaxCallsPerSecond>0)
840         {
841             int iMillisecToSleep = (1000*g_nNumThreads)/g_nMaxCallsPerSecond;
842             iMillisecToSleep -= lMillisecForThisCall;
843             if(iMillisecToSleep>0)
844             {
845                 NdbSleep_MilliSleep(iMillisecToSleep);
846             }
847         }
848     }
849 
850     NdbMutex_Lock(g_pNdbMutexPrintf);
851     printf("Terminating thread after %ld calls\n", nNumCallsProcessed);
852     NdbMutex_Unlock(g_pNdbMutexPrintf);
853 
854     delete pNdb;
855     delete[] pchContextData;
856     return 0;
857 }
858 
859 
CreateCallContextTable(Ndb * pNdb,const char * szTableName,bool bStored)860 int CreateCallContextTable(Ndb* pNdb, const char* szTableName, bool bStored)
861 {
862     int iRes = -1;
863     NdbError err;
864     memset(&err, 0, sizeof(err));
865 
866     NdbSchemaCon* pNdbSchemaCon = NdbSchemaCon::startSchemaTrans(pNdb);
867     if(pNdbSchemaCon)
868     {
869         NdbSchemaOp* pNdbSchemaOp = pNdbSchemaCon->getNdbSchemaOp();
870         if(pNdbSchemaOp)
871         {
872             if(!pNdbSchemaOp->createTable(szTableName, 8, TupleKey, 2,
873                 All, 6, 78, 80, 1, bStored)
874                 && !pNdbSchemaOp->createAttribute(c_szContextId, TupleKey, 32, 1, Signed)
875                 && !pNdbSchemaOp->createAttribute(c_szVersion, NoKey, 32, 1, Signed)
876                 && !pNdbSchemaOp->createAttribute(c_szLockFlag, NoKey, 32, 1, Signed)
877                 && !pNdbSchemaOp->createAttribute(c_szLockTime, NoKey, 32, 1, Signed)
878                 && !pNdbSchemaOp->createAttribute(c_szLockTimeUSec, NoKey, 32, 1, Signed)
879                 && !pNdbSchemaOp->createAttribute(c_szContextData, NoKey, 8, g_nStatusDataSize, String))
880             {
881                 if(!pNdbSchemaCon->execute())
882                     iRes = 0;
883                 else
884                     err = pNdbSchemaCon->getNdbError();
885             }
886             else
887                 err = pNdbSchemaOp->getNdbError();
888         }
889         else
890             err = pNdbSchemaCon->getNdbError();
891 
892         NdbSchemaCon::closeSchemaTrans(pNdbSchemaCon);
893     }
894     else
895         err = pNdb->getNdbError();
896 
897     if(iRes)
898     {
899         ReportNdbError("create call context table failed", err);
900     }
901     return iRes;
902 }
903 
904 
905 
ReportResponseTimeStatistics(const char * szStat,long * plCount,const long lSize)906 void ReportResponseTimeStatistics(const char* szStat, long* plCount, const long lSize)
907 {
908   long lCount = 0;
909   Int64 llSum = 0;
910   Int64 llSum2 = 0;
911   long lMin = -1;
912   long lMax = -1;
913 
914   for(long l=0; l<lSize; ++l)
915   {
916     if(plCount[l]>0)
917     {
918       lCount += plCount[l];
919       llSum += (Int64)l*(Int64)plCount[l];
920       llSum2 += (Int64)l*(Int64)l*(Int64)plCount[l];
921       if(lMin==-1 || l<lMin)
922       {
923         lMin = l;
924       }
925       if(lMax==-1 || l>lMax)
926       {
927         lMax = l;
928       }
929     }
930   }
931 
932   long lAvg = long(llSum/lCount);
933   double dblVar = ((double)lCount*(double)llSum2 - (double)llSum*(double)llSum)/((double)lCount*(double)(lCount-1));
934   long lStd = long(sqrt(dblVar));
935 
936   long lMed = -1;
937   long l95 = -1;
938   long lSel = -1;
939   for(long l=lMin; l<=lMax; ++l)
940   {
941     if(plCount[l]>0)
942     {
943       lSel += plCount[l];
944       if(lMed==-1 && lSel>=(lCount/2))
945       {
946         lMed = l;
947       }
948       if(l95==-1 && lSel>=((lCount*95)/100))
949       {
950         l95 = l;
951       }
952       if(g_bReportPlus)
953       {
954         printf("%ld\t%ld\n", l, plCount[l]);
955       }
956     }
957   }
958 
959   printf("%s: Count=%ld, Min=%ld, Max=%ld, Avg=%ld, Std=%ld, Med=%ld, 95%%=%ld\n",
960     szStat, lCount, lMin, lMax, lAvg, lStd, lMed, l95);
961 }
962 
963 
964 
/**
 * Print the command line synopsis followed by one description line per
 * option to standard output.
 *
 * @param szCmd  program name shown at the start of the two usage lines.
 */
void ShowHelp(const char* szCmd)
{
    // One entry per option, printed in this order.
    static const char* const s_apszOptionHelp[] =
    {
        "-d\t\tcreate the table",
        "-i\t\tinsert initial records",
        "-v\t\tverify initial records",
        "-t<threads>\tnumber of threads making calls",
        "-s<seed>\toffset for primary key",
        "-b<batch>\tbatch size per thread",
        "-c<maxcps>\tmax number of calls per second for this process",
        "-m<size>\tsize of context data",
        "-f\t\tno checkpointing and no logging",
        "-w\t\tuse writeTuple instead of insertTuple",
        "-r\t\treport response time statistics",
        "-r+\t\treport response time distribution",
        "-?\t\thelp"
    };
    const size_t nOptionLines = sizeof(s_apszOptionHelp)/sizeof(s_apszOptionHelp[0]);

    printf("%s -t<threads> [-s<seed>] [-b<batch>] [-c<maxcps>] [-m<size>] [-d] [-i] [-v] [-f] [-w] [-r[+]]\n", szCmd);
    printf("%s -?\n", szCmd);

    for(size_t iLine=0; iLine<nOptionLines; ++iLine)
    {
        puts(s_apszOptionHelp[iLine]);
    }
}
983 
984 
main(int argc,char * argv[])985 int main(int argc, char* argv[])
986 {
987     ndb_init();
988     g_nNumThreads = 0;
989     g_nMaxCallsPerSecond = 0;
990     long nSeed = 0;
991     bool bStoredTable = true;
992     bool bCreateTable = false;
993     g_bWriteTuple = false;
994     g_bReport = false;
995     g_bReportPlus = false;
996 
997     for(int i=1; i<argc; ++i)
998     {
999         if(argv[i][0]=='-' || argv[i][0]=='/')
1000         {
1001             switch(argv[i][1])
1002             {
1003             case 't':
1004                 g_nNumThreads = atol(argv[i]+2);
1005                 break;
1006             case 's':
1007                 nSeed = atol(argv[i]+2);
1008                 break;
1009             case 'b':
1010                 g_nMaxContextIdPerThread = atol(argv[i]+2);
1011                 break;
1012             case 'm':
1013                 g_nStatusDataSize = atol(argv[i]+2);
1014                 if(g_nStatusDataSize> (int) sizeof(STATUS_DATA))
1015                 {
1016                     g_nStatusDataSize = sizeof(STATUS_DATA);
1017                 }
1018                 break;
1019             case 'i':
1020                 g_bInsertInitial = true;
1021                 break;
1022             case 'v':
1023                 g_bVerifyInitial = true;
1024                 break;
1025             case 'd':
1026                 bCreateTable = true;
1027                 break;
1028             case 'f':
1029                 bStoredTable = false;
1030                 break;
1031             case 'w':
1032                 g_bWriteTuple = true;
1033                 break;
1034             case 'r':
1035                 g_bReport = true;
1036                 if(argv[i][2]=='+')
1037                 {
1038                   g_bReportPlus = true;
1039                 }
1040                 break;
1041             case 'c':
1042                 g_nMaxCallsPerSecond = atol(argv[i]+2);
1043                 break;
1044             case '?':
1045             default:
1046                 ShowHelp(argv[0]);
1047                 return -1;
1048             }
1049         }
1050         else
1051         {
1052             ShowHelp(argv[0]);
1053             return -1;
1054         }
1055     }
1056     if(bCreateTable)
1057         puts("-d\tcreate the table");
1058     if(g_bInsertInitial)
1059         printf("-i\tinsert initial records\n");
1060     if(g_bVerifyInitial)
1061         printf("-v\tverify initial records\n");
1062     if(g_nNumThreads>0)
1063         printf("-t%ld\tnumber of threads making calls\n", g_nNumThreads);
1064     if(g_nNumThreads>0)
1065     {
1066         printf("-s%ld\toffset for primary key\n", nSeed);
1067         printf("-b%ld\tbatch size per thread\n", g_nMaxContextIdPerThread);
1068     }
1069     if(g_nMaxCallsPerSecond>0)
1070         printf("-c%ld\tmax number of calls per second for this process\n", g_nMaxCallsPerSecond);
1071     if(!bStoredTable)
1072         puts("-f\tno checkpointing and no logging to disk");
1073     if(g_bWriteTuple)
1074         puts("-w\tuse writeTuple instead of insertTuple");
1075     if(g_bReport)
1076         puts("-r\treport response time statistics");
1077     if(g_bReportPlus)
1078         puts("-r+\treport response time distribution");
1079 
1080     if(!bCreateTable && g_nNumThreads<=0)
1081     {
1082         ShowHelp(argv[0]);
1083         return -1;
1084     }
1085     printf("-m%ld\tsize of context data\n", g_nStatusDataSize);
1086 
1087     g_szTableName = (bStoredTable ? c_szTableNameStored : c_szTableNameTemp);
1088 
1089 #ifdef NDB_WIN32
1090     SetConsoleCtrlHandler(ConsoleCtrlHandler, true);
1091 #else
1092     signal(SIGINT, CtrlCHandler);
1093 #endif
1094 
1095     if(g_bReport)
1096     {
1097       g_plCountMillisecForCall = new long[c_nMaxMillisecForAllCall];
1098       memset(g_plCountMillisecForCall, 0, c_nMaxMillisecForAllCall*sizeof(long));
1099       g_plCountMillisecForTrans = new long[c_nMaxMillisecForAllTrans];
1100       memset(g_plCountMillisecForTrans, 0, c_nMaxMillisecForAllTrans*sizeof(long));
1101     }
1102 
1103     g_pNdbMutexIncrement = NdbMutex_Create();
1104     g_pNdbMutexPrintf = NdbMutex_Create();
1105 #ifdef NDB_WIN32
1106     hShutdownEvent = CreateEvent(NULL,TRUE,FALSE,NULL);
1107 #endif
1108 
1109     theConnection= new Ndb_cluster_connection();
1110     if (theConnection->connect(12, 5, 1) != 0)
1111     {
1112       ndbout << "Unable to connect to managment server." << endl;
1113       return -1;
1114     }
1115     if (theConnection->wait_until_ready(30,0) < 0)
1116     {
1117       ndbout << "Cluster nodes not ready in 30 seconds." << endl;
1118       return -1;
1119     }
1120 
1121     Ndb* pNdb = new Ndb(theConnection, c_szDatabaseName);
1122     if(!pNdb)
1123     {
1124         printf("could not construct ndb\n");
1125         return 1;
1126     }
1127 
1128     if(pNdb->init(1) || pNdb->waitUntilReady())
1129     {
1130         ReportNdbError("could not initialize ndb\n", pNdb->getNdbError());
1131         delete pNdb;
1132         return 2;
1133     }
1134 
1135     if(bCreateTable)
1136     {
1137         printf("Create CallContext table\n");
1138 	if (bStoredTable)
1139 	{
1140 	  if (CreateCallContextTable(pNdb, c_szTableNameStored, true))
1141 	  {
1142             printf("Create table failed\n");
1143             delete pNdb;
1144             return 3;
1145 	  }
1146 	}
1147 	else
1148 	{
1149 	  if (CreateCallContextTable(pNdb, c_szTableNameTemp, false))
1150 	  {
1151             printf("Create table failed\n");
1152             delete pNdb;
1153             return 3;
1154 	  }
1155 	}
1156     }
1157 
1158     if(g_nNumThreads>0)
1159     {
1160         printf("creating %d threads\n", (int)g_nNumThreads);
1161         if(g_bInsertInitial)
1162         {
1163             printf("each thread will insert %ld initial records, total %ld inserts\n",
1164                 g_nMaxContextIdPerThread, g_nNumThreads*g_nMaxContextIdPerThread);
1165         }
1166         if(g_bVerifyInitial)
1167         {
1168             printf("each thread will verify %ld initial records, total %ld reads\n",
1169                 g_nMaxContextIdPerThread, g_nNumThreads*g_nMaxContextIdPerThread);
1170         }
1171 
1172         g_nNumberOfInitialInsert = 0;
1173         g_nNumberOfInitialVerify = 0;
1174 
1175         Uint64 tStartTime = NdbTick_CurrentMillisecond();
1176         NdbThread* pThreads[256];
1177         int pnStartingRecordNum[256];
1178         int ij;
1179         for(ij=0;ij<g_nNumThreads;ij++)
1180         {
1181             pnStartingRecordNum[ij] = (ij*g_nMaxContextIdPerThread) + nSeed;
1182         }
1183 
1184         for(ij=0;ij<g_nNumThreads;ij++)
1185         {
1186             pThreads[ij] = NdbThread_Create(RuntimeCallContext,
1187                 (void**)(pnStartingRecordNum+ij),
1188                 0, "RuntimeCallContext", NDB_THREAD_PRIO_LOW);
1189         }
1190 
1191         //Wait for the threads to finish
1192         for(ij=0;ij<g_nNumThreads;ij++)
1193         {
1194             void* status;
1195             NdbThread_WaitFor(pThreads[ij], &status);
1196         }
1197         Uint64 tEndTime = NdbTick_CurrentMillisecond();
1198 
1199         //Print time taken
1200         printf("Time Taken for %ld Calls is %ld msec (= %ld calls/sec)\n",
1201             g_nNumCallsProcessed,
1202             (long)(tEndTime-tStartTime),
1203             (long)((1000*g_nNumCallsProcessed)/(tEndTime-tStartTime)));
1204 
1205         if(g_bInsertInitial)
1206             printf("successfully inserted %ld tuples\n", g_nNumberOfInitialInsert);
1207         if(g_bVerifyInitial)
1208             printf("successfully verified %ld tuples\n", g_nNumberOfInitialVerify);
1209     }
1210 
1211     delete pNdb;
1212 
1213 #ifdef NDB_WIN32
1214     CloseHandle(hShutdownEvent);
1215 #endif
1216     NdbMutex_Destroy(g_pNdbMutexIncrement);
1217     NdbMutex_Destroy(g_pNdbMutexPrintf);
1218 
1219     if(g_bReport)
1220     {
1221       ReportResponseTimeStatistics("Calls", g_plCountMillisecForCall, c_nMaxMillisecForAllCall);
1222       ReportResponseTimeStatistics("Transactions", g_plCountMillisecForTrans, c_nMaxMillisecForAllTrans);
1223 
1224       delete[] g_plCountMillisecForCall;
1225       delete[] g_plCountMillisecForTrans;
1226     }
1227 
1228     return 0;
1229 }
1230 
1231