/*
 Copyright (c) 2011, 2021, Oracle and/or its affiliates.

 This program is free software; you can redistribute it and/or modify
 it under the terms of the GNU General Public License, version 2.0,
 as published by the Free Software Foundation.

 This program is also distributed with certain software (including
 but not limited to OpenSSL) that is licensed under separate terms,
 as designated in a particular file or component or in included license
 documentation.  The authors of MySQL hereby grant you an additional
 permission to link the program and your derivative works with the
 separately licensed software that they have included with MySQL.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License, version 2.0, for more details.

 You should have received a copy of the GNU General Public License
 along with this program; if not, write to the Free Software
 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/
24 
25 
26 #include "NDBT_Test.hpp"
27 #include "NDBT_ReturnCodes.h"
28 #include "HugoTransactions.hpp"
29 #include "HugoAsynchTransactions.hpp"
30 #include "UtilTransactions.hpp"
31 #include "random.h"
32 #include "../../src/ndbapi/NdbWaitGroup.hpp"
33 
34 
35 class NdbPool : private NdbLockable {
36   public:
NdbPool(Ndb_cluster_connection * _conn)37     NdbPool(Ndb_cluster_connection *_conn) : conn(_conn), list(0), size(0),
38                                              created(0) {};
39     Ndb * getNdb();
40     void recycleNdb(Ndb *n);
41     void closeAll();
42 
43   private:
44     Ndb_cluster_connection *conn;
45     Ndb * list;
46     int size, created;
47 };
48 
getNdb()49 Ndb * NdbPool::getNdb() {
50   Ndb * n;
51   lock();
52   if(list) {
53     n = list;
54     list = (Ndb *) n->getCustomData();
55     size--;
56   }
57   else {
58     n = new Ndb(conn);
59     n->init();
60     created++;
61   }
62   unlock();
63   return n;
64 }
65 
recycleNdb(Ndb * n)66 void NdbPool::recycleNdb(Ndb *n) {
67   lock();
68   n->setCustomData(list);
69   list = n;
70   size++;
71   unlock();
72 }
73 
closeAll()74 void NdbPool::closeAll() {
75   lock();
76   while(list) {
77     Ndb *n = list;
78     list = (Ndb *) n->getCustomData();
79     delete n;
80   }
81   size = 0;
82   unlock();
83 }
84 
85 NdbWaitGroup * global_poll_group;
86 NdbPool * global_ndb_pool;
87 
/*
 * check(b, e): if condition b is false, print the step name, the source
 * line and e.getNdbError() to g_err, then return NDBT_FAILED from the
 * enclosing function.
 *
 * Requires a variable named `step` in scope. The body is wrapped in
 * do { ... } while (0) so that `check(...);` is a single statement and can
 * be used safely in an unbraced if/else; `e` is parenthesised so that any
 * expression may be passed.
 */
#define check(b, e) \
  do { \
    if (!(b)) { \
      g_err << "ERR: " << step->getName() << " failed on line " \
            << __LINE__ << ": " << (e).getNdbError() << endl; \
      return NDBT_FAILED; \
    } \
  } while (0)
91 
92 
runSetup(NDBT_Context * ctx,NDBT_Step * step,int waitGroupSize)93 int runSetup(NDBT_Context* ctx, NDBT_Step* step, int waitGroupSize){
94 
95   int records = ctx->getNumRecords();
96   int batchSize = ctx->getProperty("BatchSize", 1);
97   int transactions = (records / 100) + 1;
98   int operations = (records / transactions) + 1;
99   Ndb* pNdb = GETNDB(step);
100 
101   HugoAsynchTransactions hugoTrans(*ctx->getTab());
102   if (hugoTrans.loadTableAsynch(pNdb, records, batchSize,
103 				transactions, operations) != 0){
104     return NDBT_FAILED;
105   }
106 
107   Ndb_cluster_connection* conn = &pNdb->get_ndb_cluster_connection();
108 
109   /* The first call to create_multi_ndb_wait_group() should succeed ... */
110   global_poll_group = conn->create_ndb_wait_group(waitGroupSize);
111   if(global_poll_group == 0) {
112     return NDBT_FAILED;
113   }
114 
115   /* and subsequent calls should fail */
116   if(conn->create_ndb_wait_group(waitGroupSize) != 0) {
117     return NDBT_FAILED;
118   }
119 
120   return NDBT_OK;
121 }
122 
123 /* NdbWaitGroup version 1 API uses a fixed-size wait group.
124    It cannot grow.  We size it at 1000 Ndbs.
125 */
runSetup_v1(NDBT_Context * ctx,NDBT_Step * step)126 int runSetup_v1(NDBT_Context* ctx, NDBT_Step* step) {
127   return runSetup(ctx, step, 1000);
128 }
129 
130 /* Version 2 of the API will allow the wait group to grow on
131    demand, so we start small.
132 */
runSetup_v2(NDBT_Context * ctx,NDBT_Step * step)133 int runSetup_v2(NDBT_Context* ctx, NDBT_Step* step) {
134   Ndb* pNdb = GETNDB(step);
135   Ndb_cluster_connection* conn = &pNdb->get_ndb_cluster_connection();
136   global_ndb_pool = new NdbPool(conn);
137   return runSetup(ctx, step, 7);
138 }
139 
runCleanup(NDBT_Context * ctx,NDBT_Step * step)140 int runCleanup(NDBT_Context* ctx, NDBT_Step* step){
141   int records = ctx->getNumRecords();
142   int batchSize = ctx->getProperty("BatchSize", 1);
143   int transactions = (records / 100) + 1;
144   int operations = (records / transactions) + 1;
145   Ndb* pNdb = GETNDB(step);
146 
147   HugoAsynchTransactions hugoTrans(*ctx->getTab());
148   if (hugoTrans.pkDelRecordsAsynch(pNdb,  records, batchSize,
149 				   transactions, operations) != 0){
150     return NDBT_FAILED;
151   }
152 
153   pNdb->get_ndb_cluster_connection().release_ndb_wait_group(global_poll_group);
154 
155   return NDBT_OK;
156 }
157 
158 
runPkReadMultiBasic(NDBT_Context * ctx,NDBT_Step * step)159 int runPkReadMultiBasic(NDBT_Context* ctx, NDBT_Step* step){
160   int loops = ctx->getNumLoops();
161   int records = ctx->getNumRecords();
162   const int MAX_NDBS = 200;
163   Ndb* pNdb = GETNDB(step);
164   Ndb_cluster_connection* conn = &pNdb->get_ndb_cluster_connection();
165 
166   int i = 0;
167   HugoOperations hugoOps(*ctx->getTab());
168 
169   Ndb* ndbObjs[ MAX_NDBS ];
170   NdbTransaction* transArray[ MAX_NDBS ];
171   Ndb ** ready_ndbs;
172 
173   for (int j=0; j < MAX_NDBS; j++)
174   {
175     Ndb* ndb = new Ndb(conn);
176     check(ndb->init() == 0, (*ndb));
177     ndbObjs[ j ] = ndb;
178   }
179 
180   while (i<loops) {
181     ndbout << "Loop : " << i << ": ";
182     int recordsLeft = records;
183 
184     do
185     {
186       /* Define and execute Pk read requests on
187        * different Ndb objects
188        */
189       int ndbcnt = 0;
190       int pollcnt = 0;
191       int lumpsize = 1 + myRandom48(MIN(recordsLeft, MAX_NDBS));
192       while(lumpsize &&
193             recordsLeft &&
194             ndbcnt < MAX_NDBS)
195       {
196         Ndb* ndb = ndbObjs[ ndbcnt ];
197         NdbTransaction* trans = ndb->startTransaction();
198         check(trans != NULL, (*ndb));
199         NdbOperation* readOp = trans->getNdbOperation(ctx->getTab());
200         check(readOp != NULL, (*trans));
201         check(readOp->readTuple() == 0, (*readOp));
202         check(hugoOps.equalForRow(readOp, recordsLeft) == 0, hugoOps);
203 
204         /* Read all other cols */
205         for (int k=0; k < ctx->getTab()->getNoOfColumns(); k++)
206         {
207           check(readOp->getValue(ctx->getTab()->getColumn(k)) != NULL,
208                 (*readOp));
209         }
210 
211         /* Now send em off */
212         trans->executeAsynchPrepare(NdbTransaction::Commit,
213                                     NULL,
214                                     NULL,
215                                     NdbOperation::AbortOnError);
216         ndb->sendPreparedTransactions();
217 
218         transArray[ndbcnt] = trans;
219         global_poll_group->addNdb(ndb);
220 
221         ndbcnt++;
222         pollcnt++;
223         recordsLeft--;
224         lumpsize--;
225       };
226 
227       /* Ok, now wait for the Ndbs to complete */
228       while (pollcnt)
229       {
230         /* Occasionally check with no timeout */
231         Uint32 timeout_millis = myRandom48(2)?10000:0;
232         int count = global_poll_group->wait(ready_ndbs, timeout_millis);
233 
234         if (count > 0)
235         {
236           for (int y=0; y < count; y++)
237           {
238             Ndb *ndb = ready_ndbs[y];
239             check(ndb->pollNdb(0, 1) != 0, (*ndb));
240           }
241           pollcnt -= count;
242         }
243       }
244 
245       /* Ok, now close the transactions */
246       for (int t=0; t < ndbcnt; t++)
247       {
248         transArray[t]->close();
249       }
250     } while (recordsLeft);
251 
252     i++;
253   }
254 
255   for (int j=0; j < MAX_NDBS; j++)
256   {
257     delete ndbObjs[ j ];
258   }
259 
260   return NDBT_OK;
261 }
262 
runPkReadMultiWakeupT1(NDBT_Context * ctx,NDBT_Step * step)263 int runPkReadMultiWakeupT1(NDBT_Context* ctx, NDBT_Step* step)
264 {
265   HugoOperations hugoOps(*ctx->getTab());
266   Ndb* ndb = GETNDB(step);
267   Uint32 phase = ctx->getProperty("PHASE");
268 
269   if (phase != 0)
270   {
271     ndbout << "Thread 1 : Error, initial phase should be 0 not " << phase << endl;
272     return NDBT_FAILED;
273   };
274 
275   /* We now start a transaction, locking row 0 */
276   ndbout << "Thread 1 : Starting transaction locking row 0..." << endl;
277   check(hugoOps.startTransaction(ndb) == 0, hugoOps);
278   check(hugoOps.pkReadRecord(ndb, 0, 1, NdbOperation::LM_Exclusive) == 0,
279         hugoOps);
280   check(hugoOps.execute_NoCommit(ndb) == 0, hugoOps);
281 
282   ndbout << "Thread 1 : Lock taken." << endl;
283   ndbout << "Thread 1 : Triggering Thread 2 by move to phase 1" << endl;
284   /* Ok, now get thread 2 to try to read row */
285   ctx->incProperty("PHASE"); /* Set to 1 */
286 
287   /* Here, loop waking up waiter on the cluster connection */
288   /* Check the property has not moved to phase 2 */
289   ndbout << "Thread 1 : Performing async wakeup until phase changes to 2"
290   << endl;
291   while (ctx->getProperty("PHASE") != 2)
292   {
293     global_poll_group->wakeup();
294     NdbSleep_MilliSleep(500);
295   }
296 
297   ndbout << "Thread 1 : Phase changed to 2, committing transaction "
298   << "and releasing lock" << endl;
299 
300   /* Ok, give them a break, commit transaction */
301   check(hugoOps.execute_Commit(ndb) ==0, hugoOps);
302   hugoOps.closeTransaction(ndb);
303 
304   ndbout << "Thread 1 : Finished" << endl;
305   return NDBT_OK;
306 }
307 
runPkReadMultiWakeupT2(NDBT_Context * ctx,NDBT_Step * step)308 int runPkReadMultiWakeupT2(NDBT_Context* ctx, NDBT_Step* step)
309 {
310   ndbout << "Thread 2 : Waiting for phase 1 notification from Thread 1" << endl;
311   ctx->getPropertyWait("PHASE", 1);
312 
313   /* Ok, now thread 1 has locked row 1, we'll attempt to read
314    * it, using the multi_ndb_wait Api to block
315    */
316   HugoOperations hugoOps(*ctx->getTab());
317   Ndb* ndb = GETNDB(step);
318 
319   ndbout << "Thread 2 : Starting async transaction to read row" << endl;
320   check(hugoOps.startTransaction(ndb) == 0, hugoOps);
321   check(hugoOps.pkReadRecord(ndb, 0, 1, NdbOperation::LM_Exclusive) == 0,
322         hugoOps);
323   /* Prepare, Send */
324   check(hugoOps.execute_async(ndb,
325                               NdbTransaction::Commit,
326                               NdbOperation::AbortOnError) == 0,
327         hugoOps);
328 
329   global_poll_group->addNdb(ndb);
330   Ndb ** ready_ndbs;
331   int wait_rc = 0;
332   int acknowledged = 0;
333   do
334   {
335     ndbout << "Thread 2 : Calling NdbWaitGroup::wait()" << endl;
336     wait_rc = global_poll_group->wait(ready_ndbs, 10000);
337     ndbout << "           Result : " << wait_rc << endl;
338     if (wait_rc == 0)
339     {
340       if (!acknowledged)
341       {
342         ndbout << "Thread 2 : Woken up, moving to phase 2" << endl;
343         ctx->incProperty("PHASE");
344         acknowledged = 1;
345       }
346     }
347     else if (wait_rc > 0)
348     {
349       ndbout << "Thread 2 : Transaction completed" << endl;
350       ndb->pollNdb(1,0);
351       hugoOps.closeTransaction(ndb);
352     }
353   } while (wait_rc == 0);
354 
355   return (wait_rc == 1 ? NDBT_OK : NDBT_FAILED);
356 }
357 
/* Version 2 API tests */

/* Number of rounds that each version 2 producer and the consumer run. */
#define V2_NLOOPS 32
360 
361 /* Producer thread */
runV2MultiWait_Producer(NDBT_Context * ctx,NDBT_Step * step,int thd_id,int nthreads)362 int runV2MultiWait_Producer(NDBT_Context* ctx, NDBT_Step* step,
363                            int thd_id, int nthreads)
364 {
365   int records = ctx->getNumRecords();
366   HugoOperations hugoOps(*ctx->getTab());
367 
368   /* For three threads (2 producers + 1 consumer) we loop 0-7.
369      producer 0 is slow if (loop & 1)
370      producer 1 is slow if (loop & 2)
371      consumer is slow if (loop & 4)
372   */
373   for (int loop = 0; loop < V2_NLOOPS; loop++)
374   {
375     ctx->getPropertyWait("LOOP", loop+1);
376     bool slow = loop & (thd_id+1);
377     for (int j=0; j < records; j++)
378     {
379       if(j % nthreads == thd_id)
380       {
381         Ndb* ndb = global_ndb_pool->getNdb();
382         NdbTransaction* trans = ndb->startTransaction();
383         check(trans != NULL, (*ndb));
384         ndb->setCustomData(trans);
385 
386         NdbOperation* readOp = trans->getNdbOperation(ctx->getTab());
387         check(readOp != NULL, (*trans));
388         check(readOp->readTuple() == 0, (*readOp));
389         check(hugoOps.equalForRow(readOp, j) == 0, hugoOps);
390 
391         /* Read all other cols */
392         for (int k=0; k < ctx->getTab()->getNoOfColumns(); k++)
393         {
394           check(readOp->getValue(ctx->getTab()->getColumn(k)) != NULL,
395                 (*readOp));
396         }
397 
398         trans->executeAsynchPrepare(NdbTransaction::Commit,
399                                     NULL,
400                                     NULL,
401                                     NdbOperation::AbortOnError);
402         ndb->sendPreparedTransactions();
403         global_poll_group->push(ndb);
404         if(slow)
405         {
406           int tm = myRandom48(3) * myRandom48(3);
407           if(tm) NdbSleep_MilliSleep(tm);
408         }
409       }
410     }
411   }
412   return NDBT_OK;
413 }
414 
runV2MultiWait_Push_Thd0(NDBT_Context * ctx,NDBT_Step * step)415 int runV2MultiWait_Push_Thd0(NDBT_Context* ctx, NDBT_Step* step)
416 {
417   return runV2MultiWait_Producer(ctx, step, 0, 2);
418 }
419 
runV2MultiWait_Push_Thd1(NDBT_Context * ctx,NDBT_Step * step)420 int runV2MultiWait_Push_Thd1(NDBT_Context* ctx, NDBT_Step* step)
421 {
422   return runV2MultiWait_Producer(ctx, step, 1, 2);
423 }
424 
425 
426 /* Consumer */
runV2MultiWait_WaitPop_Thread(NDBT_Context * ctx,NDBT_Step * step)427 int runV2MultiWait_WaitPop_Thread(NDBT_Context* ctx, NDBT_Step* step)
428 {
429   static int iter = 0;  // keeps incrementing when test case is repeated
430   int records = ctx->getNumRecords();
431   const char * d[5] = { " fast"," slow"," slow",""," slow" };
432   const int timeout[3] = { 100, 1, 0 };
433   const int pct_wait[9] = { 0,0,0,50,50,50,100,100,100 };
434   for (int loop = 0; loop < V2_NLOOPS; loop++, iter++)
435   {
436     ctx->incProperty("LOOP");
437     ndbout << "V2 test: " << d[loop&1] << d[loop&2] << d[loop&4];
438     ndbout << " " << timeout[iter%3] << "/" << pct_wait[iter%9] << endl;
439     bool slow = loop & 4;
440     int nrec = 0;
441     while(nrec < records)
442     {
443       /* Occasionally check with no timeout */
444       global_poll_group->wait(timeout[iter%3], pct_wait[iter%9]);
445       Ndb * ndb = global_poll_group->pop();
446       while(ndb)
447       {
448         check(ndb->pollNdb(0, 1) != 0, (*ndb));
449         nrec++;
450         NdbTransaction *tx = (NdbTransaction *) ndb->getCustomData();
451         tx->close();
452         global_ndb_pool->recycleNdb(ndb);
453         ndb = global_poll_group->pop();
454       }
455       if(slow)
456       {
457          NdbSleep_MilliSleep(myRandom48(6));
458       }
459     }
460   }
461   ctx->stopTest();
462   global_ndb_pool->closeAll();
463   return NDBT_OK;
464 }
465 
runMiscUntilStopped(NDBT_Context * ctx,NDBT_Step * step)466 int runMiscUntilStopped(NDBT_Context* ctx, NDBT_Step* step){
467   int records = ctx->getNumRecords();
468   int i = 0;
469   Ndb * ndb = GETNDB(step);
470   HugoTransactions hugoTrans(*ctx->getTab());
471   while (ctx->isTestStopped() == false) {
472     int r = 0;
473     switch(i % 5) {
474       case 0:  // batch size = 2, random = 1
475         r = hugoTrans.pkReadRecords(ndb, records / 20, 2,
476                                     NdbOperation::LM_Read, 1);
477         break;
478       case 1:
479         r = hugoTrans.pkUpdateRecords(ndb, records / 20);
480         break;
481       case 2:
482         r = hugoTrans.scanReadRecords(ndb, records);
483         break;
484       case 3:
485         r = hugoTrans.scanUpdateRecords(ndb, records / 10);
486         break;
487       case 4:
488         NdbSleep_MilliSleep(records);
489         break;
490     }
491     if(r != 0) return NDBT_FAILED;
492     i++;
493   }
494   ndbout << "V2 Test misc thread: " << i << " transactions" << endl;
495   return NDBT_OK;
496 }
497 
sleepAndStop(NDBT_Context * ctx,NDBT_Step * step)498 int sleepAndStop(NDBT_Context* ctx, NDBT_Step* step){
499   sleep(20);
500   ctx->stopTest();
501   return NDBT_OK;
502 }
503 
504 NDBT_TESTSUITE(testAsynchMultiwait);
505 TESTCASE("AsynchMultiwaitPkRead",
506          "Verify NdbWaitGroup API (1 thread)") {
507   INITIALIZER(runSetup_v1);
508   STEP(runPkReadMultiBasic);
509   FINALIZER(runCleanup);
510 }
511 TESTCASE("AsynchMultiwaitWakeup",
512          "Verify wait-multi-ndb wakeup Api code") {
513   INITIALIZER(runSetup_v1);
514   TC_PROPERTY("PHASE", Uint32(0));
515   STEP(runPkReadMultiWakeupT1);
516   STEP(runPkReadMultiWakeupT2);
517   FINALIZER(runCleanup);
518 }
519 TESTCASE("AsynchMultiwait_Version2",
520          "Verify NdbWaitGroup API version 2") {
521   INITIALIZER(runSetup_v2);
522   TC_PROPERTY("LOOP", Uint32(0));
523   STEP(runV2MultiWait_Push_Thd0);
524   STEP(runV2MultiWait_Push_Thd1);
525   STEP(runV2MultiWait_WaitPop_Thread);
526   STEP(runMiscUntilStopped);
527   FINALIZER(runCleanup);
528 }
529 TESTCASE("JustMisc", "Just run the Scan test") {
530   INITIALIZER(runSetup_v2);
531   STEP(runMiscUntilStopped);
532   STEP(sleepAndStop);
533   FINALIZER(runCleanup);
534 }
535 NDBT_TESTSUITE_END(testAsynchMultiwait);
536 
main(int argc,const char ** argv)537 int main(int argc, const char** argv){
538   ndb_init();
539   NDBT_TESTSUITE_INSTANCE(testAsynchMultiwait);
540   return testAsynchMultiwait.execute(argc, argv);
541 }
542 
543