1 /*
2    Copyright (C) 2003-2006 MySQL AB
3     All rights reserved. Use is subject to license terms.
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License, version 2.0,
7    as published by the Free Software Foundation.
8 
9    This program is also distributed with certain software (including
10    but not limited to OpenSSL) that is licensed under separate terms,
11    as designated in a particular file or component or in included license
12    documentation.  The authors of MySQL hereby grant you an additional
13    permission to link the program and your derivative works with the
14    separately licensed software that they have included with MySQL.
15 
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License, version 2.0, for more details.
20 
21    You should have received a copy of the GNU General Public License
22    along with this program; if not, write to the Free Software
23    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 */
25 
#include <stdlib.h>

#include <NdbApi.hpp>
#include <NdbSchemaCon.hpp>
#include <NdbMutex.h>
#include <NdbOut.hpp>
#include <NdbSleep.h>
#include <NdbThread.h>
#include <NdbTick.h>
#include <NdbMain.h>
#include <NdbTest.hpp>
#include <random.h>
36 
// Reporting level: define exactly one of TRACE, DEBUG or RELEASE.  Each one
// supplies its own VerifyMethod* macros and ReportMethod* helpers below, so
// enabling two of them at once would define the same functions twice.
//#define TRACE
#define DEBUG
//#define RELEASE

// epaulsa: NODE_REC enables the CHK_TR pointer check that follows every
// startTransaction() call, to help 'acid' during node recovery.
#define NODE_REC
42 
43 #ifdef TRACE
44 
// TRACE flavour of the call wrappers: the call c->m is made and its outcome
// is always handed to the matching ReportMethod* helper.  The expressions
// are stringized (#c, #m, #v) so the report names what was called.
//   VerifyMethodInt(c, m)    - yields the int returned by c->m
//   VerifyMethodPtr(v, c, m) - stores the pointer returned by c->m in v
//   VerifyMethodVoid(c, m)   - calls c->m, then reports on c
#define VerifyMethodInt(c, m) \
  (ReportMethodInt(c->m, c, #c, #m, __FILE__, __LINE__))
#define VerifyMethodPtr(v, c, m) \
  (v=ReportMethodPtr(c->m, c, #v, #c, #m, __FILE__, __LINE__))
#define VerifyMethodVoid(c, m) \
  (c->m, ReportMethodVoid(c, #c, #m, __FILE__, __LINE__))
48 
ReportMethodInt(int iRes,NdbConnection * pNdbConnection,const char * szClass,const char * szMethod,const char * szFile,const int iLine)49 int ReportMethodInt(int iRes, NdbConnection* pNdbConnection, const char* szClass, const char* szMethod, const char* szFile, const int iLine)
50 {
51   ndbout << szFile << "(" << iLine << ") : ";
52   ndbout << szClass << "->" << szMethod << " return " << iRes << " : ";
53   ndbout << pNdbConnection->getNdbError();
54   NdbOperation* pNdbOperation = pNdbConnection->getNdbErrorOperation();
55   if(pNdbOperation) {
56     ndbout << " : " << pNdbOperation->getNdbError();
57   }
58   ndbout << " : " << pNdbConnection->getNdbErrorLine();
59   ndbout << endl;
60   return iRes;
61 }
62 
63 template <class C>
ReportMethodInt(int iRes,C * pC,const char * szClass,const char * szMethod,const char * szFile,const int iLine)64 int ReportMethodInt(int iRes, C* pC, const char* szClass, const char* szMethod, const char* szFile, const int iLine)
65 {
66   ndbout << szFile << "(" << iLine << ") : ";
67   ndbout << szClass << "->" << szMethod << " return " << iRes << " : ";
68   ndbout << pC->getNdbError();
69   ndbout << endl;
70   return iRes;
71 }
72 
73 template <class R, class C>
74 R* ReportMethodPtr(R* pR, C* pC, const char* szVariable, const char* szClass, const char* szMethod, const char* szFile, const int iLine)
75 {
76   ndbout << szFile << "(" << iLine << ") : ";
77   ndbout << szVariable << " = " << szClass << "->" << szMethod << " return " << (long)(void*)pR << " : ";
78   ndbout << pC->getNdbError();
79   ndbout << endl;
80   return pR;
81 }
82 
83 template <class C>
ReportMethodVoid(C * pC,const char * szClass,const char * szMethod,const char * szFile,const int iLine)84 void ReportMethodVoid(C* pC, const char* szClass, const char* szMethod, const char* szFile, const int iLine)
85 {
86   ndbout << szFile << "(" << iLine << ") : ";
87   ndbout << szClass << "->" << szMethod << " : ";
88   ndbout << pC->getNdbError();
89   ndbout << endl;
90 }
91 #endif /* TRACE */
92 
93 
94 #ifdef DEBUG
95 
// DEBUG flavour of the call wrappers: the call c->m is made and its outcome
// is handed to the matching ReportMethod* helper, which in this build only
// prints when the call failed.  The expressions are stringized (#c, #m, #v)
// so the report names what was called.
//   VerifyMethodInt(c, m)    - yields the int returned by c->m
//   VerifyMethodPtr(v, c, m) - stores the pointer returned by c->m in v
//   VerifyMethodVoid(c, m)   - calls c->m, then checks c for an error
#define VerifyMethodInt(c, m) \
  (ReportMethodInt(c->m, c, #c, #m, __FILE__, __LINE__))
#define VerifyMethodPtr(v, c, m) \
  (v=ReportMethodPtr(c->m, c, #v, #c, #m, __FILE__, __LINE__))
#define VerifyMethodVoid(c, m) \
  (c->m, ReportMethodVoid(c, #c, #m, __FILE__, __LINE__))
99 
ReportMethodInt(int iRes,NdbConnection * pNdbConnection,const char * szClass,const char * szMethod,const char * szFile,const int iLine)100 int ReportMethodInt(int iRes, NdbConnection* pNdbConnection, const char* szClass, const char* szMethod, const char* szFile, const int iLine)
101 {
102   if(iRes<0) {
103     ndbout << szFile << "(" << iLine << ") : ";
104     ndbout << szClass << "->" << szMethod << " return " << iRes << " : ";
105     ndbout << pNdbConnection->getNdbError();
106     NdbOperation* pNdbOperation = pNdbConnection->getNdbErrorOperation();
107     if(pNdbOperation) {
108       ndbout << " : " << pNdbOperation->getNdbError();
109     }
110     ndbout << " : " << pNdbConnection->getNdbErrorLine();
111     ndbout << " : ";
112     ndbout << endl;
113   }
114   return iRes;
115 }
116 
117 template <class C>
ReportMethodInt(int iRes,C * pC,const char * szClass,const char * szMethod,const char * szFile,const int iLine)118 int ReportMethodInt(int iRes, C* pC, const char* szClass, const char* szMethod, const char* szFile, const int iLine)
119 {
120   if(iRes<0) {
121     ndbout << szFile << "(" << iLine << ") : ";
122     ndbout << szClass << "->" << szMethod << " return " << iRes << " : ";
123     ndbout << pC->getNdbError();
124     ndbout << endl;
125   }
126   return iRes;
127 }
128 
129 template <class R, class C>
130 R* ReportMethodPtr(R* pR, C* pC, const char* szVariable, const char* szClass, const char* szMethod, const char* szFile, const int iLine)
131 {
132   if(!pR) {
133     ndbout << szFile << "(" << iLine << ") : ";
134     ndbout << szVariable << " = " << szClass << "->" << szMethod << " return " << " : ";
135     ndbout << pC->getNdbError();
136     ndbout << endl;
137   }
138   return pR;
139 }
140 
141 template <class C>
ReportMethodVoid(C * pC,const char * szClass,const char * szMethod,const char * szFile,const int iLine)142 void ReportMethodVoid(C* pC, const char* szClass, const char* szMethod, const char* szFile, const int iLine)
143 {
144   if(pC->getNdbError().code) {
145     ndbout << szFile << "(" << iLine << ") : ";
146     ndbout << szClass << "->" << szMethod << " : ";
147     ndbout << pC->getNdbError();
148     ndbout << endl;
149   }
150 }
151 
152 
153 #endif /* DEBUG */
154 
155 
156 #ifdef RELEASE
157 
// RELEASE flavour of the call wrappers: the call c->m is made directly and
// nothing is reported.
//   VerifyMethodInt(c, m)    - yields the int returned by c->m
//   VerifyMethodPtr(v, c, m) - stores the pointer returned by c->m in v
//   VerifyMethodVoid(c, m)   - calls c->m
#define VerifyMethodInt(c, m) \
  (c->m)
#define VerifyMethodPtr(v, c, m) \
  (v=(c->m))
#define VerifyMethodVoid(c, m) \
  (c->m)
161 
ReportMethodInt(int iRes,NdbConnection * pNdbConnection,const char * szClass,const char * szMethod,const char * szFile,const int iLine)162 int ReportMethodInt(int iRes, NdbConnection* pNdbConnection, const char* szClass, const char* szMethod, const char* szFile, const int iLine)
163 {
164   if(iRes<0) {
165     ndbout << szFile << "(" << iLine << ") : ";
166     ndbout << szClass << "->" << szMethod << " return " << iRes << " : ";
167     ndbout << pNdbConnection->getNdbError();
168     NdbOperation* pNdbOperation = pNdbConnection->getNdbErrorOperation();
169     if(pNdbOperation) {
170       ndbout << " : " << pNdbOperation->getNdbError();
171     }
172     ndbout << " : " << pNdbConnection->getNdbErrorLine();
173     ndbout << endl;
174   }
175   return iRes;
176 }
177 
178 #endif /* RELEASE */
179 
180 // epaulsa =>
// CHK_TR(p): pointer check used right after startTransaction().
//
// Without NODE_REC it is a no-op.  With NODE_REC, a null p makes the
// *enclosing function* print a message, delete its local `pNdb` (which must
// be in scope at the point of use), reset it to NULL and return 0.
//
// Both variants expand to a single statement that needs a trailing ';', so
// the macro is safe inside an unbraced if/else.
#ifndef NODE_REC
#define CHK_TR(p) ((void)0)
#else
#define CHK_TR(p)                                                       \
  do {                                                                  \
    if(!(p)) {                                                          \
      ndbout << "startTransaction failed, returning now." << endl;      \
      delete pNdb;                                                      \
      pNdb = NULL;                                                      \
      return 0;                                                         \
    }                                                                   \
  } while(0)
#endif // NODE_REC
191 // <= epaulsa
192 
193 const char* c_szWarehouse = "WAREHOUSE";
194 const char* c_szWarehouseNumber = "W_ID";
195 const char* c_szWarehouseSum = "W_SUM";
196 const char* c_szWarehouseCount = "W_CNT";
197 const char* c_szDistrict = "DISTRICT";
198 const char* c_szDistrictWarehouseNumber = "D_W_ID";
199 const char* c_szDistrictNumber = "D_ID";
200 const char* c_szDistrictSum = "D_SUM";
201 const char* c_szDistrictCount = "D_CNT";
202 
203 Uint32 g_nWarehouseCount = 10;
204 Uint32 g_nDistrictPerWarehouse = 10;
205 Uint32 g_nThreadCount = 1;
206 NdbMutex* g_pNdbMutex = 0;
207 
NdbThreadFuncInsert(void * pArg)208 extern "C" void* NdbThreadFuncInsert(void* pArg)
209 {
210   myRandom48Init((long int)NdbTick_CurrentMillisecond());
211   unsigned nSucc = 0;
212   unsigned nFail = 0;
213   Ndb* pNdb = NULL ;
214   pNdb = new Ndb("TEST_DB");
215   VerifyMethodInt(pNdb, init());
216   VerifyMethodInt(pNdb, waitUntilReady());
217 
218   while(NdbMutex_Trylock(g_pNdbMutex)) {
219     Uint32 nWarehouse = myRandom48(g_nWarehouseCount);
220     NdbConnection* pNdbConnection = NULL ;
221     VerifyMethodPtr(pNdbConnection, pNdb, startTransaction());
222     CHK_TR(pNdbConnection);
223     NdbOperation* pNdbOperationW = NULL ;
224     VerifyMethodPtr(pNdbOperationW, pNdbConnection, getNdbOperation(c_szWarehouse));
225     VerifyMethodInt(pNdbOperationW, insertTuple());
226     VerifyMethodInt(pNdbOperationW, equal(c_szWarehouseNumber, nWarehouse));
227     VerifyMethodInt(pNdbOperationW, setValue(c_szWarehouseCount, Uint32(1)));
228     Uint32 nWarehouseSum = 0;
229     for(Uint32 nDistrict=0; nDistrict<g_nDistrictPerWarehouse; ++nDistrict) {
230       NdbOperation* pNdbOperationD = NULL ;
231       VerifyMethodPtr(pNdbOperationD, pNdbConnection, getNdbOperation(c_szDistrict));
232       VerifyMethodInt(pNdbOperationD, insertTuple());
233       VerifyMethodInt(pNdbOperationD, equal(c_szDistrictWarehouseNumber, nWarehouse));
234       VerifyMethodInt(pNdbOperationD, equal(c_szDistrictNumber, nDistrict));
235       VerifyMethodInt(pNdbOperationD, setValue(c_szDistrictCount, Uint32(1)));
236       Uint32 nDistrictSum = myRandom48(100);
237       nWarehouseSum += nDistrictSum;
238       VerifyMethodInt(pNdbOperationD, setValue(c_szDistrictSum, nDistrictSum));
239     }
240     VerifyMethodInt(pNdbOperationW, setValue(c_szWarehouseSum, nWarehouseSum));
241     int iExec = pNdbConnection->execute(Commit);
242     int iError = pNdbConnection->getNdbError().code;
243 
244     if(iExec<0 && iError!=0 && iError!=266 && iError!=630) {
245       ReportMethodInt(iExec, pNdbConnection, "pNdbConnection", "execute(Commit)", __FILE__, __LINE__);
246     }
247     if(iExec==0) {
248       ++nSucc;
249     } else {
250       ++nFail;
251     }
252     VerifyMethodVoid(pNdb, closeTransaction(pNdbConnection));
253   }
254   ndbout << "insert: " << nSucc << " succeeded, " << nFail << " failed " << endl;
255   NdbMutex_Unlock(g_pNdbMutex);
256   delete pNdb;
257   pNdb = NULL ;
258   return NULL;
259 }
260 
261 
NdbThreadFuncUpdate(void * pArg)262 extern "C" void* NdbThreadFuncUpdate(void* pArg)
263 {
264   myRandom48Init((long int)NdbTick_CurrentMillisecond());
265   unsigned nSucc = 0;
266   unsigned nFail = 0;
267   Ndb* pNdb = NULL ;
268   pNdb = new Ndb("TEST_DB");
269   VerifyMethodInt(pNdb, init());
270   VerifyMethodInt(pNdb, waitUntilReady());
271 
272   while(NdbMutex_Trylock(g_pNdbMutex)) {
273     Uint32 nWarehouse = myRandom48(g_nWarehouseCount);
274     NdbConnection* pNdbConnection = NULL ;
275     VerifyMethodPtr(pNdbConnection, pNdb, startTransaction());
276     CHK_TR(pNdbConnection) ; // epaulsa
277     NdbOperation* pNdbOperationW = NULL ;
278     VerifyMethodPtr(pNdbOperationW, pNdbConnection, getNdbOperation(c_szWarehouse));
279     VerifyMethodInt(pNdbOperationW, interpretedUpdateTuple());
280     VerifyMethodInt(pNdbOperationW, equal(c_szWarehouseNumber, nWarehouse));
281     VerifyMethodInt(pNdbOperationW, incValue(c_szWarehouseCount, Uint32(1)));
282     Uint32 nWarehouseSum = 0;
283     for(Uint32 nDistrict=0; nDistrict<g_nDistrictPerWarehouse; ++nDistrict) {
284       NdbOperation* pNdbOperationD = NULL ;
285       VerifyMethodPtr(pNdbOperationD, pNdbConnection, getNdbOperation(c_szDistrict));
286       VerifyMethodInt(pNdbOperationD, interpretedUpdateTuple());
287       VerifyMethodInt(pNdbOperationD, equal(c_szDistrictWarehouseNumber, nWarehouse));
288       VerifyMethodInt(pNdbOperationD, equal(c_szDistrictNumber, nDistrict));
289       VerifyMethodInt(pNdbOperationD, incValue(c_szDistrictCount, Uint32(1)));
290       Uint32 nDistrictSum = myRandom48(100);
291       nWarehouseSum += nDistrictSum;
292       VerifyMethodInt(pNdbOperationD, setValue(c_szDistrictSum, nDistrictSum));
293     }
294     VerifyMethodInt(pNdbOperationW, setValue(c_szWarehouseSum, nWarehouseSum));
295     int iExec = pNdbConnection->execute(Commit);
296     int iError = pNdbConnection->getNdbError().code;
297 
298     if(iExec<0 && iError!=0 && iError!=266 && iError!=626) {
299       ReportMethodInt(iExec, pNdbConnection, "pNdbConnection", "execute(Commit)", __FILE__, __LINE__);
300     }
301     if(iExec==0) {
302       ++nSucc;
303     } else {
304       ++nFail;
305     }
306     VerifyMethodVoid(pNdb, closeTransaction(pNdbConnection));
307   }
308   ndbout << "update: " << nSucc << " succeeded, " << nFail << " failed " << endl;
309   NdbMutex_Unlock(g_pNdbMutex);
310   delete pNdb;
311   pNdb = NULL ;
312   return NULL;
313 }
314 
315 
NdbThreadFuncDelete(void * pArg)316 extern "C" void* NdbThreadFuncDelete(void* pArg)
317 {
318   myRandom48Init((long int)NdbTick_CurrentMillisecond());
319   unsigned nSucc = 0;
320   unsigned nFail = 0;
321   Ndb* pNdb = NULL ;
322   pNdb = new Ndb("TEST_DB");
323   VerifyMethodInt(pNdb, init());
324   VerifyMethodInt(pNdb, waitUntilReady());
325 
326   while(NdbMutex_Trylock(g_pNdbMutex)) {
327     Uint32 nWarehouse = myRandom48(g_nWarehouseCount);
328     NdbConnection* pNdbConnection = NULL ;
329     VerifyMethodPtr(pNdbConnection, pNdb, startTransaction());
330     CHK_TR(pNdbConnection) ; // epaulsa
331     NdbOperation* pNdbOperationW = NULL ;
332     VerifyMethodPtr(pNdbOperationW, pNdbConnection, getNdbOperation(c_szWarehouse));
333     VerifyMethodInt(pNdbOperationW, deleteTuple());
334     VerifyMethodInt(pNdbOperationW, equal(c_szWarehouseNumber, nWarehouse));
335     for(Uint32 nDistrict=0; nDistrict<g_nDistrictPerWarehouse; ++nDistrict) {
336       NdbOperation* pNdbOperationD = NULL ;
337       VerifyMethodPtr(pNdbOperationD, pNdbConnection, getNdbOperation(c_szDistrict));
338       VerifyMethodInt(pNdbOperationD, deleteTuple());
339       VerifyMethodInt(pNdbOperationD, equal(c_szDistrictWarehouseNumber, nWarehouse));
340       VerifyMethodInt(pNdbOperationD, equal(c_szDistrictNumber, nDistrict));
341     }
342     int iExec = pNdbConnection->execute(Commit);
343     int iError = pNdbConnection->getNdbError().code;
344 
345     if(iExec<0 && iError!=0 && iError!=266 && iError!=626) {
346       ReportMethodInt(iExec, pNdbConnection, "pNdbConnection", "execute(Commit)", __FILE__, __LINE__);
347     }
348     if(iExec==0) {
349       ++nSucc;
350     } else {
351       ++nFail;
352     }
353     VerifyMethodVoid(pNdb, closeTransaction(pNdbConnection));
354   }
355   ndbout << "delete: " << nSucc << " succeeded, " << nFail << " failed " << endl;
356   NdbMutex_Unlock(g_pNdbMutex);
357   delete pNdb;
358   pNdb = NULL ;
359   return NULL;
360 }
361 
362 
NdbThreadFuncRead(void * pArg)363 extern "C" void* NdbThreadFuncRead(void* pArg)
364 {
365   myRandom48Init((long int)NdbTick_CurrentMillisecond());
366   unsigned nSucc = 0;
367   unsigned nFail = 0;
368   NdbRecAttr** ppNdbRecAttrDSum = new NdbRecAttr*[g_nDistrictPerWarehouse];
369   NdbRecAttr** ppNdbRecAttrDCnt = new NdbRecAttr*[g_nDistrictPerWarehouse];
370   Ndb* pNdb = NULL ;
371   pNdb = new Ndb("TEST_DB");
372   VerifyMethodInt(pNdb, init());
373   VerifyMethodInt(pNdb, waitUntilReady());
374 
375   while(NdbMutex_Trylock(g_pNdbMutex)) {
376     Uint32 nWarehouse = myRandom48(g_nWarehouseCount);
377     NdbConnection* pNdbConnection = NULL ;
378     VerifyMethodPtr(pNdbConnection, pNdb, startTransaction());
379     CHK_TR(pNdbConnection) ; // epaulsa
380     NdbOperation* pNdbOperationW = NULL ;
381     VerifyMethodPtr(pNdbOperationW, pNdbConnection, getNdbOperation(c_szWarehouse));
382     VerifyMethodInt(pNdbOperationW, readTuple());
383     VerifyMethodInt(pNdbOperationW, equal(c_szWarehouseNumber, nWarehouse));
384     NdbRecAttr* pNdbRecAttrWSum;
385     VerifyMethodPtr(pNdbRecAttrWSum, pNdbOperationW, getValue(c_szWarehouseSum, 0));
386     NdbRecAttr* pNdbRecAttrWCnt;
387     VerifyMethodPtr(pNdbRecAttrWCnt, pNdbOperationW, getValue(c_szWarehouseCount, 0));
388     for(Uint32 nDistrict=0; nDistrict<g_nDistrictPerWarehouse; ++nDistrict) {
389       NdbOperation* pNdbOperationD = NULL ;
390       VerifyMethodPtr(pNdbOperationD, pNdbConnection, getNdbOperation(c_szDistrict));
391       VerifyMethodInt(pNdbOperationD, readTuple());
392       VerifyMethodInt(pNdbOperationD, equal(c_szDistrictWarehouseNumber, nWarehouse));
393       VerifyMethodInt(pNdbOperationD, equal(c_szDistrictNumber, nDistrict));
394       VerifyMethodPtr(ppNdbRecAttrDSum[nDistrict], pNdbOperationD, getValue(c_szDistrictSum, 0));
395       VerifyMethodPtr(ppNdbRecAttrDCnt[nDistrict], pNdbOperationD, getValue(c_szDistrictCount, 0));
396     }
397     int iExec = pNdbConnection->execute(Commit);
398     int iError = pNdbConnection->getNdbError().code;
399 
400     if(iExec<0 && iError!=0 && iError!=266 && iError!=626) {
401       ReportMethodInt(iExec, pNdbConnection, "pNdbConnection", "execute(Commit)", __FILE__, __LINE__);
402     }
403     if(iExec==0) {
404       Uint32 nSum = 0;
405       Uint32 nCnt = 0;
406       for(Uint32 nDistrict=0; nDistrict<g_nDistrictPerWarehouse; ++nDistrict) {
407         nSum += ppNdbRecAttrDSum[nDistrict]->u_32_value();
408         nCnt += ppNdbRecAttrDCnt[nDistrict]->u_32_value();
409       }
410       if(nSum!=pNdbRecAttrWSum->u_32_value()
411          || nCnt!=g_nDistrictPerWarehouse*pNdbRecAttrWCnt->u_32_value()) {
412         ndbout << "INCONSISTENT!" << endl;
413         ndbout << "iExec==" << iExec << endl;
414         ndbout << "iError==" << iError << endl;
415         ndbout << endl;
416         ndbout << c_szWarehouseSum << "==" << pNdbRecAttrWSum->u_32_value() << ", ";
417         ndbout << c_szWarehouseCount << "==" << pNdbRecAttrWCnt->u_32_value() << endl;
418         ndbout << "nSum==" << nSum << ", nCnt=" << nCnt << endl;
419         for(Uint32 nDistrict=0; nDistrict<g_nDistrictPerWarehouse; ++nDistrict) {
420           ndbout << c_szDistrictSum << "[" << nDistrict << "]==" << ppNdbRecAttrDSum[nDistrict]->u_32_value() << ", ";
421           ndbout << c_szDistrictCount << "[" << nDistrict << "]==" << ppNdbRecAttrDCnt[nDistrict]->u_32_value() << endl;
422         }
423         VerifyMethodVoid(pNdb, closeTransaction(pNdbConnection));
424         delete pNdb; pNdb = NULL ;
425         delete[] ppNdbRecAttrDSum; ppNdbRecAttrDSum = NULL ;
426         delete[] ppNdbRecAttrDCnt; ppNdbRecAttrDCnt = NULL ;
427         NDBT_ProgramExit(NDBT_FAILED);
428       }
429       ++nSucc;
430     } else {
431       ++nFail;
432     }
433     VerifyMethodVoid(pNdb, closeTransaction(pNdbConnection));
434   }
435   ndbout << "read: " << nSucc << " succeeded, " << nFail << " failed " << endl;
436   NdbMutex_Unlock(g_pNdbMutex);
437   delete pNdb; pNdb = NULL ;
438   delete[] ppNdbRecAttrDSum; ppNdbRecAttrDSum = NULL ;
439   delete[] ppNdbRecAttrDCnt; ppNdbRecAttrDCnt = NULL ;
440   return NULL;
441 }
442 
443 
444 NDB_COMMAND(acid, "acid", "acid", "acid", 65535)
445 {
446   ndb_init();
447   long nSeconds = 60;
448   int rc = NDBT_OK;
449 
450   for(int i=1; i<argc; ++i) {
451     if(argv[i][0]=='-' || argv[i][0]=='/') {
452       switch(argv[i][1]) {
453       case 'w': g_nWarehouseCount=atol(argv[i]+2); break;
454       case 'd': g_nDistrictPerWarehouse=atol(argv[i]+2); break;
455       case 's': nSeconds=atol(argv[i]+2); break;
456       case 't': g_nThreadCount=atol(argv[i]+2); break;
457       default: ndbout << "invalid option" << endl; return 1;
458       }
459     } else {
460       ndbout << "invalid operand" << endl;
461       return 1;
462     }
463   }
464   ndbout << argv[0];
465   ndbout << " -w" << g_nWarehouseCount;
466   ndbout << " -d" << g_nDistrictPerWarehouse;
467   ndbout << " -s" << (int)nSeconds;
468   ndbout << " -t" << g_nThreadCount;
469   ndbout << endl;
470 
471   Ndb* pNdb = NULL ;
472   pNdb = new Ndb("TEST_DB");
473   VerifyMethodInt(pNdb, init());
474   VerifyMethodInt(pNdb, waitUntilReady());
475 
476   NdbSchemaCon* pNdbSchemaCon= NdbSchemaCon::startSchemaTrans(pNdb);
477   if(!pNdbSchemaCon){
478     ndbout <<"startSchemaTransaction failed, exiting now" << endl ;
479     delete pNdb ;
480     NDBT_ProgramExit(NDBT_FAILED) ;
481   }
482   NdbSchemaOp* pNdbSchemaOp = NULL ;
483   VerifyMethodPtr(pNdbSchemaOp, pNdbSchemaCon, getNdbSchemaOp());
484   VerifyMethodInt(pNdbSchemaOp, createTable(
485                                             c_szWarehouse,
486                                             (4+4+4+12)*1.02*g_nWarehouseCount/1024+1,
487                                             TupleKey,
488                                             (4+14)*g_nWarehouseCount/8/1024+1));
489 
490   VerifyMethodInt(pNdbSchemaOp, createAttribute(c_szWarehouseNumber, TupleKey, 32, 1, UnSigned, MMBased, false));
491   VerifyMethodInt(pNdbSchemaOp, createAttribute(c_szWarehouseSum, NoKey, 32, 1, UnSigned, MMBased, false));
492   VerifyMethodInt(pNdbSchemaOp, createAttribute(c_szWarehouseCount, NoKey, 32, 1, UnSigned, MMBased, false));
493   VerifyMethodInt(pNdbSchemaCon, execute());
494   NdbSchemaCon::closeSchemaTrans(pNdbSchemaCon);
495 
496   pNdbSchemaCon= NdbSchemaCon::startSchemaTrans(pNdb);
497   VerifyMethodPtr(pNdbSchemaOp, pNdbSchemaCon, getNdbSchemaOp());
498   VerifyMethodInt(pNdbSchemaOp, createTable(
499                                             c_szDistrict,
500                                             (4+4+4+4+12)*1.02*g_nWarehouseCount*g_nDistrictPerWarehouse/1024+1,
501                                             TupleKey,
502                                             (4+4+14)*g_nWarehouseCount*g_nDistrictPerWarehouse/8/1024+1));
503 
504 
505   VerifyMethodInt(pNdbSchemaOp, createAttribute(c_szDistrictWarehouseNumber, TupleKey, 32, 1, UnSigned, MMBased, false));
506   VerifyMethodInt(pNdbSchemaOp, createAttribute(c_szDistrictNumber, TupleKey, 32, 1, UnSigned, MMBased, false));
507   VerifyMethodInt(pNdbSchemaOp, createAttribute(c_szDistrictSum, NoKey, 32, 1, UnSigned, MMBased, false));
508   VerifyMethodInt(pNdbSchemaOp, createAttribute(c_szDistrictCount, NoKey, 32, 1, UnSigned, MMBased, false));
509   VerifyMethodInt(pNdbSchemaCon, execute());
510   NdbSchemaCon::closeSchemaTrans(pNdbSchemaCon);
511   g_pNdbMutex = NdbMutex_Create();
512   NdbMutex_Lock(g_pNdbMutex);
513 
514   NdbThread** ppNdbThread = new NdbThread*[g_nThreadCount*4];
515   for(Uint32 nThread=0; nThread<g_nThreadCount; ++nThread) {
516     ppNdbThread[nThread*4+0] = NdbThread_Create(NdbThreadFuncInsert, 0, 65535, "insert",
517                                                 NDB_THREAD_PRIO_LOW);
518     ppNdbThread[nThread*4+1] = NdbThread_Create(NdbThreadFuncUpdate, 0, 65535, "update",
519                                                 NDB_THREAD_PRIO_LOW);
520     ppNdbThread[nThread*4+2] = NdbThread_Create(NdbThreadFuncDelete, 0, 65535, "delete",
521                                                 NDB_THREAD_PRIO_LOW);
522     ppNdbThread[nThread*4+3] = NdbThread_Create(NdbThreadFuncRead, 0, 65535, "read",
523                                                 NDB_THREAD_PRIO_LOW);
524   }
525 
526   NdbSleep_SecSleep(nSeconds);
527   NdbMutex_Unlock(g_pNdbMutex);
528 
529   void* pStatus;
530   for(Uint32 nThread=0; nThread<g_nThreadCount; ++nThread) {
531     NdbThread_WaitFor(ppNdbThread[nThread*4+0], &pStatus);
532     NdbThread_WaitFor(ppNdbThread[nThread*4+1], &pStatus);
533     NdbThread_WaitFor(ppNdbThread[nThread*4+2], &pStatus);
534     NdbThread_WaitFor(ppNdbThread[nThread*4+3], &pStatus);
535   }
536 
537   NdbMutex_Destroy(g_pNdbMutex);
538   delete[] ppNdbThread;
539   delete pNdb;
540   return NDBT_ProgramExit(rc);
541 }
542 
543