1 /*
2 Copyright (C) 2003-2008 MySQL AB, 2008, 2009 Sun Microsystems, Inc.
3 All rights reserved. Use is subject to license terms.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24 */
25
26 #include "UtilTransactions.hpp"
27 #include <NdbSleep.h>
28 #include <NdbScanFilter.hpp>
29
30 #define VERBOSE 0
31
UtilTransactions(const NdbDictionary::Table & _tab,const NdbDictionary::Index * _idx)32 UtilTransactions::UtilTransactions(const NdbDictionary::Table& _tab,
33 const NdbDictionary::Index* _idx):
34 tab(_tab), idx(_idx), pTrans(0)
35 {
36 m_defaultClearMethod = 3;
37 }
38
UtilTransactions(Ndb * ndb,const char * name,const char * index)39 UtilTransactions::UtilTransactions(Ndb* ndb,
40 const char * name,
41 const char * index) :
42 tab(* ndb->getDictionary()->getTable(name)),
43 idx(index ? ndb->getDictionary()->getIndex(index, name) : 0),
44 pTrans(0)
45 {
46 m_defaultClearMethod = 3;
47 }
48
// Numeric marker; not referenced in the portion of the file that follows
// the definition up to compare().
#define RESTART_SCAN 99

// Return from the enclosing function with the NDB error code carried by
// 'err' when it is non-zero, otherwise with NDBT_FAILED.  The argument is
// parenthesized so that any expression yielding an NdbError can be passed.
#define RETURN_FAIL(err) return ((err).code != 0 ? (err).code : NDBT_FAILED)
52
53 int
clearTable(Ndb * pNdb,NdbScanOperation::ScanFlag flags,int records,int parallelism)54 UtilTransactions::clearTable(Ndb* pNdb,
55 NdbScanOperation::ScanFlag flags,
56 int records,
57 int parallelism){
58 // Scan all records exclusive and delete
59 // them one by one
60 int retryAttempt = 0;
61 const int retryMax = 10;
62 int deletedRows = 0;
63 int check;
64 NdbScanOperation *pOp;
65 NdbError err;
66
67 int par = parallelism;
68 while (true){
69 restart:
70 if (retryAttempt++ >= retryMax){
71 g_info << "ERROR: has retried this operation " << retryAttempt
72 << " times, failing!" << endl;
73 return NDBT_FAILED;
74 }
75
76 pTrans = pNdb->startTransaction();
77 if (pTrans == NULL) {
78 err = pNdb->getNdbError();
79 if (err.status == NdbError::TemporaryError){
80 ERR(err);
81 NdbSleep_MilliSleep(50);
82 continue;
83 }
84 goto failed;
85 }
86
87 pOp = getScanOperation(pTrans);
88 if (pOp == NULL) {
89 err = pTrans->getNdbError();
90 if(err.status == NdbError::TemporaryError){
91 ERR(err);
92 closeTransaction(pNdb);
93 NdbSleep_MilliSleep(50);
94 par = 1;
95 goto restart;
96 }
97 goto failed;
98 }
99
100 if( pOp->readTuples(NdbOperation::LM_Exclusive, flags, par) ) {
101 err = pTrans->getNdbError();
102 goto failed;
103 }
104
105 if(pTrans->execute(NoCommit, AbortOnError) != 0){
106 err = pTrans->getNdbError();
107 if(err.status == NdbError::TemporaryError){
108 ERR(err);
109 closeTransaction(pNdb);
110 NdbSleep_MilliSleep(50);
111 continue;
112 }
113 goto failed;
114 }
115
116 while((check = pOp->nextResult(true)) == 0){
117 do {
118 if (pOp->deleteCurrentTuple() != 0){
119 goto failed;
120 }
121 deletedRows++;
122 } while((check = pOp->nextResult(false)) == 0);
123
124 if(check != -1){
125 check = pTrans->execute(Commit, AbortOnError);
126 pTrans->restart();
127 }
128
129 err = pTrans->getNdbError();
130 if(check == -1){
131 if(err.status == NdbError::TemporaryError){
132 ERR(err);
133 closeTransaction(pNdb);
134 NdbSleep_MilliSleep(50);
135 par = 1;
136 goto restart;
137 }
138 goto failed;
139 }
140 }
141 if(check == -1){
142 err = pTrans->getNdbError();
143 if(err.status == NdbError::TemporaryError){
144 ERR(err);
145 closeTransaction(pNdb);
146 NdbSleep_MilliSleep(50);
147 par = 1;
148 goto restart;
149 }
150 goto failed;
151 }
152 closeTransaction(pNdb);
153 return NDBT_OK;
154 }
155 return NDBT_FAILED;
156
157 failed:
158 if(pTrans != 0) closeTransaction(pNdb);
159 ERR(err);
160 return (err.code != 0 ? err.code : NDBT_FAILED);
161 }
162
163 int
clearTable(Ndb * pNdb,int records,int parallelism)164 UtilTransactions::clearTable(Ndb* pNdb,
165 int records,
166 int parallelism){
167
168 return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
169 records, parallelism);
170 }
171
172
173 int
clearTable1(Ndb * pNdb,int records,int parallelism)174 UtilTransactions::clearTable1(Ndb* pNdb,
175 int records,
176 int parallelism)
177 {
178 return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
179 records, 1);
180 }
181
182 int
clearTable2(Ndb * pNdb,int records,int parallelism)183 UtilTransactions::clearTable2(Ndb* pNdb,
184 int records,
185 int parallelism)
186 {
187 return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
188 records, parallelism);
189 }
190
191 int
clearTable3(Ndb * pNdb,int records,int parallelism)192 UtilTransactions::clearTable3(Ndb* pNdb,
193 int records,
194 int parallelism)
195 {
196 return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
197 records, parallelism);
198 }
199
200 int
copyTableData(Ndb * pNdb,const char * destName)201 UtilTransactions::copyTableData(Ndb* pNdb,
202 const char* destName){
203 // Scan all records and copy
204 // them to destName table
205 int retryAttempt = 0;
206 const int retryMax = 10;
207 int insertedRows = 0;
208 int parallelism = 240;
209 int check;
210 NdbScanOperation *pOp;
211 NDBT_ResultRow row(tab);
212
213 while (true){
214
215 if (retryAttempt >= retryMax){
216 g_info << "ERROR: has retried this operation " << retryAttempt
217 << " times, failing!" << endl;
218 return NDBT_FAILED;
219 }
220
221
222 pTrans = pNdb->startTransaction();
223 if (pTrans == NULL) {
224 const NdbError err = pNdb->getNdbError();
225
226 if (err.status == NdbError::TemporaryError){
227 ERR(err);
228 NdbSleep_MilliSleep(50);
229 retryAttempt++;
230 continue;
231 }
232 ERR(err);
233 return NDBT_FAILED;
234 }
235
236 pOp = pTrans->getNdbScanOperation(tab.getName());
237 if (pOp == NULL) {
238 ERR(pTrans->getNdbError());
239 closeTransaction(pNdb);
240 return NDBT_FAILED;
241 }
242
243 if( pOp->readTuples(NdbScanOperation::LM_Read, parallelism) ) {
244 ERR(pTrans->getNdbError());
245 closeTransaction(pNdb);
246 return NDBT_FAILED;
247 }
248
249 // Read all attributes
250 for (int a = 0; a < tab.getNoOfColumns(); a++){
251 if ((row.attributeStore(a) =
252 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
253 ERR(pTrans->getNdbError());
254 closeTransaction(pNdb);
255 return NDBT_FAILED;
256 }
257 }
258
259 check = pTrans->execute(NoCommit, AbortOnError);
260 if( check == -1 ) {
261 ERR(pTrans->getNdbError());
262 closeTransaction(pNdb);
263 return NDBT_FAILED;
264 }
265
266 int eof;
267 while((eof = pOp->nextResult(true)) == 0){
268 do {
269 insertedRows++;
270 if (addRowToInsert(pNdb, pTrans, row, destName) != 0){
271 closeTransaction(pNdb);
272 return NDBT_FAILED;
273 }
274 } while((eof = pOp->nextResult(false)) == 0);
275
276 check = pTrans->execute(Commit, AbortOnError);
277 pTrans->restart();
278 if( check == -1 ) {
279 const NdbError err = pTrans->getNdbError();
280 ERR(err);
281 closeTransaction(pNdb);
282 return NDBT_FAILED;
283 }
284 }
285 if (eof == -1) {
286 const NdbError err = pTrans->getNdbError();
287
288 if (err.status == NdbError::TemporaryError){
289 ERR(err);
290 closeTransaction(pNdb);
291 NdbSleep_MilliSleep(50);
292 // If error = 488 there should be no limit on number of retry attempts
293 if (err.code != 488)
294 retryAttempt++;
295 continue;
296 }
297 ERR(err);
298 closeTransaction(pNdb);
299 return NDBT_FAILED;
300 }
301
302 closeTransaction(pNdb);
303
304 g_info << insertedRows << " rows copied" << endl;
305
306 return NDBT_OK;
307 }
308 return NDBT_FAILED;
309 }
310
311 int
addRowToInsert(Ndb * pNdb,NdbConnection * pInsTrans,NDBT_ResultRow & row,const char * insertTabName)312 UtilTransactions::addRowToInsert(Ndb* pNdb,
313 NdbConnection* pInsTrans,
314 NDBT_ResultRow & row,
315 const char *insertTabName){
316
317 int check;
318 NdbOperation* pInsOp;
319
320 pInsOp = pInsTrans->getNdbOperation(insertTabName);
321 if (pInsOp == NULL) {
322 ERR(pInsTrans->getNdbError());
323 return NDBT_FAILED;
324 }
325
326 check = pInsOp->insertTuple();
327 if( check == -1 ) {
328 ERR(pInsTrans->getNdbError());
329 return NDBT_FAILED;
330 }
331
332 // Set all attributes
333 for (int a = 0; a < tab.getNoOfColumns(); a++){
334 NdbRecAttr* r = row.attributeStore(a);
335 int sz = r->get_size_in_bytes();
336 if (pInsOp->setValue(tab.getColumn(a)->getName(),
337 r->aRef(),
338 sz) != 0) {
339 ERR(pInsTrans->getNdbError());
340 return NDBT_FAILED;
341 }
342 }
343
344 return NDBT_OK;
345 }
346
347
348 int
scanReadRecords(Ndb * pNdb,int parallelism,NdbOperation::LockMode lm,int records,int noAttribs,int * attrib_list,ReadCallBackFn * fn)349 UtilTransactions::scanReadRecords(Ndb* pNdb,
350 int parallelism,
351 NdbOperation::LockMode lm,
352 int records,
353 int noAttribs,
354 int *attrib_list,
355 ReadCallBackFn* fn){
356
357 int retryAttempt = 0;
358 const int retryMax = 100;
359 int check;
360 NdbScanOperation *pOp;
361 NDBT_ResultRow row(tab);
362
363 while (true){
364
365 if (retryAttempt >= retryMax){
366 g_info << "ERROR: has retried this operation " << retryAttempt
367 << " times, failing!" << endl;
368 return NDBT_FAILED;
369 }
370
371 pTrans = pNdb->startTransaction();
372 if (pTrans == NULL) {
373 const NdbError err = pNdb->getNdbError();
374
375 if (err.status == NdbError::TemporaryError){
376 ERR(err);
377 NdbSleep_MilliSleep(50);
378 retryAttempt++;
379 continue;
380 }
381 ERR(err);
382 return NDBT_FAILED;
383 }
384
385 pOp = getScanOperation(pTrans);
386 if (pOp == NULL) {
387 const NdbError err = pNdb->getNdbError();
388 closeTransaction(pNdb);
389
390 if (err.status == NdbError::TemporaryError){
391 ERR(err);
392 NdbSleep_MilliSleep(50);
393 retryAttempt++;
394 continue;
395 }
396 ERR(err);
397 return NDBT_FAILED;
398 }
399
400 if( pOp->readTuples(lm, 0, parallelism) ) {
401 ERR(pTrans->getNdbError());
402 closeTransaction(pNdb);
403 return NDBT_FAILED;
404 }
405
406 // Call getValue for all the attributes supplied in attrib_list
407 // ************************************************
408 for (int a = 0; a < noAttribs; a++){
409 if (attrib_list[a] < tab.getNoOfColumns()){
410 g_info << "getValue(" << attrib_list[a] << ")" << endl;
411 if ((row.attributeStore(attrib_list[a]) =
412 pOp->getValue(tab.getColumn(attrib_list[a])->getName())) == 0) {
413 ERR(pTrans->getNdbError());
414 closeTransaction(pNdb);
415 return NDBT_FAILED;
416 }
417 }
418 }
419 // *************************************************
420
421 check = pTrans->execute(NoCommit, AbortOnError);
422 if( check == -1 ) {
423 const NdbError err = pTrans->getNdbError();
424
425 if (err.status == NdbError::TemporaryError){
426 ERR(err);
427 closeTransaction(pNdb);
428 NdbSleep_MilliSleep(50);
429 retryAttempt++;
430 continue;
431 }
432 ERR(err);
433 closeTransaction(pNdb);
434 return NDBT_FAILED;
435 }
436
437 int eof;
438 int rows = 0;
439
440
441 while((eof = pOp->nextResult()) == 0){
442 rows++;
443
444 // Call callback for each record returned
445 if(fn != NULL)
446 fn(&row);
447 }
448 if (eof == -1) {
449 const NdbError err = pTrans->getNdbError();
450
451 if (err.status == NdbError::TemporaryError){
452 ERR(err);
453 closeTransaction(pNdb);
454 NdbSleep_MilliSleep(50);
455 retryAttempt++;
456 continue;
457 }
458 ERR(err);
459 closeTransaction(pNdb);
460 return NDBT_FAILED;
461 }
462
463 closeTransaction(pNdb);
464 g_info << rows << " rows have been read" << endl;
465 if (records != 0 && rows != records){
466 g_info << "Check expected number of records failed" << endl
467 << " expected=" << records <<", " << endl
468 << " read=" << rows << endl;
469 return NDBT_FAILED;
470 }
471
472 return NDBT_OK;
473 }
474 return NDBT_FAILED;
475 }
476
477 int
selectCount(Ndb * pNdb,int parallelism,int * count_rows,NdbOperation::LockMode lm)478 UtilTransactions::selectCount(Ndb* pNdb,
479 int parallelism,
480 int* count_rows,
481 NdbOperation::LockMode lm)
482 {
483
484 int retryAttempt = 0;
485 const int retryMax = 100;
486 int check;
487
488 while (true){
489
490 if (retryAttempt >= retryMax){
491 g_info << "ERROR: has retried this operation " << retryAttempt
492 << " times, failing!" << endl;
493 return NDBT_FAILED;
494 }
495
496 pTrans = pNdb->startTransaction();
497 if (pTrans == NULL)
498 {
499 if (pNdb->getNdbError().status == NdbError::TemporaryError)
500 {
501 NdbSleep_MilliSleep(50);
502 retryAttempt++;
503 continue;
504 }
505 ERR(pNdb->getNdbError());
506 return NDBT_FAILED;
507 }
508
509
510 NdbScanOperation *pOp = getScanOperation(pTrans);
511 if (pOp == NULL)
512 {
513 NdbError err = pTrans->getNdbError();
514 closeTransaction(pNdb);
515 if (err.status == NdbError::TemporaryError)
516 {
517 NdbSleep_MilliSleep(50);
518 retryAttempt++;
519 continue;
520 }
521 ERR(err);
522 return NDBT_FAILED;
523 }
524
525 if( pOp->readTuples(lm) )
526 {
527 ERR(pTrans->getNdbError());
528 closeTransaction(pNdb);
529 return NDBT_FAILED;
530 }
531
532 if(0){
533 NdbScanFilter sf(pOp);
534 sf.begin(NdbScanFilter::OR);
535 sf.eq(2, (Uint32)30);
536 sf.end();
537 }
538
539 check = pTrans->execute(NoCommit, AbortOnError);
540 if( check == -1 )
541 {
542 NdbError err = pTrans->getNdbError();
543 closeTransaction(pNdb);
544 if (err.status == NdbError::TemporaryError)
545 {
546 NdbSleep_MilliSleep(50);
547 retryAttempt++;
548 continue;
549 }
550 ERR(err);
551 return NDBT_FAILED;
552 }
553
554 int eof;
555 int rows = 0;
556
557
558 while((eof = pOp->nextResult()) == 0){
559 rows++;
560 }
561
562 if (eof == -1)
563 {
564 const NdbError err = pTrans->getNdbError();
565 closeTransaction(pNdb);
566
567 if (err.status == NdbError::TemporaryError)
568 {
569 NdbSleep_MilliSleep(50);
570 retryAttempt++;
571 continue;
572 }
573 ERR(err);
574 closeTransaction(pNdb);
575 return NDBT_FAILED;
576 }
577
578 closeTransaction(pNdb);
579
580 if (count_rows != NULL){
581 *count_rows = rows;
582 }
583
584 return NDBT_OK;
585 }
586 return NDBT_FAILED;
587 }
588
589 int
verifyIndex(Ndb * pNdb,const char * indexName,int parallelism,bool transactional)590 UtilTransactions::verifyIndex(Ndb* pNdb,
591 const char* indexName,
592 int parallelism,
593 bool transactional){
594
595
596 const NdbDictionary::Index* pIndex
597 = pNdb->getDictionary()->getIndex(indexName, tab.getName());
598 if (pIndex == 0){
599 ndbout << " Index " << indexName << " does not exist!" << endl;
600 return NDBT_FAILED;
601 }
602
603 switch (pIndex->getType()){
604 case NdbDictionary::Index::UniqueHashIndex:
605 return verifyUniqueIndex(pNdb, pIndex, parallelism, transactional);
606 case NdbDictionary::Index::OrderedIndex:
607 return verifyOrderedIndex(pNdb, pIndex, parallelism, transactional);
608 break;
609 default:
610 ndbout << "Unknown index type" << endl;
611 break;
612 }
613
614 return NDBT_FAILED;
615 }
616
617 int
verifyUniqueIndex(Ndb * pNdb,const NdbDictionary::Index * pIndex,int parallelism,bool transactional)618 UtilTransactions::verifyUniqueIndex(Ndb* pNdb,
619 const NdbDictionary::Index * pIndex,
620 int parallelism,
621 bool transactional){
622
623 /**
624 * Scan all rows in TABLE and for each found row make one read in
625 * TABLE and one using INDEX_TABLE. Then compare the two returned
626 * rows. They should be equal!
627 *
628 */
629
630 if (scanAndCompareUniqueIndex(pNdb,
631 pIndex,
632 parallelism,
633 transactional) != NDBT_OK){
634 return NDBT_FAILED;
635 }
636
637
638 return NDBT_OK;
639
640 }
641
642
643 int
scanAndCompareUniqueIndex(Ndb * pNdb,const NdbDictionary::Index * pIndex,int parallelism,bool transactional)644 UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb,
645 const NdbDictionary::Index* pIndex,
646 int parallelism,
647 bool transactional){
648
649 int retryAttempt = 0;
650 const int retryMax = 100;
651 int check;
652 NdbScanOperation *pOp;
653 NDBT_ResultRow row(tab);
654
655 parallelism = 1;
656
657 while (true){
658 restart:
659 if (retryAttempt >= retryMax){
660 g_info << "ERROR: has retried this operation " << retryAttempt
661 << " times, failing!" << endl;
662 return NDBT_FAILED;
663 }
664
665 pTrans = pNdb->startTransaction();
666 if (pTrans == NULL) {
667 const NdbError err = pNdb->getNdbError();
668
669 if (err.status == NdbError::TemporaryError){
670 ERR(err);
671 NdbSleep_MilliSleep(50);
672 retryAttempt++;
673 continue;
674 }
675 ERR(err);
676 return NDBT_FAILED;
677 }
678
679 pOp = pTrans->getNdbScanOperation(tab.getName());
680 if (pOp == NULL) {
681 const NdbError err = pNdb->getNdbError();
682 closeTransaction(pNdb);
683 ERR(err);
684
685 if (err.status == NdbError::TemporaryError){
686 NdbSleep_MilliSleep(50);
687 retryAttempt++;
688 continue;
689 }
690 return NDBT_FAILED;
691 }
692
693 int rs;
694 if(transactional){
695 rs = pOp->readTuples(NdbScanOperation::LM_Read, 0, parallelism);
696 } else {
697 rs = pOp->readTuples(NdbScanOperation::LM_CommittedRead, 0, parallelism);
698 }
699
700 if( rs != 0 ) {
701 ERR(pTrans->getNdbError());
702 closeTransaction(pNdb);
703 return NDBT_FAILED;
704 }
705
706 // Read all attributes
707 for (int a = 0; a < tab.getNoOfColumns(); a++){
708 if ((row.attributeStore(a) =
709 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
710 ERR(pTrans->getNdbError());
711 closeTransaction(pNdb);
712 return NDBT_FAILED;
713 }
714 }
715
716 check = pTrans->execute(NoCommit, AbortOnError);
717 if( check == -1 ) {
718 const NdbError err = pTrans->getNdbError();
719
720 if (err.status == NdbError::TemporaryError){
721 ERR(err);
722 closeTransaction(pNdb);
723 NdbSleep_MilliSleep(50);
724 retryAttempt++;
725 continue;
726 }
727 ERR(err);
728 closeTransaction(pNdb);
729 return NDBT_FAILED;
730 }
731
732 int eof;
733 int rows = 0;
734
735
736 while((eof = pOp->nextResult()) == 0){
737 rows++;
738
739 // ndbout << row.c_str().c_str() << endl;
740
741 if (readRowFromTableAndIndex(pNdb,
742 pTrans,
743 pIndex,
744 row) != NDBT_OK){
745
746 while((eof= pOp->nextResult(false)) == 0);
747 if(eof == 2)
748 eof = pOp->nextResult(true); // this should give -1
749 if(eof == -1)
750 {
751 const NdbError err = pTrans->getNdbError();
752
753 if (err.status == NdbError::TemporaryError){
754 ERR(err);
755 closeTransaction(pNdb);
756 NdbSleep_MilliSleep(50);
757 retryAttempt++;
758 goto restart;
759 }
760 }
761 closeTransaction(pNdb);
762 return NDBT_FAILED;
763 }
764 }
765 if (eof == -1) {
766 const NdbError err = pTrans->getNdbError();
767
768 if (err.status == NdbError::TemporaryError){
769 ERR(err);
770 closeTransaction(pNdb);
771 NdbSleep_MilliSleep(50);
772 retryAttempt++;
773 continue;
774 }
775 ERR(err);
776 closeTransaction(pNdb);
777 return NDBT_FAILED;
778 }
779
780 closeTransaction(pNdb);
781
782 return NDBT_OK;
783 }
784 return NDBT_FAILED;
785 }
786 int
readRowFromTableAndIndex(Ndb * pNdb,NdbConnection * scanTrans,const NdbDictionary::Index * pIndex,NDBT_ResultRow & row)787 UtilTransactions::readRowFromTableAndIndex(Ndb* pNdb,
788 NdbConnection* scanTrans,
789 const NdbDictionary::Index* pIndex,
790 NDBT_ResultRow& row ){
791
792
793 NdbDictionary::Index::Type indexType= pIndex->getType();
794 int retryAttempt = 0;
795 const int retryMax = 100;
796 int check, a;
797 NdbConnection *pTrans1=NULL;
798 NdbOperation *pOp;
799
800 int return_code= NDBT_FAILED;
801
802 // Allocate place to store the result
803 NDBT_ResultRow tabRow(tab);
804 NDBT_ResultRow indexRow(tab);
805 const char * indexName = pIndex->getName();
806
807 while (true){
808 if(retryAttempt)
809 ndbout_c("retryAttempt %d", retryAttempt);
810 if (retryAttempt >= retryMax){
811 g_info << "ERROR: has retried this operation " << retryAttempt
812 << " times, failing!" << endl;
813 goto close_all;
814 }
815
816 pTrans1 = pNdb->hupp(scanTrans); //startTransaction();
817 if (pTrans1 == NULL) {
818 const NdbError err = pNdb->getNdbError();
819
820 if (err.code == 4006)
821 goto close_all;
822
823 if (err.status == NdbError::TemporaryError){
824 ERR(err);
825 NdbSleep_MilliSleep(50);
826 retryAttempt++;
827 continue;
828 }
829
830 if(err.code == 0){
831 return_code = NDBT_OK;
832 goto close_all;
833 }
834 ERR(err);
835 goto close_all;
836 }
837
838 /**
839 * Read the record from TABLE
840 */
841 pOp = pTrans1->getNdbOperation(tab.getName());
842 if (pOp == NULL) {
843 ERR(pTrans1->getNdbError());
844 goto close_all;
845 }
846
847 check = pOp->readTuple();
848 if( check == -1 ) {
849 ERR(pTrans1->getNdbError());
850 goto close_all;
851 }
852
853 // Define primary keys
854 #if VERBOSE
855 printf("PK: ");
856 #endif
857 for(a = 0; a<tab.getNoOfColumns(); a++){
858 const NdbDictionary::Column* attr = tab.getColumn(a);
859 if (attr->getPrimaryKey() == true){
860 if (pOp->equal(attr->getName(), row.attributeStore(a)->aRef()) != 0){
861 ERR(pTrans1->getNdbError());
862 goto close_all;
863 }
864 #if VERBOSE
865 printf("%s = %d: ", attr->getName(), row.attributeStore(a)->aRef());
866 #endif
867 }
868 }
869 #if VERBOSE
870 printf("\n");
871 #endif
872 // Read all attributes
873 #if VERBOSE
874 printf("Reading %u attributes: ", tab.getNoOfColumns());
875 #endif
876 for(a = 0; a<tab.getNoOfColumns(); a++){
877 if((tabRow.attributeStore(a) =
878 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
879 ERR(pTrans1->getNdbError());
880 goto close_all;
881 }
882 #if VERBOSE
883 printf("%s ", tab.getColumn(a)->getName());
884 #endif
885 }
886 #if VERBOSE
887 printf("\n");
888 #endif
889
890 /**
891 * Read the record from INDEX_TABLE
892 */
893 NdbIndexOperation* pIndexOp= NULL;
894 NdbIndexScanOperation *pScanOp= NULL;
895 NdbOperation *pIOp= 0;
896
897 bool null_found= false;
898 for(a = 0; a<(int)pIndex->getNoOfColumns(); a++){
899 const NdbDictionary::Column * col = pIndex->getColumn(a);
900
901 if (row.attributeStore(col->getName())->isNULL())
902 {
903 null_found= true;
904 break;
905 }
906 }
907
908 const char * tabName= tab.getName();
909 if(!null_found)
910 {
911 if (indexType == NdbDictionary::Index::UniqueHashIndex) {
912 pIOp= pIndexOp= pTrans1->getNdbIndexOperation(indexName, tabName);
913 } else {
914 pIOp= pScanOp= pTrans1->getNdbIndexScanOperation(indexName, tabName);
915 }
916
917 if (pIOp == NULL) {
918 ERR(pTrans1->getNdbError());
919 goto close_all;
920 }
921
922 {
923 bool not_ok;
924 if (pIndexOp) {
925 not_ok = pIndexOp->readTuple() == -1;
926 } else {
927 not_ok = pScanOp->readTuples();
928 }
929
930 if( not_ok ) {
931 ERR(pTrans1->getNdbError());
932 goto close_all;
933 }
934 }
935
936 // Define primary keys for index
937 #if VERBOSE
938 printf("SI: ");
939 #endif
940 for(a = 0; a<(int)pIndex->getNoOfColumns(); a++){
941 const NdbDictionary::Column * col = pIndex->getColumn(a);
942
943 if ( !row.attributeStore(col->getName())->isNULL() ) {
944 if(pIOp->equal(col->getName(),
945 row.attributeStore(col->getName())->aRef()) != 0){
946 ERR(pTrans1->getNdbError());
947 goto close_all;
948 }
949 }
950 #if VERBOSE
951 printf("%s = %d: ", col->getName(), row.attributeStore(a)->aRef());
952 #endif
953 }
954 #if VERBOSE
955 printf("\n");
956 #endif
957
958 // Read all attributes
959 #if VERBOSE
960 printf("Reading %u attributes: ", tab.getNoOfColumns());
961 #endif
962 for(a = 0; a<tab.getNoOfColumns(); a++){
963 void* pCheck;
964
965 pCheck= indexRow.attributeStore(a)=
966 pIOp->getValue(tab.getColumn(a)->getName());
967
968 if(pCheck == NULL) {
969 ERR(pTrans1->getNdbError());
970 goto close_all;
971 }
972 #if VERBOSE
973 printf("%s ", tab.getColumn(a)->getName());
974 #endif
975 }
976 }
977 #if VERBOSE
978 printf("\n");
979 #endif
980 scanTrans->refresh();
981 check = pTrans1->execute(Commit, AbortOnError);
982 if( check == -1 ) {
983 const NdbError err = pTrans1->getNdbError();
984
985 if (err.status == NdbError::TemporaryError){
986 ERR(err);
987 pNdb->closeTransaction(pTrans1);
988 NdbSleep_MilliSleep(50);
989 retryAttempt++;
990 continue;
991 }
992 ndbout << "Error when comparing records - normal op" << endl;
993 ERR(err);
994 ndbout << "row: " << row.c_str().c_str() << endl;
995 goto close_all;
996 }
997
998 /**
999 * Compare the two rows
1000 */
1001 if(!null_found){
1002 if (pScanOp) {
1003 if (pScanOp->nextResult() != 0){
1004 const NdbError err = pTrans1->getNdbError();
1005 ERR(err);
1006 ndbout << "Error when comparing records - index op next_result missing" << endl;
1007 ndbout << "row: " << row.c_str().c_str() << endl;
1008 goto close_all;
1009 }
1010 }
1011 if (!(tabRow.c_str() == indexRow.c_str())){
1012 ndbout << "Error when comapring records" << endl;
1013 ndbout << " tabRow: \n" << tabRow.c_str().c_str() << endl;
1014 ndbout << " indexRow: \n" << indexRow.c_str().c_str() << endl;
1015 goto close_all;
1016 }
1017 if (pScanOp) {
1018 if (pScanOp->nextResult() == 0){
1019 ndbout << "Error when comparing records - index op next_result to many" << endl;
1020 ndbout << "row: " << row.c_str().c_str() << endl;
1021 goto close_all;
1022 }
1023 }
1024 }
1025 return_code= NDBT_OK;
1026 goto close_all;
1027 }
1028
1029 close_all:
1030 if (pTrans1)
1031 pNdb->closeTransaction(pTrans1);
1032
1033 return return_code;
1034 }
1035
1036 int
verifyOrderedIndex(Ndb * pNdb,const NdbDictionary::Index * pIndex,int parallelism,bool transactional)1037 UtilTransactions::verifyOrderedIndex(Ndb* pNdb,
1038 const NdbDictionary::Index* pIndex,
1039 int parallelism,
1040 bool transactional){
1041
1042 int retryAttempt = 0;
1043 const int retryMax = 100;
1044 int check;
1045 NdbScanOperation *pOp;
1046 NdbIndexScanOperation * iop = 0;
1047
1048 NDBT_ResultRow scanRow(tab);
1049 NDBT_ResultRow pkRow(tab);
1050 NDBT_ResultRow indexRow(tab);
1051 const char * indexName = pIndex->getName();
1052
1053 int res;
1054 parallelism = 1;
1055
1056 while (true){
1057
1058 if (retryAttempt >= retryMax){
1059 g_info << "ERROR: has retried this operation " << retryAttempt
1060 << " times, failing!" << endl;
1061 return NDBT_FAILED;
1062 }
1063
1064 pTrans = pNdb->startTransaction();
1065 if (pTrans == NULL) {
1066 const NdbError err = pNdb->getNdbError();
1067
1068 if (err.status == NdbError::TemporaryError){
1069 ERR(err);
1070 NdbSleep_MilliSleep(50);
1071 retryAttempt++;
1072 continue;
1073 }
1074 ERR(err);
1075 return NDBT_FAILED;
1076 }
1077
1078 pOp = pTrans->getNdbScanOperation(tab.getName());
1079 if (pOp == NULL) {
1080 ERR(pTrans->getNdbError());
1081 closeTransaction(pNdb);
1082 return NDBT_FAILED;
1083 }
1084
1085 if( pOp->readTuples(NdbScanOperation::LM_Read, 0, parallelism) ) {
1086 ERR(pTrans->getNdbError());
1087 closeTransaction(pNdb);
1088 return NDBT_FAILED;
1089 }
1090
1091 if(get_values(pOp, scanRow))
1092 {
1093 abort();
1094 }
1095
1096 check = pTrans->execute(NoCommit, AbortOnError);
1097 if( check == -1 ) {
1098 const NdbError err = pTrans->getNdbError();
1099
1100 if (err.status == NdbError::TemporaryError){
1101 ERR(err);
1102 closeTransaction(pNdb);
1103 NdbSleep_MilliSleep(50);
1104 retryAttempt++;
1105 continue;
1106 }
1107 ERR(err);
1108 closeTransaction(pNdb);
1109 return NDBT_FAILED;
1110 }
1111
1112 int eof;
1113 int rows = 0;
1114 while(check == 0 && (eof = pOp->nextResult()) == 0){
1115 rows++;
1116
1117 bool null_found= false;
1118 for(int a = 0; a<(int)pIndex->getNoOfColumns(); a++){
1119 const NdbDictionary::Column * col = pIndex->getColumn(a);
1120 if (scanRow.attributeStore(col->getName())->isNULL())
1121 {
1122 null_found= true;
1123 break;
1124 }
1125 }
1126
1127 // Do pk lookup
1128 NdbOperation * pk = pTrans->getNdbOperation(tab.getName());
1129 if(!pk || pk->readTuple())
1130 goto error;
1131 if(equal(&tab, pk, scanRow) || get_values(pk, pkRow))
1132 goto error;
1133
1134 if(!null_found)
1135 {
1136 if((iop= pTrans->getNdbIndexScanOperation(indexName,
1137 tab.getName())) != 0)
1138 {
1139 if(iop->readTuples(NdbScanOperation::LM_CommittedRead,
1140 parallelism))
1141 goto error;
1142 if(get_values(iop, indexRow))
1143 goto error;
1144 if(equal(pIndex, iop, scanRow))
1145 goto error;
1146 }
1147 else
1148 {
1149 goto error;
1150 }
1151 }
1152
1153 check = pTrans->execute(NoCommit, AbortOnError);
1154 if(check)
1155 goto error;
1156
1157 if(scanRow.c_str() != pkRow.c_str()){
1158 g_err << "Error when comapring records" << endl;
1159 g_err << " scanRow: \n" << scanRow.c_str().c_str() << endl;
1160 g_err << " pkRow: \n" << pkRow.c_str().c_str() << endl;
1161 closeTransaction(pNdb);
1162 return NDBT_FAILED;
1163 }
1164
1165 if(!null_found)
1166 {
1167
1168 if((res= iop->nextResult()) != 0){
1169 g_err << "Failed to find row using index: " << res << endl;
1170 ERR(pTrans->getNdbError());
1171 closeTransaction(pNdb);
1172 return NDBT_FAILED;
1173 }
1174
1175 if(scanRow.c_str() != indexRow.c_str()){
1176 g_err << "Error when comapring records" << endl;
1177 g_err << " scanRow: \n" << scanRow.c_str().c_str() << endl;
1178 g_err << " indexRow: \n" << indexRow.c_str().c_str() << endl;
1179 closeTransaction(pNdb);
1180 return NDBT_FAILED;
1181 }
1182
1183 if(iop->nextResult() == 0){
1184 g_err << "Found extra row!!" << endl;
1185 g_err << " indexRow: \n" << indexRow.c_str().c_str() << endl;
1186 closeTransaction(pNdb);
1187 return NDBT_FAILED;
1188 }
1189 }
1190 }
1191
1192 if (eof == -1 || check == -1) {
1193 error:
1194 const NdbError err = pTrans->getNdbError();
1195
1196 if (err.status == NdbError::TemporaryError){
1197 ERR(err);
1198 iop = 0;
1199 closeTransaction(pNdb);
1200 NdbSleep_MilliSleep(50);
1201 retryAttempt++;
1202 rows--;
1203 continue;
1204 }
1205 ERR(err);
1206 closeTransaction(pNdb);
1207 return NDBT_FAILED;
1208 }
1209
1210 closeTransaction(pNdb);
1211
1212 return NDBT_OK;
1213 }
1214 return NDBT_FAILED;
1215 }
1216
1217 int
get_values(NdbOperation * op,NDBT_ResultRow & dst)1218 UtilTransactions::get_values(NdbOperation* op, NDBT_ResultRow& dst)
1219 {
1220 for (int a = 0; a < tab.getNoOfColumns(); a++){
1221 NdbRecAttr*& ref= dst.attributeStore(a);
1222 if ((ref= op->getValue(a)) == 0)
1223 {
1224 return NDBT_FAILED;
1225 }
1226 }
1227 return 0;
1228 }
1229
1230 int
equal(const NdbDictionary::Index * pIndex,NdbOperation * op,const NDBT_ResultRow & src)1231 UtilTransactions::equal(const NdbDictionary::Index* pIndex,
1232 NdbOperation* op, const NDBT_ResultRow& src)
1233 {
1234 for(Uint32 a = 0; a<pIndex->getNoOfColumns(); a++){
1235 const NdbDictionary::Column * col = pIndex->getColumn(a);
1236 if(op->equal(col->getName(),
1237 src.attributeStore(col->getName())->aRef()) != 0){
1238 return NDBT_FAILED;
1239 }
1240 }
1241 return 0;
1242 }
1243
1244 int
equal(const NdbDictionary::Table * pTable,NdbOperation * op,const NDBT_ResultRow & src)1245 UtilTransactions::equal(const NdbDictionary::Table* pTable,
1246 NdbOperation* op, const NDBT_ResultRow& src)
1247 {
1248 for(Uint32 a = 0; (int)a<tab.getNoOfColumns(); a++){
1249 const NdbDictionary::Column* attr = tab.getColumn(a);
1250 if (attr->getPrimaryKey() == true){
1251 if (op->equal(attr->getName(), src.attributeStore(a)->aRef()) != 0){
1252 return NDBT_FAILED;
1253 }
1254 }
1255 }
1256 return 0;
1257 }
1258
1259 NdbScanOperation*
getScanOperation(NdbConnection * pTrans)1260 UtilTransactions::getScanOperation(NdbConnection* pTrans)
1261 {
1262 return (NdbScanOperation*)
1263 getOperation(pTrans, NdbOperation::OpenScanRequest);
1264 }
1265
1266 NdbOperation*
getOperation(NdbConnection * pTrans,NdbOperation::OperationType type)1267 UtilTransactions::getOperation(NdbConnection* pTrans,
1268 NdbOperation::OperationType type)
1269 {
1270 switch(type){
1271 case NdbOperation::ReadRequest:
1272 case NdbOperation::ReadExclusive:
1273 if(idx)
1274 {
1275 switch(idx->getType()){
1276 case NdbDictionary::Index::UniqueHashIndex:
1277 return pTrans->getNdbIndexOperation(idx->getName(), tab.getName());
1278 case NdbDictionary::Index::OrderedIndex:
1279 return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName());
1280 default:
1281 abort();
1282 }
1283 }
1284 case NdbOperation::InsertRequest:
1285 case NdbOperation::WriteRequest:
1286 return pTrans->getNdbOperation(tab.getName());
1287 case NdbOperation::UpdateRequest:
1288 case NdbOperation::DeleteRequest:
1289 if(idx)
1290 {
1291 switch(idx->getType()){
1292 case NdbDictionary::Index::UniqueHashIndex:
1293 return pTrans->getNdbIndexOperation(idx->getName(), tab.getName());
1294 default:
1295 break;
1296 }
1297 }
1298 return pTrans->getNdbOperation(tab.getName());
1299 case NdbOperation::OpenScanRequest:
1300 if(idx)
1301 {
1302 switch(idx->getType()){
1303 case NdbDictionary::Index::OrderedIndex:
1304 return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName());
1305 default:
1306 break;
1307 }
1308 }
1309 return pTrans->getNdbScanOperation(tab.getName());
1310 case NdbOperation::OpenRangeScanRequest:
1311 if(idx)
1312 {
1313 switch(idx->getType()){
1314 case NdbDictionary::Index::OrderedIndex:
1315 return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName());
1316 default:
1317 break;
1318 }
1319 }
1320 return 0;
1321 default:
1322 abort();
1323 }
1324 return 0;
1325 }
1326
1327 #include <HugoOperations.hpp>
1328
1329 int
closeTransaction(Ndb * pNdb)1330 UtilTransactions::closeTransaction(Ndb* pNdb)
1331 {
1332 if (pTrans != NULL){
1333 pNdb->closeTransaction(pTrans);
1334 pTrans = NULL;
1335 }
1336 return 0;
1337 }
1338
1339 int
compare(Ndb * pNdb,const char * tab_name2,int flags)1340 UtilTransactions::compare(Ndb* pNdb, const char* tab_name2, int flags){
1341
1342
1343 NdbError err;
1344 int return_code= 0, row_count= 0;
1345 int retryAttempt = 0, retryMax = 10;
1346
1347 HugoCalculator calc(tab);
1348 NDBT_ResultRow row(tab);
1349 const NdbDictionary::Table* tmp= pNdb->getDictionary()->getTable(tab_name2);
1350 if(tmp == 0)
1351 {
1352 g_err << "Unable to lookup table: " << tab_name2
1353 << endl << pNdb->getDictionary()->getNdbError() << endl;
1354 return -1;
1355 }
1356 const NdbDictionary::Table& tab2= *tmp;
1357
1358 HugoOperations cmp(tab2);
1359 UtilTransactions count(tab2);
1360
1361 while (true){
1362 loop:
1363 if (retryAttempt++ >= retryMax){
1364 g_err << "ERROR: compare has retried this operation " << retryAttempt
1365 << " times, failing!" << endl;
1366 return -1;
1367 }
1368
1369 NdbScanOperation *pOp= 0;
1370 pTrans = pNdb->startTransaction();
1371 if (pTrans == NULL) {
1372 err = pNdb->getNdbError();
1373 goto error;
1374 }
1375
1376 pOp= pTrans->getNdbScanOperation(tab.getName());
1377 if (pOp == NULL) {
1378 ERR(err= pTrans->getNdbError());
1379 goto error;
1380 }
1381
1382 if( pOp->readTuples(NdbScanOperation::LM_Read) ) {
1383 ERR(err= pTrans->getNdbError());
1384 goto error;
1385 }
1386
1387 // Read all attributes
1388 {
1389 for (int a = 0; a < tab.getNoOfColumns(); a++){
1390 if ((row.attributeStore(a) =
1391 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
1392 ERR(err= pTrans->getNdbError());
1393 goto error;
1394 }
1395 }
1396 }
1397
1398 if( pTrans->execute(NoCommit, AbortOnError) == -1 ) {
1399 ERR(err= pTrans->getNdbError());
1400 goto error;
1401 }
1402
1403 row_count= 0;
1404 {
1405 int eof;
1406 while((eof = pOp->nextResult(true)) == 0)
1407 {
1408 do {
1409 row_count++;
1410 if(cmp.startTransaction(pNdb) != NDBT_OK)
1411 {
1412 ERR(err= pNdb->getNdbError());
1413 goto error;
1414 }
1415 int rowNo= calc.getIdValue(&row);
1416 if(cmp.pkReadRecord(pNdb, rowNo, 1) != NDBT_OK)
1417 {
1418 ERR(err= cmp.getTransaction()->getNdbError());
1419 goto error;
1420 }
1421 if(cmp.execute_Commit(pNdb) != NDBT_OK ||
1422 cmp.getTransaction()->getNdbError().code)
1423 {
1424 ERR(err= cmp.getTransaction()->getNdbError());
1425 goto error;
1426 }
1427 if(row != cmp.get_row(0))
1428 {
1429 g_err << "COMPARE FAILED" << endl;
1430 g_err << row << endl;
1431 g_err << cmp.get_row(0) << endl;
1432 return_code++;
1433 }
1434 retryAttempt= 0;
1435 cmp.closeTransaction(pNdb);
1436 } while((eof = pOp->nextResult(false)) == 0);
1437 }
1438 if (eof == -1)
1439 {
1440 err = pTrans->getNdbError();
1441 goto error;
1442 }
1443 }
1444
1445 closeTransaction(pNdb);
1446
1447 g_info << row_count << " rows compared" << endl;
1448 {
1449 int row_count2;
1450 if(count.selectCount(pNdb, 0, &row_count2) != NDBT_OK)
1451 {
1452 g_err << "Failed to count rows in tab_name2" << endl;
1453 return -1;
1454 }
1455
1456 g_info << row_count2 << " rows in tab_name2 - failed " << return_code
1457 << endl;
1458 return (row_count == row_count2 ? return_code : 1);
1459 }
1460 error:
1461 if(err.status == NdbError::TemporaryError)
1462 {
1463 g_err << err << endl;
1464 NdbSleep_MilliSleep(50);
1465 closeTransaction(pNdb);
1466 if(cmp.getTransaction())
1467 cmp.closeTransaction(pNdb);
1468
1469 goto loop;
1470 }
1471 g_err << "ERROR" << endl;
1472 g_err << err << endl;
1473
1474 break;
1475 }
1476
1477 closeTransaction(pNdb);
1478
1479 return return_code;
1480 }
1481
1482