1 /*
2 Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "UtilTransactions.hpp"
26 #include <NdbSleep.h>
27 #include <NdbScanFilter.hpp>
28
29 #define VERBOSE 0
30
UtilTransactions(const NdbDictionary::Table & _tab,const NdbDictionary::Index * _idx)31 UtilTransactions::UtilTransactions(const NdbDictionary::Table& _tab,
32 const NdbDictionary::Index* _idx):
33 tab(_tab), idx(_idx), pTrans(0)
34 {
35 m_defaultClearMethod = 3;
36 }
37
UtilTransactions(Ndb * ndb,const char * name,const char * index)38 UtilTransactions::UtilTransactions(Ndb* ndb,
39 const char * name,
40 const char * index) :
41 tab(* ndb->getDictionary()->getTable(name)),
42 idx(index ? ndb->getDictionary()->getIndex(index, name) : 0),
43 pTrans(0)
44 {
45 m_defaultClearMethod = 3;
46 }
47
48 #define RESTART_SCAN 99
49
50 #define RETURN_FAIL(err) return (err.code != 0 ? err.code : NDBT_FAILED)
51
52 int
clearTable(Ndb * pNdb,NdbScanOperation::ScanFlag flags,int records,int parallelism)53 UtilTransactions::clearTable(Ndb* pNdb,
54 NdbScanOperation::ScanFlag flags,
55 int records,
56 int parallelism){
57 // Scan all records exclusive and delete
58 // them one by one
59 int retryAttempt = 0;
60 const int retryMax = 10;
61 int deletedRows = 0;
62 int check;
63 NdbScanOperation *pOp;
64 NdbError err;
65
66 int par = parallelism;
67 while (true){
68 restart:
69 if (retryAttempt++ >= retryMax){
70 g_err << "ERROR: has retried this operation " << retryAttempt
71 << " times, failing!, line: " << __LINE__ << endl;
72 return NDBT_FAILED;
73 }
74
75 pTrans = pNdb->startTransaction();
76 if (pTrans == NULL) {
77 err = pNdb->getNdbError();
78 if (err.status == NdbError::TemporaryError){
79 NDB_ERR(err);
80 NdbSleep_MilliSleep(50);
81 continue;
82 }
83 NDB_ERR(err);
84 goto failed;
85 }
86
87 pOp = getScanOperation(pTrans);
88 if (pOp == NULL) {
89 err = pTrans->getNdbError();
90 if(err.status == NdbError::TemporaryError){
91 NDB_ERR(err);
92 closeTransaction(pNdb);
93 NdbSleep_MilliSleep(50);
94 par = 1;
95 goto restart;
96 }
97 NDB_ERR(err);
98 goto failed;
99 }
100
101 if( pOp->readTuples(NdbOperation::LM_Exclusive, flags, par) ) {
102 err = pTrans->getNdbError();
103 NDB_ERR(err);
104 goto failed;
105 }
106
107 if(pTrans->execute(NoCommit, AbortOnError) != 0){
108 err = pTrans->getNdbError();
109 if(err.status == NdbError::TemporaryError){
110 NDB_ERR(err);
111 closeTransaction(pNdb);
112 NdbSleep_MilliSleep(50);
113 continue;
114 }
115 NDB_ERR(err);
116 goto failed;
117 }
118
119 while((check = pOp->nextResult(true)) == 0){
120 do {
121 if (pOp->deleteCurrentTuple() != 0){
122 NDB_ERR(err);
123 goto failed;
124 }
125 deletedRows++;
126 } while((check = pOp->nextResult(false)) == 0);
127
128 if(check != -1){
129 check = pTrans->execute(Commit, AbortOnError);
130 pTrans->restart();
131 }
132
133 err = pTrans->getNdbError();
134 if(check == -1){
135 if(err.status == NdbError::TemporaryError){
136 NDB_ERR(err);
137 closeTransaction(pNdb);
138 NdbSleep_MilliSleep(50);
139 par = 1;
140 goto restart;
141 }
142 NDB_ERR(err);
143 goto failed;
144 }
145 }
146 if(check == -1){
147 err = pTrans->getNdbError();
148 if(err.status == NdbError::TemporaryError){
149 NDB_ERR(err);
150 closeTransaction(pNdb);
151 NdbSleep_MilliSleep(50);
152 par = 1;
153 goto restart;
154 }
155 NDB_ERR(err);
156 goto failed;
157 }
158 closeTransaction(pNdb);
159 return NDBT_OK;
160 }
161 abort(); /* Should never happen */
162 return NDBT_FAILED;
163
164 failed:
165 if(pTrans != 0) closeTransaction(pNdb);
166 return (err.code != 0 ? err.code : NDBT_FAILED);
167 }
168
169 int
clearTable(Ndb * pNdb,int records,int parallelism)170 UtilTransactions::clearTable(Ndb* pNdb,
171 int records,
172 int parallelism){
173
174 return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
175 records, parallelism);
176 }
177
178
179 int
clearTable1(Ndb * pNdb,int records,int parallelism)180 UtilTransactions::clearTable1(Ndb* pNdb,
181 int records,
182 int parallelism)
183 {
184 return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
185 records, 1);
186 }
187
188 int
clearTable2(Ndb * pNdb,int records,int parallelism)189 UtilTransactions::clearTable2(Ndb* pNdb,
190 int records,
191 int parallelism)
192 {
193 return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
194 records, parallelism);
195 }
196
197 int
clearTable3(Ndb * pNdb,int records,int parallelism)198 UtilTransactions::clearTable3(Ndb* pNdb,
199 int records,
200 int parallelism)
201 {
202 return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
203 records, parallelism);
204 }
205
206 int
copyTableData(Ndb * pNdb,const char * destName)207 UtilTransactions::copyTableData(Ndb* pNdb,
208 const char* destName){
209 // Scan all records and copy
210 // them to destName table
211 int retryAttempt = 0;
212 const int retryMax = 10;
213 int insertedRows = 0;
214 int parallelism = 240;
215 int check;
216 NdbScanOperation *pOp;
217 NDBT_ResultRow row(tab);
218
219 while (true){
220
221 if (retryAttempt >= retryMax){
222 g_err << "ERROR: has retried this operation " << retryAttempt
223 << " times, failing!, line: " << __LINE__ << endl;
224 return NDBT_FAILED;
225 }
226
227
228 pTrans = pNdb->startTransaction();
229 if (pTrans == NULL) {
230 const NdbError err = pNdb->getNdbError();
231
232 if (err.status == NdbError::TemporaryError){
233 NDB_ERR(err);
234 NdbSleep_MilliSleep(50);
235 retryAttempt++;
236 continue;
237 }
238 NDB_ERR(err);
239 return NDBT_FAILED;
240 }
241
242 pOp = pTrans->getNdbScanOperation(tab.getName());
243 if (pOp == NULL) {
244 NDB_ERR(pTrans->getNdbError());
245 closeTransaction(pNdb);
246 return NDBT_FAILED;
247 }
248
249 if( pOp->readTuples(NdbScanOperation::LM_Read, parallelism) ) {
250 NDB_ERR(pTrans->getNdbError());
251 closeTransaction(pNdb);
252 return NDBT_FAILED;
253 }
254
255 // Read all attributes
256 for (int a = 0; a < tab.getNoOfColumns(); a++){
257 if ((row.attributeStore(a) =
258 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
259 NDB_ERR(pTrans->getNdbError());
260 closeTransaction(pNdb);
261 return NDBT_FAILED;
262 }
263 }
264
265 check = pTrans->execute(NoCommit, AbortOnError);
266 if( check == -1 ) {
267 NDB_ERR(pTrans->getNdbError());
268 closeTransaction(pNdb);
269 return NDBT_FAILED;
270 }
271
272 int eof;
273 while((eof = pOp->nextResult(true)) == 0){
274 do {
275 insertedRows++;
276 if (addRowToInsert(pNdb, pTrans, row, destName) != 0){
277 closeTransaction(pNdb);
278 g_err << "Line: " << __LINE__ << " failed to add row" << endl;
279 return NDBT_FAILED;
280 }
281 } while((eof = pOp->nextResult(false)) == 0);
282
283 check = pTrans->execute(Commit, AbortOnError);
284 pTrans->restart();
285 if( check == -1 ) {
286 const NdbError err = pTrans->getNdbError();
287 NDB_ERR(err);
288 closeTransaction(pNdb);
289 return NDBT_FAILED;
290 }
291 }
292 if (eof == -1) {
293 const NdbError err = pTrans->getNdbError();
294
295 if (err.status == NdbError::TemporaryError){
296 NDB_ERR(err);
297 closeTransaction(pNdb);
298 NdbSleep_MilliSleep(50);
299 // If error = 488 there should be no limit on number of retry attempts
300 if (err.code != 488)
301 retryAttempt++;
302 continue;
303 }
304 NDB_ERR(err);
305 closeTransaction(pNdb);
306 return NDBT_FAILED;
307 }
308
309 closeTransaction(pNdb);
310
311 g_info << insertedRows << " rows copied" << endl;
312
313 return NDBT_OK;
314 }
315 abort(); /* Should never happen */
316 return NDBT_FAILED;
317 }
318
319 int
addRowToInsert(Ndb * pNdb,NdbConnection * pInsTrans,NDBT_ResultRow & row,const char * insertTabName)320 UtilTransactions::addRowToInsert(Ndb* pNdb,
321 NdbConnection* pInsTrans,
322 NDBT_ResultRow & row,
323 const char *insertTabName){
324
325 int check;
326 NdbOperation* pInsOp;
327
328 pInsOp = pInsTrans->getNdbOperation(insertTabName);
329 if (pInsOp == NULL) {
330 NDB_ERR(pInsTrans->getNdbError());
331 return NDBT_FAILED;
332 }
333
334 check = pInsOp->insertTuple();
335 if( check == -1 ) {
336 NDB_ERR(pInsTrans->getNdbError());
337 return NDBT_FAILED;
338 }
339
340 // Set all attributes
341 for (int a = 0; a < tab.getNoOfColumns(); a++){
342 NdbRecAttr* r = row.attributeStore(a);
343 int sz = r->get_size_in_bytes();
344 if (pInsOp->setValue(tab.getColumn(a)->getName(),
345 r->aRef(),
346 sz) != 0) {
347 NDB_ERR(pInsTrans->getNdbError());
348 return NDBT_FAILED;
349 }
350 }
351
352 return NDBT_OK;
353 }
354
355
356 int
scanReadRecords(Ndb * pNdb,int parallelism,NdbOperation::LockMode lm,int records,int noAttribs,int * attrib_list,ReadCallBackFn * fn)357 UtilTransactions::scanReadRecords(Ndb* pNdb,
358 int parallelism,
359 NdbOperation::LockMode lm,
360 int records,
361 int noAttribs,
362 int *attrib_list,
363 ReadCallBackFn* fn){
364
365 int retryAttempt = 0;
366 const int retryMax = 100;
367 int check;
368 NdbScanOperation *pOp;
369 NDBT_ResultRow row(tab);
370
371 while (true){
372
373 if (retryAttempt >= retryMax){
374 g_err << "ERROR: has retried this operation " << retryAttempt
375 << " times, failing!, line: " << __LINE__ << endl;
376 return NDBT_FAILED;
377 }
378
379 pTrans = pNdb->startTransaction();
380 if (pTrans == NULL) {
381 const NdbError err = pNdb->getNdbError();
382
383 if (err.status == NdbError::TemporaryError){
384 NDB_ERR(err);
385 NdbSleep_MilliSleep(50);
386 retryAttempt++;
387 continue;
388 }
389 NDB_ERR(err);
390 return NDBT_FAILED;
391 }
392
393 pOp = getScanOperation(pTrans);
394 if (pOp == NULL) {
395 const NdbError err = pNdb->getNdbError();
396 closeTransaction(pNdb);
397
398 if (err.status == NdbError::TemporaryError){
399 NDB_ERR(err);
400 NdbSleep_MilliSleep(50);
401 retryAttempt++;
402 continue;
403 }
404 NDB_ERR(err);
405 return NDBT_FAILED;
406 }
407
408 if( pOp->readTuples(lm, 0, parallelism) ) {
409 NDB_ERR(pTrans->getNdbError());
410 closeTransaction(pNdb);
411 return NDBT_FAILED;
412 }
413
414 // Call getValue for all the attributes supplied in attrib_list
415 // ************************************************
416 for (int a = 0; a < noAttribs; a++){
417 if (attrib_list[a] < tab.getNoOfColumns()){
418 g_info << "getValue(" << attrib_list[a] << ")" << endl;
419 if ((row.attributeStore(attrib_list[a]) =
420 pOp->getValue(tab.getColumn(attrib_list[a])->getName())) == 0) {
421 NDB_ERR(pTrans->getNdbError());
422 closeTransaction(pNdb);
423 return NDBT_FAILED;
424 }
425 }
426 }
427 // *************************************************
428
429 check = pTrans->execute(NoCommit, AbortOnError);
430 if( check == -1 ) {
431 const NdbError err = pTrans->getNdbError();
432
433 if (err.status == NdbError::TemporaryError){
434 NDB_ERR(err);
435 closeTransaction(pNdb);
436 NdbSleep_MilliSleep(50);
437 retryAttempt++;
438 continue;
439 }
440 NDB_ERR(err);
441 closeTransaction(pNdb);
442 return NDBT_FAILED;
443 }
444
445 int eof;
446 int rows = 0;
447
448
449 while((eof = pOp->nextResult()) == 0){
450 rows++;
451
452 // Call callback for each record returned
453 if(fn != NULL)
454 fn(&row);
455 }
456 if (eof == -1) {
457 const NdbError err = pTrans->getNdbError();
458
459 if (err.status == NdbError::TemporaryError){
460 NDB_ERR(err);
461 closeTransaction(pNdb);
462 NdbSleep_MilliSleep(50);
463 retryAttempt++;
464 continue;
465 }
466 NDB_ERR(err);
467 closeTransaction(pNdb);
468 return NDBT_FAILED;
469 }
470
471 closeTransaction(pNdb);
472 g_info << rows << " rows have been read" << endl;
473 if (records != 0 && rows != records){
474 g_err << "Check expected number of records failed" << endl
475 << " expected=" << records <<", " << endl
476 << " read=" << rows << endl;
477 return NDBT_FAILED;
478 }
479
480 return NDBT_OK;
481 }
482 abort(); /* Should never happen */
483 return NDBT_FAILED;
484 }
485
486 int
selectCount(Ndb * pNdb,int parallelism,int * count_rows,NdbOperation::LockMode lm)487 UtilTransactions::selectCount(Ndb* pNdb,
488 int parallelism,
489 int* count_rows,
490 NdbOperation::LockMode lm)
491 {
492
493 int retryAttempt = 0;
494 const int retryMax = 100;
495 int check;
496
497 while (true){
498
499 if (retryAttempt >= retryMax){
500 g_err << "ERROR: has retried this operation " << retryAttempt
501 << " times, failing!, line: " << __LINE__ << endl;
502 return NDBT_FAILED;
503 }
504
505 pTrans = pNdb->startTransaction();
506 if (pTrans == NULL)
507 {
508 if (pNdb->getNdbError().status == NdbError::TemporaryError)
509 {
510 NdbSleep_MilliSleep(50);
511 retryAttempt++;
512 continue;
513 }
514 NDB_ERR(pNdb->getNdbError());
515 return NDBT_FAILED;
516 }
517
518
519 NdbScanOperation *pOp = getScanOperation(pTrans);
520 if (pOp == NULL)
521 {
522 NdbError err = pTrans->getNdbError();
523 closeTransaction(pNdb);
524 if (err.status == NdbError::TemporaryError)
525 {
526 NdbSleep_MilliSleep(50);
527 retryAttempt++;
528 continue;
529 }
530 NDB_ERR(err);
531 return NDBT_FAILED;
532 }
533
534 if( pOp->readTuples(lm) )
535 {
536 NDB_ERR(pTrans->getNdbError());
537 closeTransaction(pNdb);
538 return NDBT_FAILED;
539 }
540
541 if(0){
542 NdbScanFilter sf(pOp);
543 sf.begin(NdbScanFilter::OR);
544 sf.eq(2, (Uint32)30);
545 sf.end();
546 }
547
548 check = pTrans->execute(NoCommit, AbortOnError);
549 if( check == -1 )
550 {
551 NdbError err = pTrans->getNdbError();
552 closeTransaction(pNdb);
553 if (err.status == NdbError::TemporaryError)
554 {
555 NdbSleep_MilliSleep(50);
556 retryAttempt++;
557 continue;
558 }
559 NDB_ERR(err);
560 return NDBT_FAILED;
561 }
562
563 int eof;
564 int rows = 0;
565
566
567 while((eof = pOp->nextResult()) == 0){
568 rows++;
569 }
570
571 if (eof == -1)
572 {
573 const NdbError err = pTrans->getNdbError();
574 closeTransaction(pNdb);
575
576 if (err.status == NdbError::TemporaryError)
577 {
578 NdbSleep_MilliSleep(50);
579 retryAttempt++;
580 continue;
581 }
582 NDB_ERR(err);
583 closeTransaction(pNdb);
584 return NDBT_FAILED;
585 }
586
587 closeTransaction(pNdb);
588
589 if (count_rows != NULL){
590 *count_rows = rows;
591 }
592
593 return NDBT_OK;
594 }
595 abort(); /* Should never happen */
596 return NDBT_FAILED;
597 }
598
599 int
verifyIndex(Ndb * pNdb,const char * indexName,int parallelism,bool transactional)600 UtilTransactions::verifyIndex(Ndb* pNdb,
601 const char* indexName,
602 int parallelism,
603 bool transactional){
604
605
606 const NdbDictionary::Index* pIndex
607 = pNdb->getDictionary()->getIndex(indexName, tab.getName());
608 if (pIndex == 0){
609 ndbout << " Index " << indexName << " does not exist!" << endl;
610 return NDBT_FAILED;
611 }
612
613 switch (pIndex->getType()){
614 case NdbDictionary::Index::UniqueHashIndex:
615 return verifyUniqueIndex(pNdb, pIndex, parallelism, transactional);
616 case NdbDictionary::Index::OrderedIndex:
617 return verifyOrderedIndex(pNdb, pIndex, parallelism, transactional);
618 break;
619 default:
620 ndbout << "Unknown index type" << endl;
621 break;
622 }
623
624 return NDBT_FAILED;
625 }
626
627 int
verifyUniqueIndex(Ndb * pNdb,const NdbDictionary::Index * pIndex,int parallelism,bool transactional)628 UtilTransactions::verifyUniqueIndex(Ndb* pNdb,
629 const NdbDictionary::Index * pIndex,
630 int parallelism,
631 bool transactional){
632
633 /**
634 * Scan all rows in TABLE and for each found row make one read in
635 * TABLE and one using INDEX_TABLE. Then compare the two returned
636 * rows. They should be equal!
637 *
638 */
639
640 if (scanAndCompareUniqueIndex(pNdb,
641 pIndex,
642 parallelism,
643 transactional) != NDBT_OK){
644 return NDBT_FAILED;
645 }
646
647
648 return NDBT_OK;
649
650 }
651
652
653 int
scanAndCompareUniqueIndex(Ndb * pNdb,const NdbDictionary::Index * pIndex,int parallelism,bool transactional)654 UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb,
655 const NdbDictionary::Index* pIndex,
656 int parallelism,
657 bool transactional){
658
659 int retryAttempt = 0;
660 const int retryMax = 100;
661 int check;
662 NdbScanOperation *pOp;
663 NDBT_ResultRow row(tab);
664
665 parallelism = 1;
666
667 while (true){
668 restart:
669 if (retryAttempt >= retryMax){
670 g_err << "ERROR: has retried this operation " << retryAttempt
671 << " times, failing!, line: " << __LINE__ << endl;
672 return NDBT_FAILED;
673 }
674
675 pTrans = pNdb->startTransaction();
676 if (pTrans == NULL) {
677 const NdbError err = pNdb->getNdbError();
678
679 if (err.status == NdbError::TemporaryError){
680 NDB_ERR(err);
681 NdbSleep_MilliSleep(50);
682 retryAttempt++;
683 continue;
684 }
685 NDB_ERR(err);
686 return NDBT_FAILED;
687 }
688
689 pOp = pTrans->getNdbScanOperation(tab.getName());
690 if (pOp == NULL) {
691 const NdbError err = pNdb->getNdbError();
692 closeTransaction(pNdb);
693 NDB_ERR(err);
694
695 if (err.status == NdbError::TemporaryError){
696 NdbSleep_MilliSleep(50);
697 retryAttempt++;
698 continue;
699 }
700 return NDBT_FAILED;
701 }
702
703 int rs;
704 if(transactional){
705 rs = pOp->readTuples(NdbScanOperation::LM_Read, 0, parallelism);
706 } else {
707 rs = pOp->readTuples(NdbScanOperation::LM_CommittedRead, 0, parallelism);
708 }
709
710 if( rs != 0 ) {
711 NDB_ERR(pTrans->getNdbError());
712 closeTransaction(pNdb);
713 return NDBT_FAILED;
714 }
715
716 // Read all attributes
717 for (int a = 0; a < tab.getNoOfColumns(); a++){
718 if ((row.attributeStore(a) =
719 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
720 NDB_ERR(pTrans->getNdbError());
721 closeTransaction(pNdb);
722 return NDBT_FAILED;
723 }
724 }
725
726 check = pTrans->execute(NoCommit, AbortOnError);
727 if( check == -1 ) {
728 const NdbError err = pTrans->getNdbError();
729
730 if (err.status == NdbError::TemporaryError){
731 NDB_ERR(err);
732 closeTransaction(pNdb);
733 NdbSleep_MilliSleep(50);
734 retryAttempt++;
735 continue;
736 }
737 NDB_ERR(err);
738 closeTransaction(pNdb);
739 return NDBT_FAILED;
740 }
741
742 int eof;
743 int rows = 0;
744
745
746 while((eof = pOp->nextResult()) == 0){
747 rows++;
748
749 // ndbout << row.c_str().c_str() << endl;
750
751 if (readRowFromTableAndIndex(pNdb,
752 pTrans,
753 pIndex,
754 row) != NDBT_OK){
755
756 while((eof= pOp->nextResult(false)) == 0);
757 if(eof == 2)
758 eof = pOp->nextResult(true); // this should give -1
759 if(eof == -1)
760 {
761 const NdbError err = pTrans->getNdbError();
762
763 if (err.status == NdbError::TemporaryError){
764 NDB_ERR(err);
765 closeTransaction(pNdb);
766 NdbSleep_MilliSleep(50);
767 retryAttempt++;
768 goto restart;
769 }
770 }
771 closeTransaction(pNdb);
772 g_err << "Line: " << __LINE__ << " next result failed" << endl;
773 return NDBT_FAILED;
774 }
775 }
776 if (eof == -1) {
777 const NdbError err = pTrans->getNdbError();
778
779 if (err.status == NdbError::TemporaryError){
780 NDB_ERR(err);
781 closeTransaction(pNdb);
782 NdbSleep_MilliSleep(50);
783 retryAttempt++;
784 continue;
785 }
786 NDB_ERR(err);
787 closeTransaction(pNdb);
788 return NDBT_FAILED;
789 }
790
791 closeTransaction(pNdb);
792
793 return NDBT_OK;
794 }
795 abort(); /* Should never happen */
796 return NDBT_FAILED;
797 }
798 int
readRowFromTableAndIndex(Ndb * pNdb,NdbConnection * scanTrans,const NdbDictionary::Index * pIndex,NDBT_ResultRow & row)799 UtilTransactions::readRowFromTableAndIndex(Ndb* pNdb,
800 NdbConnection* scanTrans,
801 const NdbDictionary::Index* pIndex,
802 NDBT_ResultRow& row ){
803
804
805 NdbDictionary::Index::Type indexType= pIndex->getType();
806 int retryAttempt = 0;
807 const int retryMax = 100;
808 int check, a;
809 NdbConnection *pTrans1=NULL;
810 NdbOperation *pOp;
811
812 int return_code= NDBT_FAILED;
813
814 // Allocate place to store the result
815 NDBT_ResultRow tabRow(tab);
816 NDBT_ResultRow indexRow(tab);
817 const char * indexName = pIndex->getName();
818
819 while (true){
820 if(retryAttempt)
821 ndbout_c("retryAttempt %d", retryAttempt);
822 if (retryAttempt >= retryMax){
823 g_err << "ERROR: has retried this operation " << retryAttempt
824 << " times, failing!, line: " << __LINE__ << endl;
825 goto close_all;
826 }
827
828 pTrans1 = pNdb->hupp(scanTrans); //startTransaction();
829 if (pTrans1 == NULL) {
830 const NdbError err = pNdb->getNdbError();
831
832 if (err.code == 4006)
833 {
834 g_err << "Line: " << __LINE__ << " err: 4006" << endl;
835 goto close_all;
836 }
837
838 if (err.status == NdbError::TemporaryError){
839 NDB_ERR(err);
840 NdbSleep_MilliSleep(50);
841 retryAttempt++;
842 continue;
843 }
844
845 if(err.code == 0){
846 return_code = NDBT_OK;
847 goto close_all;
848 }
849 NDB_ERR(err);
850 goto close_all;
851 }
852
853 /**
854 * Read the record from TABLE
855 */
856 pOp = pTrans1->getNdbOperation(tab.getName());
857 if (pOp == NULL) {
858 NDB_ERR(pTrans1->getNdbError());
859 goto close_all;
860 }
861
862 check = pOp->readTuple();
863 if( check == -1 ) {
864 NDB_ERR(pTrans1->getNdbError());
865 goto close_all;
866 }
867
868 // Define primary keys
869 #if VERBOSE
870 printf("PK: ");
871 #endif
872 for(a = 0; a<tab.getNoOfColumns(); a++){
873 const NdbDictionary::Column* attr = tab.getColumn(a);
874 if (attr->getPrimaryKey() == true){
875 if (pOp->equal(attr->getName(), row.attributeStore(a)->aRef()) != 0){
876 NDB_ERR(pTrans1->getNdbError());
877 goto close_all;
878 }
879 #if VERBOSE
880 printf("%s = %d: ", attr->getName(), row.attributeStore(a)->aRef());
881 #endif
882 }
883 }
884 #if VERBOSE
885 printf("\n");
886 #endif
887 // Read all attributes
888 #if VERBOSE
889 printf("Reading %u attributes: ", tab.getNoOfColumns());
890 #endif
891 for(a = 0; a<tab.getNoOfColumns(); a++){
892 if((tabRow.attributeStore(a) =
893 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
894 NDB_ERR(pTrans1->getNdbError());
895 goto close_all;
896 }
897 #if VERBOSE
898 printf("%s ", tab.getColumn(a)->getName());
899 #endif
900 }
901 #if VERBOSE
902 printf("\n");
903 #endif
904
905 /**
906 * Read the record from INDEX_TABLE
907 */
908 NdbIndexOperation* pIndexOp= NULL;
909 NdbIndexScanOperation *pScanOp= NULL;
910 NdbOperation *pIOp= 0;
911
912 bool null_found= false;
913 for(a = 0; a<(int)pIndex->getNoOfColumns(); a++){
914 const NdbDictionary::Column * col = pIndex->getColumn(a);
915
916 if (row.attributeStore(col->getName())->isNULL())
917 {
918 null_found= true;
919 break;
920 }
921 }
922
923 const char * tabName= tab.getName();
924 if(!null_found)
925 {
926 if (indexType == NdbDictionary::Index::UniqueHashIndex) {
927 pIOp= pIndexOp= pTrans1->getNdbIndexOperation(indexName, tabName);
928 } else {
929 pIOp= pScanOp= pTrans1->getNdbIndexScanOperation(indexName, tabName);
930 }
931
932 if (pIOp == NULL) {
933 NDB_ERR(pTrans1->getNdbError());
934 goto close_all;
935 }
936
937 {
938 bool not_ok;
939 if (pIndexOp) {
940 not_ok = pIndexOp->readTuple() == -1;
941 } else {
942 not_ok = pScanOp->readTuples();
943 }
944
945 if( not_ok ) {
946 NDB_ERR(pTrans1->getNdbError());
947 goto close_all;
948 }
949 }
950
951 // Define primary keys for index
952 #if VERBOSE
953 printf("SI: ");
954 #endif
955 for(a = 0; a<(int)pIndex->getNoOfColumns(); a++){
956 const NdbDictionary::Column * col = pIndex->getColumn(a);
957
958 if ( !row.attributeStore(col->getName())->isNULL() ) {
959 if(pIOp->equal(col->getName(),
960 row.attributeStore(col->getName())->aRef()) != 0){
961 NDB_ERR(pTrans1->getNdbError());
962 goto close_all;
963 }
964 }
965 #if VERBOSE
966 printf("%s = %d: ", col->getName(), row.attributeStore(a)->aRef());
967 #endif
968 }
969 #if VERBOSE
970 printf("\n");
971 #endif
972
973 // Read all attributes
974 #if VERBOSE
975 printf("Reading %u attributes: ", tab.getNoOfColumns());
976 #endif
977 for(a = 0; a<tab.getNoOfColumns(); a++){
978 void* pCheck;
979
980 pCheck= indexRow.attributeStore(a)=
981 pIOp->getValue(tab.getColumn(a)->getName());
982
983 if(pCheck == NULL) {
984 NDB_ERR(pTrans1->getNdbError());
985 goto close_all;
986 }
987 #if VERBOSE
988 printf("%s ", tab.getColumn(a)->getName());
989 #endif
990 }
991 }
992 #if VERBOSE
993 printf("\n");
994 #endif
995 scanTrans->refresh();
996 check = pTrans1->execute(Commit, AbortOnError);
997 if( check == -1 ) {
998 const NdbError err = pTrans1->getNdbError();
999
1000 if (err.status == NdbError::TemporaryError){
1001 NDB_ERR(err);
1002 pNdb->closeTransaction(pTrans1);
1003 NdbSleep_MilliSleep(50);
1004 retryAttempt++;
1005 continue;
1006 }
1007 ndbout << "Error when comparing records - normal op" << endl;
1008 NDB_ERR(err);
1009 ndbout << "row: " << row.c_str().c_str() << endl;
1010 goto close_all;
1011 }
1012
1013 /**
1014 * Compare the two rows
1015 */
1016 if(!null_found){
1017 if (pScanOp) {
1018 if (pScanOp->nextResult() != 0){
1019 const NdbError err = pTrans1->getNdbError();
1020 NDB_ERR(err);
1021 ndbout << "Error when comparing records - index op next_result missing" << endl;
1022 ndbout << "row: " << row.c_str().c_str() << endl;
1023 goto close_all;
1024 }
1025 }
1026 if (!(tabRow.c_str() == indexRow.c_str())){
1027 ndbout << "Error when comapring records" << endl;
1028 ndbout << " tabRow: \n" << tabRow.c_str().c_str() << endl;
1029 ndbout << " indexRow: \n" << indexRow.c_str().c_str() << endl;
1030 goto close_all;
1031 }
1032 if (pScanOp) {
1033 if (pScanOp->nextResult() == 0){
1034 ndbout << "Error when comparing records - index op next_result to many" << endl;
1035 ndbout << "row: " << row.c_str().c_str() << endl;
1036 goto close_all;
1037 }
1038 }
1039 }
1040 return_code= NDBT_OK;
1041 goto close_all;
1042 }
1043
1044 close_all:
1045 if (pTrans1)
1046 pNdb->closeTransaction(pTrans1);
1047
1048 return return_code;
1049 }
1050
1051 int
verifyOrderedIndex(Ndb * pNdb,const NdbDictionary::Index * pIndex,int parallelism,bool transactional)1052 UtilTransactions::verifyOrderedIndex(Ndb* pNdb,
1053 const NdbDictionary::Index* pIndex,
1054 int parallelism,
1055 bool transactional){
1056
1057 int retryAttempt = 0;
1058 const int retryMax = 100;
1059 int check;
1060 NdbScanOperation *pOp = NULL;
1061 NdbIndexScanOperation *iop = NULL;
1062
1063 NDBT_ResultRow scanRow(tab);
1064 NDBT_ResultRow pkRow(tab);
1065 NDBT_ResultRow indexRow(tab);
1066 const char * indexName = pIndex->getName();
1067
1068 int res;
1069 parallelism = 1;
1070
1071 while (true){
1072
1073 if (retryAttempt >= retryMax){
1074 g_err << "ERROR: has retried this operation " << retryAttempt
1075 << " times, failing!, line: " << __LINE__ << endl;
1076 return NDBT_FAILED;
1077 }
1078
1079 pTrans = pNdb->startTransaction();
1080 if (pTrans == NULL) {
1081 const NdbError err = pNdb->getNdbError();
1082
1083 if (err.status == NdbError::TemporaryError){
1084 NDB_ERR(err);
1085 NdbSleep_MilliSleep(50);
1086 retryAttempt++;
1087 continue;
1088 }
1089 NDB_ERR(err);
1090 return NDBT_FAILED;
1091 }
1092
1093 pOp = pTrans->getNdbScanOperation(tab.getName());
1094 if (pOp == NULL) {
1095 NDB_ERR(pTrans->getNdbError());
1096 closeTransaction(pNdb);
1097 return NDBT_FAILED;
1098 }
1099
1100 if( pOp->readTuples(NdbScanOperation::LM_Read, 0, parallelism) ) {
1101 NDB_ERR(pTrans->getNdbError());
1102 closeTransaction(pNdb);
1103 return NDBT_FAILED;
1104 }
1105
1106 if(get_values(pOp, scanRow))
1107 {
1108 abort();
1109 }
1110
1111 check = pTrans->execute(NoCommit, AbortOnError);
1112 if( check == -1 ) {
1113 const NdbError err = pTrans->getNdbError();
1114
1115 if (err.status == NdbError::TemporaryError){
1116 NDB_ERR(err);
1117 closeTransaction(pNdb);
1118 NdbSleep_MilliSleep(50);
1119 retryAttempt++;
1120 continue;
1121 }
1122 NDB_ERR(err);
1123 closeTransaction(pNdb);
1124 return NDBT_FAILED;
1125 }
1126
1127 int eof;
1128 int rows = 0;
1129 while(check == 0 && (eof = pOp->nextResult()) == 0){
1130 rows++;
1131
1132 bool null_found= false;
1133 for(int a = 0; a<(int)pIndex->getNoOfColumns(); a++){
1134 const NdbDictionary::Column * col = pIndex->getColumn(a);
1135 if (scanRow.attributeStore(col->getName())->isNULL())
1136 {
1137 null_found= true;
1138 break;
1139 }
1140 }
1141
1142 // Do pk lookup
1143 NdbOperation * pk = pTrans->getNdbOperation(tab.getName());
1144 if(!pk || pk->readTuple())
1145 goto error;
1146 if(equal(&tab, pk, scanRow) || get_values(pk, pkRow))
1147 goto error;
1148
1149 if(!null_found)
1150 {
1151 if((iop= pTrans->getNdbIndexScanOperation(indexName,
1152 tab.getName())) != NULL)
1153 {
1154 if(iop->readTuples(NdbScanOperation::LM_CommittedRead,
1155 parallelism))
1156 goto error;
1157 if(get_values(iop, indexRow))
1158 goto error;
1159 if(equal(pIndex, iop, scanRow))
1160 goto error;
1161 }
1162 else
1163 {
1164 goto error;
1165 }
1166 }
1167
1168 check = pTrans->execute(NoCommit, AbortOnError);
1169 if(check)
1170 goto error;
1171
1172 if(scanRow.c_str() != pkRow.c_str()){
1173 g_err << "Error when comapring records" << endl;
1174 g_err << " scanRow: \n" << scanRow.c_str().c_str() << endl;
1175 g_err << " pkRow: \n" << pkRow.c_str().c_str() << endl;
1176 closeTransaction(pNdb);
1177 return NDBT_FAILED;
1178 }
1179
1180 if(!null_found)
1181 {
1182 if((res= iop->nextResult()) != 0){
1183 g_err << "Failed to find row using index: " << res << endl;
1184 NDB_ERR(pTrans->getNdbError());
1185 closeTransaction(pNdb);
1186 return NDBT_FAILED;
1187 }
1188
1189 if(scanRow.c_str() != indexRow.c_str()){
1190 g_err << "Error when comapring records" << endl;
1191 g_err << " scanRow: \n" << scanRow.c_str().c_str() << endl;
1192 g_err << " indexRow: \n" << indexRow.c_str().c_str() << endl;
1193 closeTransaction(pNdb);
1194 return NDBT_FAILED;
1195 }
1196
1197 if(iop->nextResult() == 0){
1198 g_err << "Found extra row!!" << endl;
1199 g_err << " indexRow: \n" << indexRow.c_str().c_str() << endl;
1200 closeTransaction(pNdb);
1201 return NDBT_FAILED;
1202 }
1203 iop->close(false,true); // Close, and release 'iop'
1204 iop = NULL;
1205 }
1206 } // while 'pOp->nextResult()'
1207
1208 pOp->close();
1209 pOp = NULL;
1210 if (eof == -1 || check == -1) {
1211 error:
1212 const NdbError err = pTrans->getNdbError();
1213
1214 if (err.status == NdbError::TemporaryError){
1215 NDB_ERR(err);
1216 iop = 0;
1217 closeTransaction(pNdb);
1218 NdbSleep_MilliSleep(50);
1219 retryAttempt++;
1220 rows--;
1221 continue;
1222 }
1223 NDB_ERR(err);
1224 closeTransaction(pNdb);
1225 return NDBT_FAILED;
1226 }
1227
1228 closeTransaction(pNdb);
1229
1230 return NDBT_OK;
1231 }
1232 abort(); /* Should never happen */
1233 return NDBT_FAILED;
1234 }
1235
1236 int
get_values(NdbOperation * op,NDBT_ResultRow & dst)1237 UtilTransactions::get_values(NdbOperation* op, NDBT_ResultRow& dst)
1238 {
1239 for (int a = 0; a < tab.getNoOfColumns(); a++){
1240 NdbRecAttr*& ref= dst.attributeStore(a);
1241 if ((ref= op->getValue(a)) == 0)
1242 {
1243 g_err << "Line: " << __LINE__ << " getValue failed" << endl;
1244 return NDBT_FAILED;
1245 }
1246 }
1247 return 0;
1248 }
1249
1250 int
equal(const NdbDictionary::Index * pIndex,NdbOperation * op,const NDBT_ResultRow & src)1251 UtilTransactions::equal(const NdbDictionary::Index* pIndex,
1252 NdbOperation* op, const NDBT_ResultRow& src)
1253 {
1254 for(Uint32 a = 0; a<pIndex->getNoOfColumns(); a++){
1255 const NdbDictionary::Column * col = pIndex->getColumn(a);
1256 if(op->equal(col->getName(),
1257 src.attributeStore(col->getName())->aRef()) != 0){
1258 g_err << "Line: " << __LINE__ << " equal failed" << endl;
1259 return NDBT_FAILED;
1260 }
1261 }
1262 return 0;
1263 }
1264
1265 int
equal(const NdbDictionary::Table * pTable,NdbOperation * op,const NDBT_ResultRow & src)1266 UtilTransactions::equal(const NdbDictionary::Table* pTable,
1267 NdbOperation* op, const NDBT_ResultRow& src)
1268 {
1269 for(Uint32 a = 0; (int)a<tab.getNoOfColumns(); a++){
1270 const NdbDictionary::Column* attr = tab.getColumn(a);
1271 if (attr->getPrimaryKey() == true){
1272 if (op->equal(attr->getName(), src.attributeStore(a)->aRef()) != 0){
1273 g_err << "Line: " << __LINE__ << " equal failed" << endl;
1274 return NDBT_FAILED;
1275 }
1276 }
1277 }
1278 return 0;
1279 }
1280
1281 NdbScanOperation*
getScanOperation(NdbConnection * pTrans)1282 UtilTransactions::getScanOperation(NdbConnection* pTrans)
1283 {
1284 return (NdbScanOperation*)
1285 getOperation(pTrans, NdbOperation::OpenScanRequest);
1286 }
1287
1288 NdbOperation*
getOperation(NdbConnection * pTrans,NdbOperation::OperationType type)1289 UtilTransactions::getOperation(NdbConnection* pTrans,
1290 NdbOperation::OperationType type)
1291 {
1292 switch(type){
1293 case NdbOperation::ReadRequest:
1294 case NdbOperation::ReadExclusive:
1295 if(idx)
1296 {
1297 switch(idx->getType()){
1298 case NdbDictionary::Index::UniqueHashIndex:
1299 return pTrans->getNdbIndexOperation(idx->getName(), tab.getName());
1300 case NdbDictionary::Index::OrderedIndex:
1301 return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName());
1302 default:
1303 abort();
1304 }
1305 }
1306 case NdbOperation::InsertRequest:
1307 case NdbOperation::WriteRequest:
1308 return pTrans->getNdbOperation(tab.getName());
1309 case NdbOperation::UpdateRequest:
1310 case NdbOperation::DeleteRequest:
1311 if(idx)
1312 {
1313 switch(idx->getType()){
1314 case NdbDictionary::Index::UniqueHashIndex:
1315 return pTrans->getNdbIndexOperation(idx->getName(), tab.getName());
1316 default:
1317 break;
1318 }
1319 }
1320 return pTrans->getNdbOperation(tab.getName());
1321 case NdbOperation::OpenScanRequest:
1322 if(idx)
1323 {
1324 switch(idx->getType()){
1325 case NdbDictionary::Index::OrderedIndex:
1326 return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName());
1327 default:
1328 break;
1329 }
1330 }
1331 return pTrans->getNdbScanOperation(tab.getName());
1332 case NdbOperation::OpenRangeScanRequest:
1333 if(idx)
1334 {
1335 switch(idx->getType()){
1336 case NdbDictionary::Index::OrderedIndex:
1337 return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName());
1338 default:
1339 break;
1340 }
1341 }
1342 return 0;
1343 default:
1344 abort();
1345 }
1346 return 0;
1347 }
1348
1349 #include <HugoOperations.hpp>
1350
1351 int
closeTransaction(Ndb * pNdb)1352 UtilTransactions::closeTransaction(Ndb* pNdb)
1353 {
1354 if (pTrans != NULL){
1355 pNdb->closeTransaction(pTrans);
1356 pTrans = NULL;
1357 }
1358 return 0;
1359 }
1360
1361 int
compare(Ndb * pNdb,const char * tab_name2,int flags)1362 UtilTransactions::compare(Ndb* pNdb, const char* tab_name2, int flags){
1363
1364
1365 NdbError err;
1366 int return_code= 0, row_count= 0;
1367 int retryAttempt = 0, retryMax = 10;
1368
1369 HugoCalculator calc(tab);
1370 NDBT_ResultRow row(tab);
1371 const NdbDictionary::Table* tmp= pNdb->getDictionary()->getTable(tab_name2);
1372 if(tmp == 0)
1373 {
1374 g_err << "Unable to lookup table: " << tab_name2
1375 << endl << pNdb->getDictionary()->getNdbError() << endl;
1376 return -1;
1377 }
1378 const NdbDictionary::Table& tab2= *tmp;
1379
1380 HugoOperations cmp(tab2);
1381 UtilTransactions count(tab2);
1382
1383 while (true){
1384 loop:
1385 if (retryAttempt++ >= retryMax){
1386 g_err << "ERROR: compare has retried this operation " << retryAttempt
1387 << " times, failing!" << endl;
1388 return -1;
1389 }
1390
1391 NdbScanOperation *pOp= 0;
1392 pTrans = pNdb->startTransaction();
1393 if (pTrans == NULL) {
1394 err = pNdb->getNdbError();
1395 goto error;
1396 }
1397
1398 pOp= pTrans->getNdbScanOperation(tab.getName());
1399 if (pOp == NULL) {
1400 NDB_ERR(err= pTrans->getNdbError());
1401 goto error;
1402 }
1403
1404 if( pOp->readTuples(NdbScanOperation::LM_Read) ) {
1405 NDB_ERR(err= pTrans->getNdbError());
1406 goto error;
1407 }
1408
1409 // Read all attributes
1410 {
1411 for (int a = 0; a < tab.getNoOfColumns(); a++){
1412 if ((row.attributeStore(a) =
1413 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
1414 NDB_ERR(err= pTrans->getNdbError());
1415 goto error;
1416 }
1417 }
1418 }
1419
1420 if( pTrans->execute(NoCommit, AbortOnError) == -1 ) {
1421 NDB_ERR(err= pTrans->getNdbError());
1422 goto error;
1423 }
1424
1425 row_count= 0;
1426 {
1427 int eof;
1428 while((eof = pOp->nextResult(true)) == 0)
1429 {
1430 do {
1431 row_count++;
1432 if(cmp.startTransaction(pNdb) != NDBT_OK)
1433 {
1434 NDB_ERR(err= pNdb->getNdbError());
1435 goto error;
1436 }
1437 int rowNo= calc.getIdValue(&row);
1438 if(cmp.pkReadRecord(pNdb, rowNo, 1) != NDBT_OK)
1439 {
1440 NDB_ERR(err= cmp.getTransaction()->getNdbError());
1441 goto error;
1442 }
1443 if(cmp.execute_Commit(pNdb) != NDBT_OK ||
1444 cmp.getTransaction()->getNdbError().code)
1445 {
1446 NDB_ERR(err= cmp.getTransaction()->getNdbError());
1447 goto error;
1448 }
1449 if(row != cmp.get_row(0))
1450 {
1451 g_err << "COMPARE FAILED" << endl;
1452 g_err << row << endl;
1453 g_err << cmp.get_row(0) << endl;
1454 return_code++;
1455 }
1456 retryAttempt= 0;
1457 cmp.closeTransaction(pNdb);
1458 } while((eof = pOp->nextResult(false)) == 0);
1459 }
1460 if (eof == -1)
1461 {
1462 err = pTrans->getNdbError();
1463 goto error;
1464 }
1465 }
1466
1467 closeTransaction(pNdb);
1468
1469 g_info << row_count << " rows compared" << endl;
1470 {
1471 int row_count2;
1472 if(count.selectCount(pNdb, 0, &row_count2) != NDBT_OK)
1473 {
1474 g_err << "Failed to count rows in tab_name2" << endl;
1475 return -1;
1476 }
1477
1478 g_info << row_count2 << " rows in tab_name2 - failed " << return_code
1479 << endl;
1480 return (row_count == row_count2 ? return_code : 1);
1481 }
1482 error:
1483 if(err.status == NdbError::TemporaryError)
1484 {
1485 g_err << err << endl;
1486 NdbSleep_MilliSleep(50);
1487 closeTransaction(pNdb);
1488 if(cmp.getTransaction())
1489 cmp.closeTransaction(pNdb);
1490
1491 goto loop;
1492 }
1493 g_err << "ERROR" << endl;
1494 g_err << err << endl;
1495
1496 break;
1497 }
1498
1499 closeTransaction(pNdb);
1500
1501 return return_code;
1502 }
1503
1504