1 /*
2 Copyright (c) 2011, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25
26 #include "NDBT_Test.hpp"
27 #include "NDBT_ReturnCodes.h"
28 #include "HugoTransactions.hpp"
29 #include "HugoAsynchTransactions.hpp"
30 #include "UtilTransactions.hpp"
31 #include "random.h"
32 #include "../../src/ndbapi/NdbWaitGroup.hpp"
33
34
35 class NdbPool : private NdbLockable {
36 public:
NdbPool(Ndb_cluster_connection * _conn)37 NdbPool(Ndb_cluster_connection *_conn) : conn(_conn), list(0), size(0),
38 created(0) {};
39 Ndb * getNdb();
40 void recycleNdb(Ndb *n);
41 void closeAll();
42
43 private:
44 Ndb_cluster_connection *conn;
45 Ndb * list;
46 int size, created;
47 };
48
getNdb()49 Ndb * NdbPool::getNdb() {
50 Ndb * n;
51 lock();
52 if(list) {
53 n = list;
54 list = (Ndb *) n->getCustomData();
55 size--;
56 }
57 else {
58 n = new Ndb(conn);
59 n->init();
60 created++;
61 }
62 unlock();
63 return n;
64 }
65
recycleNdb(Ndb * n)66 void NdbPool::recycleNdb(Ndb *n) {
67 lock();
68 n->setCustomData(list);
69 list = n;
70 size++;
71 unlock();
72 }
73
closeAll()74 void NdbPool::closeAll() {
75 lock();
76 while(list) {
77 Ndb *n = list;
78 list = (Ndb *) n->getCustomData();
79 delete n;
80 }
81 size = 0;
82 unlock();
83 }
84
85 NdbWaitGroup * global_poll_group;
86 NdbPool * global_ndb_pool;
87
88 #define check(b, e) \
89 if (!(b)) { g_err << "ERR: " << step->getName() << " failed on line " \
90 << __LINE__ << ": " << e.getNdbError() << endl; return NDBT_FAILED; }
91
92
runSetup(NDBT_Context * ctx,NDBT_Step * step,int waitGroupSize)93 int runSetup(NDBT_Context* ctx, NDBT_Step* step, int waitGroupSize){
94
95 int records = ctx->getNumRecords();
96 int batchSize = ctx->getProperty("BatchSize", 1);
97 int transactions = (records / 100) + 1;
98 int operations = (records / transactions) + 1;
99 Ndb* pNdb = GETNDB(step);
100
101 HugoAsynchTransactions hugoTrans(*ctx->getTab());
102 if (hugoTrans.loadTableAsynch(pNdb, records, batchSize,
103 transactions, operations) != 0){
104 return NDBT_FAILED;
105 }
106
107 Ndb_cluster_connection* conn = &pNdb->get_ndb_cluster_connection();
108
109 /* The first call to create_multi_ndb_wait_group() should succeed ... */
110 global_poll_group = conn->create_ndb_wait_group(waitGroupSize);
111 if(global_poll_group == 0) {
112 return NDBT_FAILED;
113 }
114
115 /* and subsequent calls should fail */
116 if(conn->create_ndb_wait_group(waitGroupSize) != 0) {
117 return NDBT_FAILED;
118 }
119
120 return NDBT_OK;
121 }
122
123 /* NdbWaitGroup version 1 API uses a fixed-size wait group.
124 It cannot grow. We size it at 1000 Ndbs.
125 */
runSetup_v1(NDBT_Context * ctx,NDBT_Step * step)126 int runSetup_v1(NDBT_Context* ctx, NDBT_Step* step) {
127 return runSetup(ctx, step, 1000);
128 }
129
130 /* Version 2 of the API will allow the wait group to grow on
131 demand, so we start small.
132 */
runSetup_v2(NDBT_Context * ctx,NDBT_Step * step)133 int runSetup_v2(NDBT_Context* ctx, NDBT_Step* step) {
134 Ndb* pNdb = GETNDB(step);
135 Ndb_cluster_connection* conn = &pNdb->get_ndb_cluster_connection();
136 global_ndb_pool = new NdbPool(conn);
137 return runSetup(ctx, step, 7);
138 }
139
runCleanup(NDBT_Context * ctx,NDBT_Step * step)140 int runCleanup(NDBT_Context* ctx, NDBT_Step* step){
141 int records = ctx->getNumRecords();
142 int batchSize = ctx->getProperty("BatchSize", 1);
143 int transactions = (records / 100) + 1;
144 int operations = (records / transactions) + 1;
145 Ndb* pNdb = GETNDB(step);
146
147 HugoAsynchTransactions hugoTrans(*ctx->getTab());
148 if (hugoTrans.pkDelRecordsAsynch(pNdb, records, batchSize,
149 transactions, operations) != 0){
150 return NDBT_FAILED;
151 }
152
153 pNdb->get_ndb_cluster_connection().release_ndb_wait_group(global_poll_group);
154
155 return NDBT_OK;
156 }
157
158
runPkReadMultiBasic(NDBT_Context * ctx,NDBT_Step * step)159 int runPkReadMultiBasic(NDBT_Context* ctx, NDBT_Step* step){
160 int loops = ctx->getNumLoops();
161 int records = ctx->getNumRecords();
162 const int MAX_NDBS = 200;
163 Ndb* pNdb = GETNDB(step);
164 Ndb_cluster_connection* conn = &pNdb->get_ndb_cluster_connection();
165
166 int i = 0;
167 HugoOperations hugoOps(*ctx->getTab());
168
169 Ndb* ndbObjs[ MAX_NDBS ];
170 NdbTransaction* transArray[ MAX_NDBS ];
171 Ndb ** ready_ndbs;
172
173 for (int j=0; j < MAX_NDBS; j++)
174 {
175 Ndb* ndb = new Ndb(conn);
176 check(ndb->init() == 0, (*ndb));
177 ndbObjs[ j ] = ndb;
178 }
179
180 while (i<loops) {
181 ndbout << "Loop : " << i << ": ";
182 int recordsLeft = records;
183
184 do
185 {
186 /* Define and execute Pk read requests on
187 * different Ndb objects
188 */
189 int ndbcnt = 0;
190 int pollcnt = 0;
191 int lumpsize = 1 + myRandom48(MIN(recordsLeft, MAX_NDBS));
192 while(lumpsize &&
193 recordsLeft &&
194 ndbcnt < MAX_NDBS)
195 {
196 Ndb* ndb = ndbObjs[ ndbcnt ];
197 NdbTransaction* trans = ndb->startTransaction();
198 check(trans != NULL, (*ndb));
199 NdbOperation* readOp = trans->getNdbOperation(ctx->getTab());
200 check(readOp != NULL, (*trans));
201 check(readOp->readTuple() == 0, (*readOp));
202 check(hugoOps.equalForRow(readOp, recordsLeft) == 0, hugoOps);
203
204 /* Read all other cols */
205 for (int k=0; k < ctx->getTab()->getNoOfColumns(); k++)
206 {
207 check(readOp->getValue(ctx->getTab()->getColumn(k)) != NULL,
208 (*readOp));
209 }
210
211 /* Now send em off */
212 trans->executeAsynchPrepare(NdbTransaction::Commit,
213 NULL,
214 NULL,
215 NdbOperation::AbortOnError);
216 ndb->sendPreparedTransactions();
217
218 transArray[ndbcnt] = trans;
219 global_poll_group->addNdb(ndb);
220
221 ndbcnt++;
222 pollcnt++;
223 recordsLeft--;
224 lumpsize--;
225 };
226
227 /* Ok, now wait for the Ndbs to complete */
228 while (pollcnt)
229 {
230 /* Occasionally check with no timeout */
231 Uint32 timeout_millis = myRandom48(2)?10000:0;
232 int count = global_poll_group->wait(ready_ndbs, timeout_millis);
233
234 if (count > 0)
235 {
236 for (int y=0; y < count; y++)
237 {
238 Ndb *ndb = ready_ndbs[y];
239 check(ndb->pollNdb(0, 1) != 0, (*ndb));
240 }
241 pollcnt -= count;
242 }
243 }
244
245 /* Ok, now close the transactions */
246 for (int t=0; t < ndbcnt; t++)
247 {
248 transArray[t]->close();
249 }
250 } while (recordsLeft);
251
252 i++;
253 }
254
255 for (int j=0; j < MAX_NDBS; j++)
256 {
257 delete ndbObjs[ j ];
258 }
259
260 return NDBT_OK;
261 }
262
runPkReadMultiWakeupT1(NDBT_Context * ctx,NDBT_Step * step)263 int runPkReadMultiWakeupT1(NDBT_Context* ctx, NDBT_Step* step)
264 {
265 HugoOperations hugoOps(*ctx->getTab());
266 Ndb* ndb = GETNDB(step);
267 Uint32 phase = ctx->getProperty("PHASE");
268
269 if (phase != 0)
270 {
271 ndbout << "Thread 1 : Error, initial phase should be 0 not " << phase << endl;
272 return NDBT_FAILED;
273 };
274
275 /* We now start a transaction, locking row 0 */
276 ndbout << "Thread 1 : Starting transaction locking row 0..." << endl;
277 check(hugoOps.startTransaction(ndb) == 0, hugoOps);
278 check(hugoOps.pkReadRecord(ndb, 0, 1, NdbOperation::LM_Exclusive) == 0,
279 hugoOps);
280 check(hugoOps.execute_NoCommit(ndb) == 0, hugoOps);
281
282 ndbout << "Thread 1 : Lock taken." << endl;
283 ndbout << "Thread 1 : Triggering Thread 2 by move to phase 1" << endl;
284 /* Ok, now get thread 2 to try to read row */
285 ctx->incProperty("PHASE"); /* Set to 1 */
286
287 /* Here, loop waking up waiter on the cluster connection */
288 /* Check the property has not moved to phase 2 */
289 ndbout << "Thread 1 : Performing async wakeup until phase changes to 2"
290 << endl;
291 while (ctx->getProperty("PHASE") != 2)
292 {
293 global_poll_group->wakeup();
294 NdbSleep_MilliSleep(500);
295 }
296
297 ndbout << "Thread 1 : Phase changed to 2, committing transaction "
298 << "and releasing lock" << endl;
299
300 /* Ok, give them a break, commit transaction */
301 check(hugoOps.execute_Commit(ndb) ==0, hugoOps);
302 hugoOps.closeTransaction(ndb);
303
304 ndbout << "Thread 1 : Finished" << endl;
305 return NDBT_OK;
306 }
307
runPkReadMultiWakeupT2(NDBT_Context * ctx,NDBT_Step * step)308 int runPkReadMultiWakeupT2(NDBT_Context* ctx, NDBT_Step* step)
309 {
310 ndbout << "Thread 2 : Waiting for phase 1 notification from Thread 1" << endl;
311 ctx->getPropertyWait("PHASE", 1);
312
313 /* Ok, now thread 1 has locked row 1, we'll attempt to read
314 * it, using the multi_ndb_wait Api to block
315 */
316 HugoOperations hugoOps(*ctx->getTab());
317 Ndb* ndb = GETNDB(step);
318
319 ndbout << "Thread 2 : Starting async transaction to read row" << endl;
320 check(hugoOps.startTransaction(ndb) == 0, hugoOps);
321 check(hugoOps.pkReadRecord(ndb, 0, 1, NdbOperation::LM_Exclusive) == 0,
322 hugoOps);
323 /* Prepare, Send */
324 check(hugoOps.execute_async(ndb,
325 NdbTransaction::Commit,
326 NdbOperation::AbortOnError) == 0,
327 hugoOps);
328
329 global_poll_group->addNdb(ndb);
330 Ndb ** ready_ndbs;
331 int wait_rc = 0;
332 int acknowledged = 0;
333 do
334 {
335 ndbout << "Thread 2 : Calling NdbWaitGroup::wait()" << endl;
336 wait_rc = global_poll_group->wait(ready_ndbs, 10000);
337 ndbout << " Result : " << wait_rc << endl;
338 if (wait_rc == 0)
339 {
340 if (!acknowledged)
341 {
342 ndbout << "Thread 2 : Woken up, moving to phase 2" << endl;
343 ctx->incProperty("PHASE");
344 acknowledged = 1;
345 }
346 }
347 else if (wait_rc > 0)
348 {
349 ndbout << "Thread 2 : Transaction completed" << endl;
350 ndb->pollNdb(1,0);
351 hugoOps.closeTransaction(ndb);
352 }
353 } while (wait_rc == 0);
354
355 return (wait_rc == 1 ? NDBT_OK : NDBT_FAILED);
356 }
357
358 /* Version 2 API tests */
359 #define V2_NLOOPS 32
360
361 /* Producer thread */
runV2MultiWait_Producer(NDBT_Context * ctx,NDBT_Step * step,int thd_id,int nthreads)362 int runV2MultiWait_Producer(NDBT_Context* ctx, NDBT_Step* step,
363 int thd_id, int nthreads)
364 {
365 int records = ctx->getNumRecords();
366 HugoOperations hugoOps(*ctx->getTab());
367
368 /* For three threads (2 producers + 1 consumer) we loop 0-7.
369 producer 0 is slow if (loop & 1)
370 producer 1 is slow if (loop & 2)
371 consumer is slow if (loop & 4)
372 */
373 for (int loop = 0; loop < V2_NLOOPS; loop++)
374 {
375 ctx->getPropertyWait("LOOP", loop+1);
376 bool slow = loop & (thd_id+1);
377 for (int j=0; j < records; j++)
378 {
379 if(j % nthreads == thd_id)
380 {
381 Ndb* ndb = global_ndb_pool->getNdb();
382 NdbTransaction* trans = ndb->startTransaction();
383 check(trans != NULL, (*ndb));
384 ndb->setCustomData(trans);
385
386 NdbOperation* readOp = trans->getNdbOperation(ctx->getTab());
387 check(readOp != NULL, (*trans));
388 check(readOp->readTuple() == 0, (*readOp));
389 check(hugoOps.equalForRow(readOp, j) == 0, hugoOps);
390
391 /* Read all other cols */
392 for (int k=0; k < ctx->getTab()->getNoOfColumns(); k++)
393 {
394 check(readOp->getValue(ctx->getTab()->getColumn(k)) != NULL,
395 (*readOp));
396 }
397
398 trans->executeAsynchPrepare(NdbTransaction::Commit,
399 NULL,
400 NULL,
401 NdbOperation::AbortOnError);
402 ndb->sendPreparedTransactions();
403 global_poll_group->push(ndb);
404 if(slow)
405 {
406 int tm = myRandom48(3) * myRandom48(3);
407 if(tm) NdbSleep_MilliSleep(tm);
408 }
409 }
410 }
411 }
412 return NDBT_OK;
413 }
414
runV2MultiWait_Push_Thd0(NDBT_Context * ctx,NDBT_Step * step)415 int runV2MultiWait_Push_Thd0(NDBT_Context* ctx, NDBT_Step* step)
416 {
417 return runV2MultiWait_Producer(ctx, step, 0, 2);
418 }
419
runV2MultiWait_Push_Thd1(NDBT_Context * ctx,NDBT_Step * step)420 int runV2MultiWait_Push_Thd1(NDBT_Context* ctx, NDBT_Step* step)
421 {
422 return runV2MultiWait_Producer(ctx, step, 1, 2);
423 }
424
425
426 /* Consumer */
runV2MultiWait_WaitPop_Thread(NDBT_Context * ctx,NDBT_Step * step)427 int runV2MultiWait_WaitPop_Thread(NDBT_Context* ctx, NDBT_Step* step)
428 {
429 static int iter = 0; // keeps incrementing when test case is repeated
430 int records = ctx->getNumRecords();
431 const char * d[5] = { " fast"," slow"," slow",""," slow" };
432 const int timeout[3] = { 100, 1, 0 };
433 const int pct_wait[9] = { 0,0,0,50,50,50,100,100,100 };
434 for (int loop = 0; loop < V2_NLOOPS; loop++, iter++)
435 {
436 ctx->incProperty("LOOP");
437 ndbout << "V2 test: " << d[loop&1] << d[loop&2] << d[loop&4];
438 ndbout << " " << timeout[iter%3] << "/" << pct_wait[iter%9] << endl;
439 bool slow = loop & 4;
440 int nrec = 0;
441 while(nrec < records)
442 {
443 /* Occasionally check with no timeout */
444 global_poll_group->wait(timeout[iter%3], pct_wait[iter%9]);
445 Ndb * ndb = global_poll_group->pop();
446 while(ndb)
447 {
448 check(ndb->pollNdb(0, 1) != 0, (*ndb));
449 nrec++;
450 NdbTransaction *tx = (NdbTransaction *) ndb->getCustomData();
451 tx->close();
452 global_ndb_pool->recycleNdb(ndb);
453 ndb = global_poll_group->pop();
454 }
455 if(slow)
456 {
457 NdbSleep_MilliSleep(myRandom48(6));
458 }
459 }
460 }
461 ctx->stopTest();
462 global_ndb_pool->closeAll();
463 return NDBT_OK;
464 }
465
runMiscUntilStopped(NDBT_Context * ctx,NDBT_Step * step)466 int runMiscUntilStopped(NDBT_Context* ctx, NDBT_Step* step){
467 int records = ctx->getNumRecords();
468 int i = 0;
469 Ndb * ndb = GETNDB(step);
470 HugoTransactions hugoTrans(*ctx->getTab());
471 while (ctx->isTestStopped() == false) {
472 int r = 0;
473 switch(i % 5) {
474 case 0: // batch size = 2, random = 1
475 r = hugoTrans.pkReadRecords(ndb, records / 20, 2,
476 NdbOperation::LM_Read, 1);
477 break;
478 case 1:
479 r = hugoTrans.pkUpdateRecords(ndb, records / 20);
480 break;
481 case 2:
482 r = hugoTrans.scanReadRecords(ndb, records);
483 break;
484 case 3:
485 r = hugoTrans.scanUpdateRecords(ndb, records / 10);
486 break;
487 case 4:
488 NdbSleep_MilliSleep(records);
489 break;
490 }
491 if(r != 0) return NDBT_FAILED;
492 i++;
493 }
494 ndbout << "V2 Test misc thread: " << i << " transactions" << endl;
495 return NDBT_OK;
496 }
497
sleepAndStop(NDBT_Context * ctx,NDBT_Step * step)498 int sleepAndStop(NDBT_Context* ctx, NDBT_Step* step){
499 sleep(20);
500 ctx->stopTest();
501 return NDBT_OK;
502 }
503
504 NDBT_TESTSUITE(testAsynchMultiwait);
505 TESTCASE("AsynchMultiwaitPkRead",
506 "Verify NdbWaitGroup API (1 thread)") {
507 INITIALIZER(runSetup_v1);
508 STEP(runPkReadMultiBasic);
509 FINALIZER(runCleanup);
510 }
511 TESTCASE("AsynchMultiwaitWakeup",
512 "Verify wait-multi-ndb wakeup Api code") {
513 INITIALIZER(runSetup_v1);
514 TC_PROPERTY("PHASE", Uint32(0));
515 STEP(runPkReadMultiWakeupT1);
516 STEP(runPkReadMultiWakeupT2);
517 FINALIZER(runCleanup);
518 }
519 TESTCASE("AsynchMultiwait_Version2",
520 "Verify NdbWaitGroup API version 2") {
521 INITIALIZER(runSetup_v2);
522 TC_PROPERTY("LOOP", Uint32(0));
523 STEP(runV2MultiWait_Push_Thd0);
524 STEP(runV2MultiWait_Push_Thd1);
525 STEP(runV2MultiWait_WaitPop_Thread);
526 STEP(runMiscUntilStopped);
527 FINALIZER(runCleanup);
528 }
529 TESTCASE("JustMisc", "Just run the Scan test") {
530 INITIALIZER(runSetup_v2);
531 STEP(runMiscUntilStopped);
532 STEP(sleepAndStop);
533 FINALIZER(runCleanup);
534 }
535 NDBT_TESTSUITE_END(testAsynchMultiwait);
536
main(int argc,const char ** argv)537 int main(int argc, const char** argv){
538 ndb_init();
539 NDBT_TESTSUITE_INSTANCE(testAsynchMultiwait);
540 return testAsynchMultiwait.execute(argc, argv);
541 }
542
543