1 /*
2    Copyright (C) 2003-2008 MySQL AB, 2008, 2009 Sun Microsystems, Inc.
3     All rights reserved. Use is subject to license terms.
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License, version 2.0,
7    as published by the Free Software Foundation.
8 
9    This program is also distributed with certain software (including
10    but not limited to OpenSSL) that is licensed under separate terms,
11    as designated in a particular file or component or in included license
12    documentation.  The authors of MySQL hereby grant you an additional
13    permission to link the program and your derivative works with the
14    separately licensed software that they have included with MySQL.
15 
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License, version 2.0, for more details.
20 
21    You should have received a copy of the GNU General Public License
22    along with this program; if not, write to the Free Software
23    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 */
25 
26 #include "UtilTransactions.hpp"
27 #include <NdbSleep.h>
28 #include <NdbScanFilter.hpp>
29 
30 #define VERBOSE 0
31 
UtilTransactions(const NdbDictionary::Table & _tab,const NdbDictionary::Index * _idx)32 UtilTransactions::UtilTransactions(const NdbDictionary::Table& _tab,
33 				   const NdbDictionary::Index* _idx):
34   tab(_tab), idx(_idx), pTrans(0)
35 {
36   m_defaultClearMethod = 3;
37 }
38 
UtilTransactions(Ndb * ndb,const char * name,const char * index)39 UtilTransactions::UtilTransactions(Ndb* ndb,
40 				   const char * name,
41 				   const char * index) :
42   tab(* ndb->getDictionary()->getTable(name)),
43   idx(index ? ndb->getDictionary()->getIndex(index, name) : 0),
44   pTrans(0)
45 {
46   m_defaultClearMethod = 3;
47 }
48 
49 #define RESTART_SCAN 99
50 
51 #define RETURN_FAIL(err) return (err.code != 0 ? err.code : NDBT_FAILED)
52 
53 int
clearTable(Ndb * pNdb,NdbScanOperation::ScanFlag flags,int records,int parallelism)54 UtilTransactions::clearTable(Ndb* pNdb,
55                              NdbScanOperation::ScanFlag flags,
56                              int records,
57                              int parallelism){
58   // Scan all records exclusive and delete
59   // them one by one
60   int                  retryAttempt = 0;
61   const int            retryMax = 10;
62   int deletedRows = 0;
63   int check;
64   NdbScanOperation *pOp;
65   NdbError err;
66 
67   int par = parallelism;
68   while (true){
69   restart:
70     if (retryAttempt++ >= retryMax){
71       g_info << "ERROR: has retried this operation " << retryAttempt
72 	     << " times, failing!" << endl;
73       return NDBT_FAILED;
74     }
75 
76     pTrans = pNdb->startTransaction();
77     if (pTrans == NULL) {
78       err = pNdb->getNdbError();
79       if (err.status == NdbError::TemporaryError){
80 	ERR(err);
81 	NdbSleep_MilliSleep(50);
82 	continue;
83       }
84       goto failed;
85     }
86 
87     pOp = getScanOperation(pTrans);
88     if (pOp == NULL) {
89       err = pTrans->getNdbError();
90       if(err.status == NdbError::TemporaryError){
91 	ERR(err);
92 	closeTransaction(pNdb);
93 	NdbSleep_MilliSleep(50);
94 	par = 1;
95 	goto restart;
96       }
97       goto failed;
98     }
99 
100     if( pOp->readTuples(NdbOperation::LM_Exclusive, flags, par) ) {
101       err = pTrans->getNdbError();
102       goto failed;
103     }
104 
105     if(pTrans->execute(NoCommit, AbortOnError) != 0){
106       err = pTrans->getNdbError();
107       if(err.status == NdbError::TemporaryError){
108 	ERR(err);
109 	closeTransaction(pNdb);
110 	NdbSleep_MilliSleep(50);
111 	continue;
112       }
113       goto failed;
114     }
115 
116     while((check = pOp->nextResult(true)) == 0){
117       do {
118 	if (pOp->deleteCurrentTuple() != 0){
119 	  goto failed;
120 	}
121 	deletedRows++;
122       } while((check = pOp->nextResult(false)) == 0);
123 
124       if(check != -1){
125 	check = pTrans->execute(Commit, AbortOnError);
126 	pTrans->restart();
127       }
128 
129       err = pTrans->getNdbError();
130       if(check == -1){
131 	if(err.status == NdbError::TemporaryError){
132 	  ERR(err);
133 	  closeTransaction(pNdb);
134 	  NdbSleep_MilliSleep(50);
135 	  par = 1;
136 	  goto restart;
137 	}
138 	goto failed;
139       }
140     }
141     if(check == -1){
142       err = pTrans->getNdbError();
143       if(err.status == NdbError::TemporaryError){
144 	ERR(err);
145 	closeTransaction(pNdb);
146 	NdbSleep_MilliSleep(50);
147 	par = 1;
148 	goto restart;
149       }
150       goto failed;
151     }
152     closeTransaction(pNdb);
153     return NDBT_OK;
154   }
155   return NDBT_FAILED;
156 
157  failed:
158   if(pTrans != 0) closeTransaction(pNdb);
159   ERR(err);
160   return (err.code != 0 ? err.code : NDBT_FAILED);
161 }
162 
163 int
clearTable(Ndb * pNdb,int records,int parallelism)164 UtilTransactions::clearTable(Ndb* pNdb,
165 			     int records,
166 			     int parallelism){
167 
168   return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
169                     records, parallelism);
170 }
171 
172 
173 int
clearTable1(Ndb * pNdb,int records,int parallelism)174 UtilTransactions::clearTable1(Ndb* pNdb,
175 			     int records,
176 			     int parallelism)
177 {
178   return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
179                     records, 1);
180 }
181 
182 int
clearTable2(Ndb * pNdb,int records,int parallelism)183 UtilTransactions::clearTable2(Ndb* pNdb,
184 			      int records,
185 			      int parallelism)
186 {
187   return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
188                     records, parallelism);
189 }
190 
191 int
clearTable3(Ndb * pNdb,int records,int parallelism)192 UtilTransactions::clearTable3(Ndb* pNdb,
193 			      int records,
194 			      int parallelism)
195 {
196   return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
197                     records, parallelism);
198 }
199 
200 int
copyTableData(Ndb * pNdb,const char * destName)201 UtilTransactions::copyTableData(Ndb* pNdb,
202 			    const char* destName){
203   // Scan all records and copy
204   // them to destName table
205   int                  retryAttempt = 0;
206   const int            retryMax = 10;
207   int insertedRows = 0;
208   int parallelism = 240;
209   int check;
210   NdbScanOperation		*pOp;
211   NDBT_ResultRow       row(tab);
212 
213   while (true){
214 
215     if (retryAttempt >= retryMax){
216       g_info << "ERROR: has retried this operation " << retryAttempt
217 	     << " times, failing!" << endl;
218       return NDBT_FAILED;
219     }
220 
221 
222     pTrans = pNdb->startTransaction();
223     if (pTrans == NULL) {
224       const NdbError err = pNdb->getNdbError();
225 
226       if (err.status == NdbError::TemporaryError){
227 	ERR(err);
228 	NdbSleep_MilliSleep(50);
229 	retryAttempt++;
230 	continue;
231       }
232       ERR(err);
233       return NDBT_FAILED;
234     }
235 
236     pOp = pTrans->getNdbScanOperation(tab.getName());
237     if (pOp == NULL) {
238       ERR(pTrans->getNdbError());
239       closeTransaction(pNdb);
240       return NDBT_FAILED;
241     }
242 
243     if( pOp->readTuples(NdbScanOperation::LM_Read, parallelism) ) {
244       ERR(pTrans->getNdbError());
245       closeTransaction(pNdb);
246       return NDBT_FAILED;
247     }
248 
249     // Read all attributes
250     for (int a = 0; a < tab.getNoOfColumns(); a++){
251       if ((row.attributeStore(a) =
252 	   pOp->getValue(tab.getColumn(a)->getName())) == 0) {
253 	ERR(pTrans->getNdbError());
254 	closeTransaction(pNdb);
255 	return NDBT_FAILED;
256       }
257     }
258 
259     check = pTrans->execute(NoCommit, AbortOnError);
260     if( check == -1 ) {
261       ERR(pTrans->getNdbError());
262       closeTransaction(pNdb);
263       return NDBT_FAILED;
264     }
265 
266     int eof;
267     while((eof = pOp->nextResult(true)) == 0){
268       do {
269 	insertedRows++;
270 	if (addRowToInsert(pNdb, pTrans, row, destName) != 0){
271 	  closeTransaction(pNdb);
272 	  return NDBT_FAILED;
273 	}
274       } while((eof = pOp->nextResult(false)) == 0);
275 
276       check = pTrans->execute(Commit, AbortOnError);
277       pTrans->restart();
278       if( check == -1 ) {
279 	const NdbError err = pTrans->getNdbError();
280 	ERR(err);
281 	closeTransaction(pNdb);
282 	return NDBT_FAILED;
283       }
284     }
285     if (eof == -1) {
286       const NdbError err = pTrans->getNdbError();
287 
288       if (err.status == NdbError::TemporaryError){
289 	ERR(err);
290 	closeTransaction(pNdb);
291 	NdbSleep_MilliSleep(50);
292 	// If error = 488 there should be no limit on number of retry attempts
293 	if (err.code != 488)
294 	  retryAttempt++;
295 	continue;
296       }
297       ERR(err);
298       closeTransaction(pNdb);
299       return NDBT_FAILED;
300     }
301 
302     closeTransaction(pNdb);
303 
304     g_info << insertedRows << " rows copied" << endl;
305 
306     return NDBT_OK;
307   }
308   return NDBT_FAILED;
309 }
310 
311 int
addRowToInsert(Ndb * pNdb,NdbConnection * pInsTrans,NDBT_ResultRow & row,const char * insertTabName)312 UtilTransactions::addRowToInsert(Ndb* pNdb,
313 				 NdbConnection* pInsTrans,
314 				 NDBT_ResultRow & row,
315 				 const char *insertTabName){
316 
317   int check;
318   NdbOperation* pInsOp;
319 
320   pInsOp = pInsTrans->getNdbOperation(insertTabName);
321   if (pInsOp == NULL) {
322     ERR(pInsTrans->getNdbError());
323     return NDBT_FAILED;
324   }
325 
326   check = pInsOp->insertTuple();
327   if( check == -1 ) {
328     ERR(pInsTrans->getNdbError());
329     return NDBT_FAILED;
330   }
331 
332   // Set all attributes
333   for (int a = 0; a < tab.getNoOfColumns(); a++){
334     NdbRecAttr* r =  row.attributeStore(a);
335     int	 sz = r->get_size_in_bytes();
336     if (pInsOp->setValue(tab.getColumn(a)->getName(),
337 			 r->aRef(),
338 			 sz) != 0) {
339       ERR(pInsTrans->getNdbError());
340       return NDBT_FAILED;
341     }
342   }
343 
344   return NDBT_OK;
345 }
346 
347 
348 int
scanReadRecords(Ndb * pNdb,int parallelism,NdbOperation::LockMode lm,int records,int noAttribs,int * attrib_list,ReadCallBackFn * fn)349 UtilTransactions::scanReadRecords(Ndb* pNdb,
350 				  int parallelism,
351 				  NdbOperation::LockMode lm,
352 				  int records,
353 				  int noAttribs,
354 				  int *attrib_list,
355 				  ReadCallBackFn* fn){
356 
357   int                  retryAttempt = 0;
358   const int            retryMax = 100;
359   int                  check;
360   NdbScanOperation	       *pOp;
361   NDBT_ResultRow       row(tab);
362 
363   while (true){
364 
365     if (retryAttempt >= retryMax){
366       g_info << "ERROR: has retried this operation " << retryAttempt
367 	     << " times, failing!" << endl;
368       return NDBT_FAILED;
369     }
370 
371     pTrans = pNdb->startTransaction();
372     if (pTrans == NULL) {
373       const NdbError err = pNdb->getNdbError();
374 
375       if (err.status == NdbError::TemporaryError){
376 	ERR(err);
377 	NdbSleep_MilliSleep(50);
378 	retryAttempt++;
379 	continue;
380       }
381       ERR(err);
382       return NDBT_FAILED;
383     }
384 
385     pOp = getScanOperation(pTrans);
386     if (pOp == NULL) {
387       const NdbError err = pNdb->getNdbError();
388       closeTransaction(pNdb);
389 
390       if (err.status == NdbError::TemporaryError){
391 	ERR(err);
392 	NdbSleep_MilliSleep(50);
393 	retryAttempt++;
394 	continue;
395       }
396       ERR(err);
397       return NDBT_FAILED;
398     }
399 
400     if( pOp->readTuples(lm, 0, parallelism) ) {
401       ERR(pTrans->getNdbError());
402       closeTransaction(pNdb);
403       return NDBT_FAILED;
404     }
405 
406     // Call getValue for all the attributes supplied in attrib_list
407     // ************************************************
408     for (int a = 0; a < noAttribs; a++){
409       if (attrib_list[a] < tab.getNoOfColumns()){
410 	g_info << "getValue(" << attrib_list[a] << ")" << endl;
411 	if ((row.attributeStore(attrib_list[a]) =
412 	     pOp->getValue(tab.getColumn(attrib_list[a])->getName())) == 0) {
413 	  ERR(pTrans->getNdbError());
414 	  closeTransaction(pNdb);
415 	  return NDBT_FAILED;
416 	}
417       }
418     }
419     // *************************************************
420 
421     check = pTrans->execute(NoCommit, AbortOnError);
422     if( check == -1 ) {
423       const NdbError err = pTrans->getNdbError();
424 
425       if (err.status == NdbError::TemporaryError){
426 	ERR(err);
427 	closeTransaction(pNdb);
428 	NdbSleep_MilliSleep(50);
429 	retryAttempt++;
430 	continue;
431       }
432       ERR(err);
433       closeTransaction(pNdb);
434       return NDBT_FAILED;
435     }
436 
437     int eof;
438     int rows = 0;
439 
440 
441     while((eof = pOp->nextResult()) == 0){
442       rows++;
443 
444       // Call callback for each record returned
445       if(fn != NULL)
446 	fn(&row);
447     }
448     if (eof == -1) {
449       const NdbError err = pTrans->getNdbError();
450 
451       if (err.status == NdbError::TemporaryError){
452 	ERR(err);
453 	closeTransaction(pNdb);
454 	NdbSleep_MilliSleep(50);
455 	retryAttempt++;
456 	continue;
457       }
458       ERR(err);
459       closeTransaction(pNdb);
460       return NDBT_FAILED;
461     }
462 
463     closeTransaction(pNdb);
464     g_info << rows << " rows have been read" << endl;
465     if (records != 0 && rows != records){
466       g_info << "Check expected number of records failed" << endl
467 	     << "  expected=" << records <<", " << endl
468 	     << "  read=" << rows << endl;
469       return NDBT_FAILED;
470     }
471 
472     return NDBT_OK;
473   }
474   return NDBT_FAILED;
475 }
476 
477 int
selectCount(Ndb * pNdb,int parallelism,int * count_rows,NdbOperation::LockMode lm)478 UtilTransactions::selectCount(Ndb* pNdb,
479 			      int parallelism,
480 			      int* count_rows,
481 			      NdbOperation::LockMode lm)
482 {
483 
484   int                  retryAttempt = 0;
485   const int            retryMax = 100;
486   int                  check;
487 
488   while (true){
489 
490     if (retryAttempt >= retryMax){
491       g_info << "ERROR: has retried this operation " << retryAttempt
492 	     << " times, failing!" << endl;
493       return NDBT_FAILED;
494     }
495 
496     pTrans = pNdb->startTransaction();
497     if (pTrans == NULL)
498     {
499       if (pNdb->getNdbError().status == NdbError::TemporaryError)
500       {
501 	NdbSleep_MilliSleep(50);
502 	retryAttempt++;
503 	continue;
504       }
505       ERR(pNdb->getNdbError());
506       return NDBT_FAILED;
507     }
508 
509 
510     NdbScanOperation *pOp = getScanOperation(pTrans);
511     if (pOp == NULL)
512     {
513       NdbError err = pTrans->getNdbError();
514       closeTransaction(pNdb);
515       if (err.status == NdbError::TemporaryError)
516       {
517 	NdbSleep_MilliSleep(50);
518 	retryAttempt++;
519 	continue;
520       }
521       ERR(err);
522       return NDBT_FAILED;
523     }
524 
525     if( pOp->readTuples(lm) )
526     {
527       ERR(pTrans->getNdbError());
528       closeTransaction(pNdb);
529       return NDBT_FAILED;
530     }
531 
532     if(0){
533       NdbScanFilter sf(pOp);
534       sf.begin(NdbScanFilter::OR);
535       sf.eq(2, (Uint32)30);
536       sf.end();
537     }
538 
539     check = pTrans->execute(NoCommit, AbortOnError);
540     if( check == -1 )
541     {
542       NdbError err = pTrans->getNdbError();
543       closeTransaction(pNdb);
544       if (err.status == NdbError::TemporaryError)
545       {
546         NdbSleep_MilliSleep(50);
547         retryAttempt++;
548         continue;
549       }
550       ERR(err);
551       return NDBT_FAILED;
552     }
553 
554     int eof;
555     int rows = 0;
556 
557 
558     while((eof = pOp->nextResult()) == 0){
559       rows++;
560     }
561 
562     if (eof == -1)
563     {
564       const NdbError err = pTrans->getNdbError();
565       closeTransaction(pNdb);
566 
567       if (err.status == NdbError::TemporaryError)
568       {
569 	NdbSleep_MilliSleep(50);
570 	retryAttempt++;
571 	continue;
572       }
573       ERR(err);
574       closeTransaction(pNdb);
575       return NDBT_FAILED;
576     }
577 
578     closeTransaction(pNdb);
579 
580     if (count_rows != NULL){
581       *count_rows = rows;
582     }
583 
584     return NDBT_OK;
585   }
586   return NDBT_FAILED;
587 }
588 
589 int
verifyIndex(Ndb * pNdb,const char * indexName,int parallelism,bool transactional)590 UtilTransactions::verifyIndex(Ndb* pNdb,
591 			      const char* indexName,
592 			      int parallelism,
593 			      bool transactional){
594 
595 
596   const NdbDictionary::Index* pIndex
597     = pNdb->getDictionary()->getIndex(indexName, tab.getName());
598   if (pIndex == 0){
599     ndbout << " Index " << indexName << " does not exist!" << endl;
600     return NDBT_FAILED;
601   }
602 
603   switch (pIndex->getType()){
604   case NdbDictionary::Index::UniqueHashIndex:
605     return verifyUniqueIndex(pNdb, pIndex, parallelism, transactional);
606   case NdbDictionary::Index::OrderedIndex:
607     return verifyOrderedIndex(pNdb, pIndex, parallelism, transactional);
608     break;
609   default:
610     ndbout << "Unknown index type" << endl;
611     break;
612   }
613 
614   return NDBT_FAILED;
615 }
616 
617 int
verifyUniqueIndex(Ndb * pNdb,const NdbDictionary::Index * pIndex,int parallelism,bool transactional)618 UtilTransactions::verifyUniqueIndex(Ndb* pNdb,
619 				    const NdbDictionary::Index * pIndex,
620 				    int parallelism,
621 				    bool transactional){
622 
623   /**
624    * Scan all rows in TABLE and for each found row make one read in
625    * TABLE and one using INDEX_TABLE. Then compare the two returned
626    * rows. They should be equal!
627    *
628    */
629 
630   if (scanAndCompareUniqueIndex(pNdb,
631 				pIndex,
632 				parallelism,
633 				transactional) != NDBT_OK){
634     return NDBT_FAILED;
635   }
636 
637 
638   return NDBT_OK;
639 
640 }
641 
642 
643 int
scanAndCompareUniqueIndex(Ndb * pNdb,const NdbDictionary::Index * pIndex,int parallelism,bool transactional)644 UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb,
645 					    const NdbDictionary::Index* pIndex,
646 					    int parallelism,
647 					    bool transactional){
648 
649   int                  retryAttempt = 0;
650   const int            retryMax = 100;
651   int                  check;
652   NdbScanOperation       *pOp;
653   NDBT_ResultRow       row(tab);
654 
655   parallelism = 1;
656 
657   while (true){
658 restart:
659     if (retryAttempt >= retryMax){
660       g_info << "ERROR: has retried this operation " << retryAttempt
661 	     << " times, failing!" << endl;
662       return NDBT_FAILED;
663     }
664 
665     pTrans = pNdb->startTransaction();
666     if (pTrans == NULL) {
667       const NdbError err = pNdb->getNdbError();
668 
669       if (err.status == NdbError::TemporaryError){
670 	ERR(err);
671 	NdbSleep_MilliSleep(50);
672 	retryAttempt++;
673 	continue;
674       }
675       ERR(err);
676       return NDBT_FAILED;
677     }
678 
679     pOp = pTrans->getNdbScanOperation(tab.getName());
680     if (pOp == NULL) {
681       const NdbError err = pNdb->getNdbError();
682       closeTransaction(pNdb);
683       ERR(err);
684 
685       if (err.status == NdbError::TemporaryError){
686 	NdbSleep_MilliSleep(50);
687 	retryAttempt++;
688 	continue;
689       }
690       return NDBT_FAILED;
691     }
692 
693     int rs;
694     if(transactional){
695       rs = pOp->readTuples(NdbScanOperation::LM_Read, 0, parallelism);
696     } else {
697       rs = pOp->readTuples(NdbScanOperation::LM_CommittedRead, 0, parallelism);
698     }
699 
700     if( rs != 0 ) {
701       ERR(pTrans->getNdbError());
702       closeTransaction(pNdb);
703       return NDBT_FAILED;
704     }
705 
706     // Read all attributes
707     for (int a = 0; a < tab.getNoOfColumns(); a++){
708       if ((row.attributeStore(a) =
709 	   pOp->getValue(tab.getColumn(a)->getName())) == 0) {
710 	ERR(pTrans->getNdbError());
711 	closeTransaction(pNdb);
712 	return NDBT_FAILED;
713       }
714     }
715 
716     check = pTrans->execute(NoCommit, AbortOnError);
717     if( check == -1 ) {
718       const NdbError err = pTrans->getNdbError();
719 
720       if (err.status == NdbError::TemporaryError){
721 	ERR(err);
722 	closeTransaction(pNdb);
723 	NdbSleep_MilliSleep(50);
724 	retryAttempt++;
725 	continue;
726       }
727       ERR(err);
728       closeTransaction(pNdb);
729       return NDBT_FAILED;
730     }
731 
732     int eof;
733     int rows = 0;
734 
735 
736     while((eof = pOp->nextResult()) == 0){
737       rows++;
738 
739       // ndbout << row.c_str().c_str() << endl;
740 
741       if (readRowFromTableAndIndex(pNdb,
742 				   pTrans,
743 				   pIndex,
744 				   row) != NDBT_OK){
745 
746 	while((eof= pOp->nextResult(false)) == 0);
747 	if(eof == 2)
748 	  eof = pOp->nextResult(true); // this should give -1
749 	if(eof == -1)
750 	{
751 	  const NdbError err = pTrans->getNdbError();
752 
753 	  if (err.status == NdbError::TemporaryError){
754 	    ERR(err);
755 	    closeTransaction(pNdb);
756 	    NdbSleep_MilliSleep(50);
757 	    retryAttempt++;
758 	    goto restart;
759 	  }
760 	}
761 	closeTransaction(pNdb);
762 	return NDBT_FAILED;
763       }
764     }
765     if (eof == -1) {
766       const NdbError err = pTrans->getNdbError();
767 
768       if (err.status == NdbError::TemporaryError){
769 	ERR(err);
770 	closeTransaction(pNdb);
771 	NdbSleep_MilliSleep(50);
772 	retryAttempt++;
773 	continue;
774       }
775       ERR(err);
776       closeTransaction(pNdb);
777       return NDBT_FAILED;
778     }
779 
780     closeTransaction(pNdb);
781 
782     return NDBT_OK;
783   }
784   return NDBT_FAILED;
785 }
786 int
readRowFromTableAndIndex(Ndb * pNdb,NdbConnection * scanTrans,const NdbDictionary::Index * pIndex,NDBT_ResultRow & row)787 UtilTransactions::readRowFromTableAndIndex(Ndb* pNdb,
788 					   NdbConnection* scanTrans,
789 					   const NdbDictionary::Index* pIndex,
790 					   NDBT_ResultRow& row ){
791 
792 
793   NdbDictionary::Index::Type indexType= pIndex->getType();
794   int                  retryAttempt = 0;
795   const int            retryMax = 100;
796   int                  check, a;
797   NdbConnection	       *pTrans1=NULL;
798   NdbOperation	       *pOp;
799 
800   int return_code= NDBT_FAILED;
801 
802   // Allocate place to store the result
803   NDBT_ResultRow       tabRow(tab);
804   NDBT_ResultRow       indexRow(tab);
805   const char * indexName = pIndex->getName();
806 
807   while (true){
808     if(retryAttempt)
809       ndbout_c("retryAttempt %d", retryAttempt);
810     if (retryAttempt >= retryMax){
811       g_info << "ERROR: has retried this operation " << retryAttempt
812 	     << " times, failing!" << endl;
813 	goto close_all;
814     }
815 
816     pTrans1 = pNdb->hupp(scanTrans); //startTransaction();
817     if (pTrans1 == NULL) {
818       const NdbError err = pNdb->getNdbError();
819 
820       if (err.code == 4006)
821         goto close_all;
822 
823       if (err.status == NdbError::TemporaryError){
824 	ERR(err);
825 	NdbSleep_MilliSleep(50);
826 	retryAttempt++;
827 	continue;
828       }
829 
830       if(err.code == 0){
831 	return_code = NDBT_OK;
832 	goto close_all;
833       }
834       ERR(err);
835       goto close_all;
836     }
837 
838     /**
839      * Read the record from TABLE
840      */
841     pOp = pTrans1->getNdbOperation(tab.getName());
842     if (pOp == NULL) {
843       ERR(pTrans1->getNdbError());
844       goto close_all;
845     }
846 
847     check = pOp->readTuple();
848     if( check == -1 ) {
849       ERR(pTrans1->getNdbError());
850       goto close_all;
851     }
852 
853     // Define primary keys
854 #if VERBOSE
855     printf("PK: ");
856 #endif
857     for(a = 0; a<tab.getNoOfColumns(); a++){
858       const NdbDictionary::Column* attr = tab.getColumn(a);
859       if (attr->getPrimaryKey() == true){
860 	if (pOp->equal(attr->getName(), row.attributeStore(a)->aRef()) != 0){
861 	  ERR(pTrans1->getNdbError());
862 	  goto close_all;
863 	}
864 #if VERBOSE
865 	printf("%s = %d: ", attr->getName(), row.attributeStore(a)->aRef());
866 #endif
867       }
868     }
869 #if VERBOSE
870     printf("\n");
871 #endif
872     // Read all attributes
873 #if VERBOSE
874     printf("Reading %u attributes: ", tab.getNoOfColumns());
875 #endif
876     for(a = 0; a<tab.getNoOfColumns(); a++){
877       if((tabRow.attributeStore(a) =
878 	  pOp->getValue(tab.getColumn(a)->getName())) == 0) {
879 	ERR(pTrans1->getNdbError());
880 	goto close_all;
881       }
882 #if VERBOSE
883       printf("%s ", tab.getColumn(a)->getName());
884 #endif
885     }
886 #if VERBOSE
887     printf("\n");
888 #endif
889 
890     /**
891      * Read the record from INDEX_TABLE
892      */
893     NdbIndexOperation* pIndexOp= NULL;
894     NdbIndexScanOperation *pScanOp= NULL;
895     NdbOperation *pIOp= 0;
896 
897     bool null_found= false;
898     for(a = 0; a<(int)pIndex->getNoOfColumns(); a++){
899       const NdbDictionary::Column *  col = pIndex->getColumn(a);
900 
901       if (row.attributeStore(col->getName())->isNULL())
902       {
903 	null_found= true;
904 	break;
905       }
906     }
907 
908     const char * tabName= tab.getName();
909     if(!null_found)
910     {
911       if (indexType == NdbDictionary::Index::UniqueHashIndex) {
912 	pIOp= pIndexOp= pTrans1->getNdbIndexOperation(indexName, tabName);
913       } else {
914 	pIOp= pScanOp= pTrans1->getNdbIndexScanOperation(indexName, tabName);
915       }
916 
917       if (pIOp == NULL) {
918 	ERR(pTrans1->getNdbError());
919 	goto close_all;
920       }
921 
922       {
923 	bool not_ok;
924 	if (pIndexOp) {
925 	  not_ok = pIndexOp->readTuple() == -1;
926 	} else {
927 	  not_ok = pScanOp->readTuples();
928 	}
929 
930 	if( not_ok ) {
931 	  ERR(pTrans1->getNdbError());
932 	  goto close_all;
933 	}
934       }
935 
936     // Define primary keys for index
937 #if VERBOSE
938       printf("SI: ");
939 #endif
940       for(a = 0; a<(int)pIndex->getNoOfColumns(); a++){
941 	const NdbDictionary::Column *  col = pIndex->getColumn(a);
942 
943 	if ( !row.attributeStore(col->getName())->isNULL() ) {
944 	  if(pIOp->equal(col->getName(),
945 			 row.attributeStore(col->getName())->aRef()) != 0){
946 	    ERR(pTrans1->getNdbError());
947 	    goto close_all;
948 	  }
949 	}
950 #if VERBOSE
951 	printf("%s = %d: ", col->getName(), row.attributeStore(a)->aRef());
952 #endif
953       }
954 #if VERBOSE
955       printf("\n");
956 #endif
957 
958       // Read all attributes
959 #if VERBOSE
960       printf("Reading %u attributes: ", tab.getNoOfColumns());
961 #endif
962       for(a = 0; a<tab.getNoOfColumns(); a++){
963 	void* pCheck;
964 
965 	pCheck= indexRow.attributeStore(a)=
966 	  pIOp->getValue(tab.getColumn(a)->getName());
967 
968 	if(pCheck == NULL) {
969 	  ERR(pTrans1->getNdbError());
970 	  goto close_all;
971 	}
972 #if VERBOSE
973 	printf("%s ", tab.getColumn(a)->getName());
974 #endif
975       }
976     }
977 #if VERBOSE
978     printf("\n");
979 #endif
980     scanTrans->refresh();
981     check = pTrans1->execute(Commit, AbortOnError);
982     if( check == -1 ) {
983       const NdbError err = pTrans1->getNdbError();
984 
985       if (err.status == NdbError::TemporaryError){
986 	ERR(err);
987 	pNdb->closeTransaction(pTrans1);
988 	NdbSleep_MilliSleep(50);
989 	retryAttempt++;
990 	continue;
991       }
992       ndbout << "Error when comparing records - normal op" << endl;
993       ERR(err);
994       ndbout << "row: " << row.c_str().c_str() << endl;
995       goto close_all;
996     }
997 
998     /**
999      * Compare the two rows
1000      */
1001     if(!null_found){
1002       if (pScanOp) {
1003 	if (pScanOp->nextResult() != 0){
1004 	  const NdbError err = pTrans1->getNdbError();
1005 	  ERR(err);
1006 	  ndbout << "Error when comparing records - index op next_result missing" << endl;
1007 	  ndbout << "row: " << row.c_str().c_str() << endl;
1008 	  goto close_all;
1009 	}
1010       }
1011       if (!(tabRow.c_str() == indexRow.c_str())){
1012 	ndbout << "Error when comapring records" << endl;
1013 	ndbout << " tabRow: \n" << tabRow.c_str().c_str() << endl;
1014 	ndbout << " indexRow: \n" << indexRow.c_str().c_str() << endl;
1015 	goto close_all;
1016       }
1017       if (pScanOp) {
1018 	if (pScanOp->nextResult() == 0){
1019 	  ndbout << "Error when comparing records - index op next_result to many" << endl;
1020 	  ndbout << "row: " << row.c_str().c_str() << endl;
1021 	  goto close_all;
1022 	}
1023       }
1024     }
1025     return_code= NDBT_OK;
1026     goto close_all;
1027   }
1028 
1029 close_all:
1030   if (pTrans1)
1031     pNdb->closeTransaction(pTrans1);
1032 
1033   return return_code;
1034 }
1035 
1036 int
verifyOrderedIndex(Ndb * pNdb,const NdbDictionary::Index * pIndex,int parallelism,bool transactional)1037 UtilTransactions::verifyOrderedIndex(Ndb* pNdb,
1038 				     const NdbDictionary::Index* pIndex,
1039 				     int parallelism,
1040 				     bool transactional){
1041 
1042   int                  retryAttempt = 0;
1043   const int            retryMax = 100;
1044   int                  check;
1045   NdbScanOperation     *pOp;
1046   NdbIndexScanOperation * iop = 0;
1047 
1048   NDBT_ResultRow       scanRow(tab);
1049   NDBT_ResultRow       pkRow(tab);
1050   NDBT_ResultRow       indexRow(tab);
1051   const char * indexName = pIndex->getName();
1052 
1053   int res;
1054   parallelism = 1;
1055 
1056   while (true){
1057 
1058     if (retryAttempt >= retryMax){
1059       g_info << "ERROR: has retried this operation " << retryAttempt
1060 	     << " times, failing!" << endl;
1061       return NDBT_FAILED;
1062     }
1063 
1064     pTrans = pNdb->startTransaction();
1065     if (pTrans == NULL) {
1066       const NdbError err = pNdb->getNdbError();
1067 
1068       if (err.status == NdbError::TemporaryError){
1069 	ERR(err);
1070 	NdbSleep_MilliSleep(50);
1071 	retryAttempt++;
1072 	continue;
1073       }
1074       ERR(err);
1075       return NDBT_FAILED;
1076     }
1077 
1078     pOp = pTrans->getNdbScanOperation(tab.getName());
1079     if (pOp == NULL) {
1080       ERR(pTrans->getNdbError());
1081       closeTransaction(pNdb);
1082       return NDBT_FAILED;
1083     }
1084 
1085     if( pOp->readTuples(NdbScanOperation::LM_Read, 0, parallelism) ) {
1086       ERR(pTrans->getNdbError());
1087       closeTransaction(pNdb);
1088       return NDBT_FAILED;
1089     }
1090 
1091     if(get_values(pOp, scanRow))
1092     {
1093       abort();
1094     }
1095 
1096     check = pTrans->execute(NoCommit, AbortOnError);
1097     if( check == -1 ) {
1098       const NdbError err = pTrans->getNdbError();
1099 
1100       if (err.status == NdbError::TemporaryError){
1101 	ERR(err);
1102 	closeTransaction(pNdb);
1103 	NdbSleep_MilliSleep(50);
1104 	retryAttempt++;
1105 	continue;
1106       }
1107       ERR(err);
1108       closeTransaction(pNdb);
1109       return NDBT_FAILED;
1110     }
1111 
1112     int eof;
1113     int rows = 0;
1114     while(check == 0 && (eof = pOp->nextResult()) == 0){
1115       rows++;
1116 
1117       bool null_found= false;
1118       for(int a = 0; a<(int)pIndex->getNoOfColumns(); a++){
1119 	const NdbDictionary::Column *  col = pIndex->getColumn(a);
1120 	if (scanRow.attributeStore(col->getName())->isNULL())
1121 	{
1122 	  null_found= true;
1123 	  break;
1124 	}
1125       }
1126 
1127       // Do pk lookup
1128       NdbOperation * pk = pTrans->getNdbOperation(tab.getName());
1129       if(!pk || pk->readTuple())
1130 	goto error;
1131       if(equal(&tab, pk, scanRow) || get_values(pk, pkRow))
1132 	goto error;
1133 
1134       if(!null_found)
1135       {
1136 	if((iop= pTrans->getNdbIndexScanOperation(indexName,
1137                                                   tab.getName())) != 0)
1138 	{
1139 	  if(iop->readTuples(NdbScanOperation::LM_CommittedRead,
1140 			     parallelism))
1141 	    goto error;
1142 	  if(get_values(iop, indexRow))
1143 	    goto error;
1144           if(equal(pIndex, iop, scanRow))
1145             goto error;
1146 	}
1147 	else
1148         {
1149           goto error;
1150         }
1151       }
1152 
1153       check = pTrans->execute(NoCommit, AbortOnError);
1154       if(check)
1155 	goto error;
1156 
1157       if(scanRow.c_str() != pkRow.c_str()){
1158 	g_err << "Error when comapring records" << endl;
1159 	g_err << " scanRow: \n" << scanRow.c_str().c_str() << endl;
1160 	g_err << " pkRow: \n" << pkRow.c_str().c_str() << endl;
1161 	closeTransaction(pNdb);
1162 	return NDBT_FAILED;
1163       }
1164 
1165       if(!null_found)
1166       {
1167 
1168 	if((res= iop->nextResult()) != 0){
1169 	  g_err << "Failed to find row using index: " << res << endl;
1170 	  ERR(pTrans->getNdbError());
1171 	  closeTransaction(pNdb);
1172 	  return NDBT_FAILED;
1173 	}
1174 
1175 	if(scanRow.c_str() != indexRow.c_str()){
1176 	  g_err << "Error when comapring records" << endl;
1177 	  g_err << " scanRow: \n" << scanRow.c_str().c_str() << endl;
1178 	  g_err << " indexRow: \n" << indexRow.c_str().c_str() << endl;
1179 	  closeTransaction(pNdb);
1180 	  return NDBT_FAILED;
1181 	}
1182 
1183 	if(iop->nextResult() == 0){
1184 	  g_err << "Found extra row!!" << endl;
1185 	  g_err << " indexRow: \n" << indexRow.c_str().c_str() << endl;
1186 	  closeTransaction(pNdb);
1187 	  return NDBT_FAILED;
1188 	}
1189       }
1190     }
1191 
1192     if (eof == -1 || check == -1) {
1193   error:
1194       const NdbError err = pTrans->getNdbError();
1195 
1196       if (err.status == NdbError::TemporaryError){
1197 	ERR(err);
1198 	iop = 0;
1199 	closeTransaction(pNdb);
1200 	NdbSleep_MilliSleep(50);
1201 	retryAttempt++;
1202 	rows--;
1203 	continue;
1204       }
1205       ERR(err);
1206       closeTransaction(pNdb);
1207       return NDBT_FAILED;
1208     }
1209 
1210     closeTransaction(pNdb);
1211 
1212     return NDBT_OK;
1213   }
1214   return NDBT_FAILED;
1215 }
1216 
1217 int
get_values(NdbOperation * op,NDBT_ResultRow & dst)1218 UtilTransactions::get_values(NdbOperation* op, NDBT_ResultRow& dst)
1219 {
1220   for (int a = 0; a < tab.getNoOfColumns(); a++){
1221     NdbRecAttr*& ref= dst.attributeStore(a);
1222     if ((ref= op->getValue(a)) == 0)
1223     {
1224       return NDBT_FAILED;
1225     }
1226   }
1227   return 0;
1228 }
1229 
1230 int
equal(const NdbDictionary::Index * pIndex,NdbOperation * op,const NDBT_ResultRow & src)1231 UtilTransactions::equal(const NdbDictionary::Index* pIndex,
1232 			NdbOperation* op, const NDBT_ResultRow& src)
1233 {
1234   for(Uint32 a = 0; a<pIndex->getNoOfColumns(); a++){
1235     const NdbDictionary::Column *  col = pIndex->getColumn(a);
1236     if(op->equal(col->getName(),
1237 		 src.attributeStore(col->getName())->aRef()) != 0){
1238       return NDBT_FAILED;
1239     }
1240   }
1241   return 0;
1242 }
1243 
1244 int
equal(const NdbDictionary::Table * pTable,NdbOperation * op,const NDBT_ResultRow & src)1245 UtilTransactions::equal(const NdbDictionary::Table* pTable,
1246 			NdbOperation* op, const NDBT_ResultRow& src)
1247 {
1248   for(Uint32 a = 0; (int)a<tab.getNoOfColumns(); a++){
1249     const NdbDictionary::Column* attr = tab.getColumn(a);
1250     if (attr->getPrimaryKey() == true){
1251       if (op->equal(attr->getName(), src.attributeStore(a)->aRef()) != 0){
1252 	return NDBT_FAILED;
1253       }
1254     }
1255   }
1256   return 0;
1257 }
1258 
1259 NdbScanOperation*
getScanOperation(NdbConnection * pTrans)1260 UtilTransactions::getScanOperation(NdbConnection* pTrans)
1261 {
1262   return (NdbScanOperation*)
1263     getOperation(pTrans, NdbOperation::OpenScanRequest);
1264 }
1265 
1266 NdbOperation*
getOperation(NdbConnection * pTrans,NdbOperation::OperationType type)1267 UtilTransactions::getOperation(NdbConnection* pTrans,
1268 			       NdbOperation::OperationType type)
1269 {
1270   switch(type){
1271   case NdbOperation::ReadRequest:
1272   case NdbOperation::ReadExclusive:
1273     if(idx)
1274     {
1275       switch(idx->getType()){
1276       case NdbDictionary::Index::UniqueHashIndex:
1277 	return pTrans->getNdbIndexOperation(idx->getName(), tab.getName());
1278       case NdbDictionary::Index::OrderedIndex:
1279 	return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName());
1280       default:
1281         abort();
1282       }
1283     }
1284   case NdbOperation::InsertRequest:
1285   case NdbOperation::WriteRequest:
1286     return pTrans->getNdbOperation(tab.getName());
1287   case NdbOperation::UpdateRequest:
1288   case NdbOperation::DeleteRequest:
1289     if(idx)
1290     {
1291       switch(idx->getType()){
1292       case NdbDictionary::Index::UniqueHashIndex:
1293 	return pTrans->getNdbIndexOperation(idx->getName(), tab.getName());
1294       default:
1295         break;
1296       }
1297     }
1298     return pTrans->getNdbOperation(tab.getName());
1299   case NdbOperation::OpenScanRequest:
1300     if(idx)
1301     {
1302       switch(idx->getType()){
1303       case NdbDictionary::Index::OrderedIndex:
1304 	return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName());
1305       default:
1306         break;
1307       }
1308     }
1309     return pTrans->getNdbScanOperation(tab.getName());
1310   case NdbOperation::OpenRangeScanRequest:
1311     if(idx)
1312     {
1313       switch(idx->getType()){
1314       case NdbDictionary::Index::OrderedIndex:
1315 	return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName());
1316       default:
1317         break;
1318       }
1319     }
1320     return 0;
1321   default:
1322     abort();
1323   }
1324   return 0;
1325 }
1326 
1327 #include <HugoOperations.hpp>
1328 
1329 int
closeTransaction(Ndb * pNdb)1330 UtilTransactions::closeTransaction(Ndb* pNdb)
1331 {
1332   if (pTrans != NULL){
1333     pNdb->closeTransaction(pTrans);
1334     pTrans = NULL;
1335   }
1336   return 0;
1337 }
1338 
1339 int
compare(Ndb * pNdb,const char * tab_name2,int flags)1340 UtilTransactions::compare(Ndb* pNdb, const char* tab_name2, int flags){
1341 
1342 
1343   NdbError err;
1344   int return_code= 0, row_count= 0;
1345   int retryAttempt = 0, retryMax = 10;
1346 
1347   HugoCalculator calc(tab);
1348   NDBT_ResultRow row(tab);
1349   const NdbDictionary::Table* tmp= pNdb->getDictionary()->getTable(tab_name2);
1350   if(tmp == 0)
1351   {
1352     g_err << "Unable to lookup table: " << tab_name2
1353 	  << endl << pNdb->getDictionary()->getNdbError() << endl;
1354     return -1;
1355   }
1356   const NdbDictionary::Table& tab2= *tmp;
1357 
1358   HugoOperations cmp(tab2);
1359   UtilTransactions count(tab2);
1360 
1361   while (true){
1362 loop:
1363     if (retryAttempt++ >= retryMax){
1364       g_err << "ERROR: compare has retried this operation " << retryAttempt
1365 	     << " times, failing!" << endl;
1366       return -1;
1367     }
1368 
1369     NdbScanOperation *pOp= 0;
1370     pTrans = pNdb->startTransaction();
1371     if (pTrans == NULL) {
1372       err = pNdb->getNdbError();
1373       goto error;
1374     }
1375 
1376     pOp= pTrans->getNdbScanOperation(tab.getName());
1377     if (pOp == NULL) {
1378       ERR(err= pTrans->getNdbError());
1379       goto error;
1380     }
1381 
1382     if( pOp->readTuples(NdbScanOperation::LM_Read) ) {
1383       ERR(err= pTrans->getNdbError());
1384       goto error;
1385     }
1386 
1387     // Read all attributes
1388     {
1389       for (int a = 0; a < tab.getNoOfColumns(); a++){
1390 	if ((row.attributeStore(a) =
1391 	     pOp->getValue(tab.getColumn(a)->getName())) == 0) {
1392 	  ERR(err= pTrans->getNdbError());
1393 	  goto error;
1394 	}
1395       }
1396     }
1397 
1398     if( pTrans->execute(NoCommit, AbortOnError) == -1 ) {
1399       ERR(err= pTrans->getNdbError());
1400       goto error;
1401     }
1402 
1403     row_count= 0;
1404     {
1405       int eof;
1406       while((eof = pOp->nextResult(true)) == 0)
1407       {
1408 	do {
1409 	  row_count++;
1410 	  if(cmp.startTransaction(pNdb) != NDBT_OK)
1411 	  {
1412 	    ERR(err= pNdb->getNdbError());
1413 	    goto error;
1414 	  }
1415 	  int rowNo= calc.getIdValue(&row);
1416 	  if(cmp.pkReadRecord(pNdb, rowNo, 1) != NDBT_OK)
1417 	  {
1418 	    ERR(err= cmp.getTransaction()->getNdbError());
1419 	    goto error;
1420 	  }
1421 	  if(cmp.execute_Commit(pNdb) != NDBT_OK ||
1422 	     cmp.getTransaction()->getNdbError().code)
1423 	  {
1424 	    ERR(err= cmp.getTransaction()->getNdbError());
1425 	    goto error;
1426 	  }
1427 	  if(row != cmp.get_row(0))
1428 	  {
1429 	    g_err << "COMPARE FAILED" << endl;
1430 	    g_err << row << endl;
1431 	    g_err << cmp.get_row(0) << endl;
1432 	    return_code++;
1433 	  }
1434 	  retryAttempt= 0;
1435 	  cmp.closeTransaction(pNdb);
1436 	} while((eof = pOp->nextResult(false)) == 0);
1437       }
1438       if (eof == -1)
1439       {
1440 	err = pTrans->getNdbError();
1441 	goto error;
1442       }
1443     }
1444 
1445     closeTransaction(pNdb);
1446 
1447     g_info << row_count << " rows compared" << endl;
1448     {
1449       int row_count2;
1450       if(count.selectCount(pNdb, 0, &row_count2) != NDBT_OK)
1451       {
1452 	g_err << "Failed to count rows in tab_name2" << endl;
1453 	return -1;
1454       }
1455 
1456       g_info << row_count2 << " rows in tab_name2 - failed " << return_code
1457 	     << endl;
1458       return (row_count == row_count2 ? return_code : 1);
1459     }
1460 error:
1461     if(err.status == NdbError::TemporaryError)
1462     {
1463       g_err << err << endl;
1464       NdbSleep_MilliSleep(50);
1465       closeTransaction(pNdb);
1466       if(cmp.getTransaction())
1467 	cmp.closeTransaction(pNdb);
1468 
1469       goto loop;
1470     }
1471     g_err << "ERROR" << endl;
1472     g_err << err << endl;
1473 
1474     break;
1475   }
1476 
1477   closeTransaction(pNdb);
1478 
1479   return return_code;
1480 }
1481 
1482