1 /*
2 Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "HugoTransactions.hpp"
26 #include <NDBT_Stats.hpp>
27 #include <NdbSleep.h>
28 #include <NdbTick.h>
29
HugoTransactions(const NdbDictionary::Table & _tab,const NdbDictionary::Index * idx)30 HugoTransactions::HugoTransactions(const NdbDictionary::Table& _tab,
31 const NdbDictionary::Index* idx):
32 HugoOperations(_tab, idx),
33 row(_tab){
34
35 m_defaultScanUpdateMethod = 3;
36 setRetryMax();
37 m_retryMaxReached = false;
38 m_stats_latency = 0;
39
40 m_thr_count = 0;
41 m_thr_no = -1;
42
43 m_empty_update = false;
44 }
45
~HugoTransactions()46 HugoTransactions::~HugoTransactions(){
47 deallocRows();
48 }
49
50 int
scanReadRecords(Ndb * pNdb,int records,int abortPercent,int parallelism,NdbOperation::LockMode lm,int scan_flags)51 HugoTransactions::scanReadRecords(Ndb* pNdb,
52 int records,
53 int abortPercent,
54 int parallelism,
55 NdbOperation::LockMode lm,
56 int scan_flags)
57 {
58
59 int retryAttempt = 0;
60 int check, a;
61 NdbScanOperation *pOp;
62
63 while (true){
64
65 if (retryAttempt >= m_retryMax){
66 g_err << __LINE__ << " ERROR: has retried this operation "
67 << retryAttempt << " times, failing!, line: " << __LINE__ << endl;
68 return NDBT_FAILED;
69 }
70
71 pTrans = pNdb->startTransaction();
72 if (pTrans == NULL) {
73 const NdbError err = pNdb->getNdbError();
74
75 if (err.status == NdbError::TemporaryError){
76 NDB_ERR(err);
77 NdbSleep_MilliSleep(50);
78 retryAttempt++;
79 continue;
80 }
81 NDB_ERR(err);
82 setNdbError(err);
83 return NDBT_FAILED;
84 }
85
86 pOp = getScanOperation(pTrans);
87 if (pOp == NULL) {
88 NDB_ERR(pTrans->getNdbError());
89 setNdbError(pTrans->getNdbError());
90 closeTransaction(pNdb);
91 return NDBT_FAILED;
92 }
93
94 if( pOp ->readTuples(lm, scan_flags, parallelism) ) {
95 NDB_ERR(pTrans->getNdbError());
96 setNdbError(pTrans->getNdbError());
97 closeTransaction(pNdb);
98 return NDBT_FAILED;
99 }
100
101 for(a = 0; a<tab.getNoOfColumns(); a++){
102 if((row.attributeStore(a) =
103 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
104 NDB_ERR(pTrans->getNdbError());
105 setNdbError(pTrans->getNdbError());
106 closeTransaction(pNdb);
107 return NDBT_FAILED;
108 }
109 }
110
111 check = pTrans->execute(NoCommit, AbortOnError);
112 if( check == -1 ) {
113 const NdbError err = pTrans->getNdbError();
114 if (err.status == NdbError::TemporaryError){
115 NDB_ERR(err);
116 closeTransaction(pNdb);
117 NdbSleep_MilliSleep(50);
118 retryAttempt++;
119 continue;
120 }
121 NDB_ERR(err);
122 setNdbError(err);
123 closeTransaction(pNdb);
124 return NDBT_FAILED;
125 }
126
127 // Abort after 1-100 or 1-records rows
128 int ranVal = rand();
129 int abortCount = ranVal % (records == 0 ? 100 : records);
130 bool abortTrans = false;
131 if (abortPercent > 0){
132 // Abort if abortCount is less then abortPercent
133 if (abortCount < abortPercent)
134 abortTrans = true;
135 }
136
137 int eof;
138 int rows = 0;
139 while((eof = pOp->nextResult(true)) == 0){
140 rows++;
141 if (calc.verifyRowValues(&row) != 0){
142 closeTransaction(pNdb);
143 g_err << "Line: " << __LINE__ << " verify row failed" << endl;
144 return NDBT_FAILED;
145 }
146
147 if (abortCount == rows && abortTrans == true){
148 ndbout << "Scan is aborted" << endl;
149 g_info << "Scan is aborted" << endl;
150 pOp->close();
151 if( check == -1 ) {
152 NDB_ERR(pTrans->getNdbError());
153 setNdbError(pTrans->getNdbError());
154 closeTransaction(pNdb);
155 return NDBT_FAILED;
156 }
157
158 closeTransaction(pNdb);
159 return NDBT_OK;
160 }
161 }
162 if (eof == -1) {
163 const NdbError err = pTrans->getNdbError();
164
165 if (err.status == NdbError::TemporaryError){
166 NDB_ERR_INFO(err);
167 closeTransaction(pNdb);
168 NdbSleep_MilliSleep(50);
169 switch (err.code){
170 case 488:
171 case 245:
172 case 490:
173 // Too many active scans, no limit on number of retry attempts
174 break;
175 default:
176 if (err.classification == NdbError::TimeoutExpired)
177 {
178 if (retryAttempt >= (m_retryMax / 10) &&
179 (parallelism == 0 || parallelism > 1))
180 {
181 /**
182 * decrease parallelism
183 */
184 parallelism = 1;
185 ndbout_c("decrease parallelism");
186 }
187 }
188 retryAttempt++;
189 }
190 continue;
191 }
192 NDB_ERR(err);
193 setNdbError(err);
194 closeTransaction(pNdb);
195 return NDBT_FAILED;
196 }
197
198 closeTransaction(pNdb);
199
200 g_info << rows << " rows have been read" << endl;
201 if (records != 0 && rows != records){
202 g_err << "Check expected number of records failed" << endl
203 << " expected=" << records <<", " << endl
204 << " read=" << rows << endl;
205 return NDBT_FAILED;
206 }
207
208 return NDBT_OK;
209 }
210 abort(); /* Should never happen */
211 return NDBT_FAILED;
212 }
213
214 int
scanReadRecords(Ndb * pNdb,const NdbDictionary::Index * pIdx,int records,int abortPercent,int parallelism,NdbOperation::LockMode lm,int scan_flags,int bound_cnt,const HugoBound * bound_arr)215 HugoTransactions::scanReadRecords(Ndb* pNdb,
216 const NdbDictionary::Index * pIdx,
217 int records,
218 int abortPercent,
219 int parallelism,
220 NdbOperation::LockMode lm,
221 int scan_flags,
222 int bound_cnt, const HugoBound* bound_arr)
223 {
224
225 int retryAttempt = 0;
226 int check, a;
227 NdbScanOperation *pOp;
228 NdbIndexScanOperation *pIxOp;
229
230 while (true){
231
232 if (retryAttempt >= m_retryMax){
233 g_err << __LINE__ << " ERROR: has retried this operation "
234 << retryAttempt << " times, failing!" << endl;
235 g_err << "lm: " << Uint32(lm) << " flags: H'" << hex << scan_flags
236 << endl;
237 return NDBT_FAILED;
238 }
239
240 pTrans = pNdb->startTransaction();
241 if (pTrans == NULL) {
242 const NdbError err = pNdb->getNdbError();
243
244 if (err.status == NdbError::TemporaryError){
245 NDB_ERR(err);
246 NdbSleep_MilliSleep(50);
247 retryAttempt++;
248 continue;
249 }
250 NDB_ERR(err);
251 setNdbError(err);
252 return NDBT_FAILED;
253 }
254
255 if (pIdx != NULL) {
256 pOp = pIxOp = pTrans->getNdbIndexScanOperation(pIdx->getName(), tab.getName());
257 } else {
258 pOp = pTrans->getNdbScanOperation(tab.getName());
259 pIxOp = NULL;
260 }
261
262 if (pOp == NULL) {
263 NDB_ERR(pTrans->getNdbError());
264 setNdbError(pTrans->getNdbError());
265 closeTransaction(pNdb);
266 return NDBT_FAILED;
267 }
268
269 if( pOp ->readTuples(lm, scan_flags, parallelism) ) {
270 NDB_ERR(pTrans->getNdbError());
271 setNdbError(pTrans->getNdbError());
272 closeTransaction(pNdb);
273 return NDBT_FAILED;
274 }
275
276 for (int i = 0; i < bound_cnt; i++) {
277 const HugoBound& b = bound_arr[i];
278 if (pIxOp->setBound(b.attr, b.type, b.value) != 0) {
279 NDB_ERR(pIxOp->getNdbError());
280 setNdbError(pIxOp->getNdbError());
281 return NDBT_FAILED;
282 }
283 }
284
285 for(a = 0; a<tab.getNoOfColumns(); a++){
286 if((row.attributeStore(a) =
287 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
288 NDB_ERR(pTrans->getNdbError());
289 setNdbError(pTrans->getNdbError());
290 closeTransaction(pNdb);
291 return NDBT_FAILED;
292 }
293 }
294
295 check = pTrans->execute(NoCommit, AbortOnError);
296 if( check == -1 ) {
297 const NdbError err = pTrans->getNdbError();
298 if (err.status == NdbError::TemporaryError){
299 NDB_ERR(err);
300 closeTransaction(pNdb);
301 NdbSleep_MilliSleep(50);
302 retryAttempt++;
303 continue;
304 }
305 NDB_ERR(err);
306 setNdbError(err);
307 closeTransaction(pNdb);
308 return NDBT_FAILED;
309 }
310
311 // Abort after 1-100 or 1-records rows
312 int ranVal = rand();
313 int abortCount = ranVal % (records == 0 ? 100 : records);
314 bool abortTrans = false;
315 if (abortPercent > 0){
316 // Abort if abortCount is less then abortPercent
317 if (abortCount < abortPercent)
318 abortTrans = true;
319 }
320
321 int eof;
322 int rows = 0;
323 while((eof = pOp->nextResult(true)) == 0){
324 rows++;
325 if (calc.verifyRowValues(&row) != 0){
326 closeTransaction(pNdb);
327 g_err << "Line: " << __LINE__ << " verify row failed" << endl;
328 return NDBT_FAILED;
329 }
330
331 if (abortCount == rows && abortTrans == true){
332 ndbout << "Scan is aborted" << endl;
333 g_info << "Scan is aborted" << endl;
334 pOp->close();
335 if( check == -1 ) {
336 NDB_ERR(pTrans->getNdbError());
337 setNdbError(pTrans->getNdbError());
338 closeTransaction(pNdb);
339 return NDBT_FAILED;
340 }
341
342 closeTransaction(pNdb);
343 return NDBT_OK;
344 }
345 }
346 if (eof == -1) {
347 const NdbError err = pTrans->getNdbError();
348
349 if (err.status == NdbError::TemporaryError){
350 NDB_ERR_INFO(err);
351 closeTransaction(pNdb);
352 NdbSleep_MilliSleep(50);
353 switch (err.code){
354 case 488:
355 case 245:
356 case 490:
357 // Too many active scans, no limit on number of retry attempts
358 break;
359 default:
360 if (err.classification == NdbError::TimeoutExpired)
361 {
362 if (retryAttempt >= (m_retryMax / 10) &&
363 (parallelism == 0 || parallelism > 1))
364 {
365 /**
366 * decrease parallelism
367 */
368 parallelism = 1;
369 ndbout_c("decrease parallelism");
370 }
371 else if (retryAttempt >= (m_retryMax / 5) &&
372 (lm != NdbOperation::LM_CommittedRead))
373 {
374 lm = NdbOperation::LM_CommittedRead;
375 ndbout_c("switch to LM_CommittedRead");
376 }
377 else if (retryAttempt >= (m_retryMax / 4) &&
378 (pIdx != 0))
379 {
380 pIdx = NULL;
381 bound_cnt = 0;
382 scan_flags |= NdbScanOperation::SF_TupScan;
383 ndbout_c("switch to table-scan (SF_TupScan) from index-scan");
384 }
385 }
386 retryAttempt++;
387 }
388 continue;
389 }
390 NDB_ERR(err);
391 setNdbError(err);
392 closeTransaction(pNdb);
393 return NDBT_FAILED;
394 }
395
396 closeTransaction(pNdb);
397
398 g_info << rows << " rows have been read"
399 << ", number of index bounds " << bound_cnt << endl;
400 // TODO verify expected number of records with index bounds
401 if (records != 0 && rows != records && bound_cnt == 0){
402 g_err << "Check expected number of records failed" << endl
403 << " expected=" << records <<", " << endl
404 << " read=" << rows << endl;
405 return NDBT_FAILED;
406 }
407
408 return NDBT_OK;
409 }
410 abort(); /* Should never happen */
411 return NDBT_FAILED;
412 }
413
414
415 #define RESTART_SCAN 99
416
417 int
scanUpdateRecords(Ndb * pNdb,NdbScanOperation::ScanFlag flags,int records,int abortPercent,int parallelism)418 HugoTransactions::scanUpdateRecords(Ndb* pNdb,
419 NdbScanOperation::ScanFlag flags,
420 int records,
421 int abortPercent,
422 int parallelism){
423 int retryAttempt = 0;
424 int check, a;
425 NdbScanOperation *pOp;
426 m_retryMaxReached = false;
427
428 while (true){
429 restart:
430 if (retryAttempt++ >= m_retryMax){
431 g_err << "ERROR: has retried this operation " << retryAttempt
432 << " times, failing!, line: " << __LINE__ << endl;
433 m_retryMaxReached = true;
434 return NDBT_FAILED;
435 }
436
437 pTrans = pNdb->startTransaction();
438 if (pTrans == NULL) {
439 const NdbError err = pNdb->getNdbError();
440 NDB_ERR(err);
441 if (err.status == NdbError::TemporaryError){
442 NdbSleep_MilliSleep(50);
443 continue;
444 }
445 setNdbError(err);
446 return NDBT_FAILED;
447 }
448
449 pOp = getScanOperation(pTrans);
450 if (pOp == NULL)
451 {
452 const NdbError err = pTrans->getNdbError();
453 NDB_ERR(err);
454 closeTransaction(pNdb);
455 if (err.status == NdbError::TemporaryError)
456 {
457 NdbSleep_MilliSleep(50);
458 continue;
459 }
460 setNdbError(err);
461 return NDBT_FAILED;
462 }
463
464 if( pOp->readTuples(NdbOperation::LM_Exclusive, flags,
465 parallelism))
466 {
467 NDB_ERR(pOp->getNdbError());
468 setNdbError(pOp->getNdbError());
469 closeTransaction(pNdb);
470 return NDBT_FAILED;
471 }
472
473 // Read all attributes from this table
474 for(a=0; a<tab.getNoOfColumns(); a++){
475 if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == NULL){
476 NDB_ERR(pTrans->getNdbError());
477 setNdbError(pTrans->getNdbError());
478 closeTransaction(pNdb);
479 return NDBT_FAILED;
480 }
481 }
482
483 check = pTrans->execute(NoCommit, AbortOnError);
484 if( check == -1 ) {
485 const NdbError err = pTrans->getNdbError();
486 NDB_ERR(err);
487 closeTransaction(pNdb);
488 if (err.status == NdbError::TemporaryError){
489 NdbSleep_MilliSleep(50);
490 continue;
491 }
492 setNdbError(err);
493 return NDBT_FAILED;
494 }
495
496 // Abort after 1-100 or 1-records rows
497 int ranVal = rand();
498 int abortCount = ranVal % (records == 0 ? 100 : records);
499 bool abortTrans = false;
500 if (abortPercent > 0){
501 // Abort if abortCount is less then abortPercent
502 if (abortCount < abortPercent)
503 abortTrans = true;
504 }
505
506 int rows = 0;
507 while((check = pOp->nextResult(true)) == 0){
508 do {
509 rows++;
510 NdbOperation* pUp = pOp->updateCurrentTuple();
511 if(pUp == 0){
512 NDB_ERR(pTrans->getNdbError());
513 setNdbError(pTrans->getNdbError());
514 closeTransaction(pNdb);
515 return NDBT_FAILED;
516 }
517 const int updates = calc.getUpdatesValue(&row) + (m_empty_update? 0 : 1);
518 const int r = calc.getIdValue(&row);
519
520 for(a = 0; a<tab.getNoOfColumns(); a++){
521 if (tab.getColumn(a)->getPrimaryKey() == false){
522 if(setValueForAttr(pUp, a, r, updates ) != 0){
523 NDB_ERR(pTrans->getNdbError());
524 setNdbError(pTrans->getNdbError());
525 closeTransaction(pNdb);
526 return NDBT_FAILED;
527 }
528 }
529 }
530
531 if (rows == abortCount && abortTrans == true){
532 g_info << "Scan is aborted" << endl;
533 // This scan should be aborted
534 closeTransaction(pNdb);
535 return NDBT_OK;
536 }
537 } while((check = pOp->nextResult(false)) == 0);
538
539 if(check != -1){
540 check = pTrans->execute(Commit, AbortOnError);
541 if(check != -1)
542 m_latest_gci = pTrans->getGCI();
543 pTrans->restart();
544 }
545
546 const NdbError err = pTrans->getNdbError();
547 if( check == -1 ) {
548 closeTransaction(pNdb);
549 NDB_ERR(err);
550 if (err.status == NdbError::TemporaryError){
551 NdbSleep_MilliSleep(50);
552 goto restart;
553 }
554 setNdbError(err);
555 return NDBT_FAILED;
556 }
557 }
558
559 const NdbError err = pTrans->getNdbError();
560 if( check == -1 ) {
561 closeTransaction(pNdb);
562 NDB_ERR(err);
563 if (err.status == NdbError::TemporaryError){
564 NdbSleep_MilliSleep(50);
565 goto restart;
566 }
567 setNdbError(err);
568 return NDBT_FAILED;
569 }
570
571 closeTransaction(pNdb);
572
573 g_info << rows << " rows have been updated" << endl;
574 return NDBT_OK;
575 }
576 abort(); /* Should never happen */
577 return NDBT_FAILED;
578 }
579
580 int
scanUpdateRecords(Ndb * pNdb,int records,int abortPercent,int parallelism)581 HugoTransactions::scanUpdateRecords(Ndb* pNdb,
582 int records,
583 int abortPercent,
584 int parallelism){
585
586 return scanUpdateRecords(pNdb,
587 (NdbScanOperation::ScanFlag)0,
588 records, abortPercent, parallelism);
589 }
590
591 // Scan all records exclusive and update
592 // them one by one
593 int
scanUpdateRecords1(Ndb * pNdb,int records,int abortPercent,int parallelism)594 HugoTransactions::scanUpdateRecords1(Ndb* pNdb,
595 int records,
596 int abortPercent,
597 int parallelism){
598 return scanUpdateRecords(pNdb,
599 (NdbScanOperation::ScanFlag)0,
600 records, abortPercent, 1);
601 }
602
603 // Scan all records exclusive and update
604 // them batched by asking nextScanResult to
605 // give us all cached records before fetching new
606 // records from db
607 int
scanUpdateRecords2(Ndb * pNdb,int records,int abortPercent,int parallelism)608 HugoTransactions::scanUpdateRecords2(Ndb* pNdb,
609 int records,
610 int abortPercent,
611 int parallelism){
612 return scanUpdateRecords(pNdb, (NdbScanOperation::ScanFlag)0,
613 records, abortPercent, parallelism);
614 }
615
616 int
scanUpdateRecords3(Ndb * pNdb,int records,int abortPercent,int parallelism)617 HugoTransactions::scanUpdateRecords3(Ndb* pNdb,
618 int records,
619 int abortPercent,
620 int parallelism)
621 {
622 return scanUpdateRecords(pNdb, (NdbScanOperation::ScanFlag)0,
623 records, abortPercent, parallelism);
624 }
625
626 int
loadTable(Ndb * pNdb,int records,int batch,bool allowConstraintViolation,int doSleep,bool oneTrans,int value,bool abort,bool abort_on_first_error)627 HugoTransactions::loadTable(Ndb* pNdb,
628 int records,
629 int batch,
630 bool allowConstraintViolation,
631 int doSleep,
632 bool oneTrans,
633 int value,
634 bool abort,
635 bool abort_on_first_error)
636 {
637 return loadTableStartFrom(pNdb, 0, records, batch, allowConstraintViolation,
638 doSleep, oneTrans, value, abort,
639 abort_on_first_error);
640 }
641
642 int
loadTableStartFrom(Ndb * pNdb,int startFrom,int records,int batch,bool allowConstraintViolation,int doSleep,bool oneTrans,int value,bool abort,bool abort_on_first_error)643 HugoTransactions::loadTableStartFrom(Ndb* pNdb,
644 int startFrom,
645 int records,
646 int batch,
647 bool allowConstraintViolation,
648 int doSleep,
649 bool oneTrans,
650 int value,
651 bool abort,
652 bool abort_on_first_error){
653 int check;
654 int retryAttempt = 0;
655 int retryMax = 5;
656 bool first_batch = true;
657
658 const int org = batch;
659 const int cols = tab.getNoOfColumns();
660 const int brow = tab.getRowSizeInBytes();
661 const int bytes = 12 + brow + 4 * cols;
662 batch = (batch * 256); // -> 512 -> 65536k per commit
663 batch = batch/bytes; //
664 batch = batch == 0 ? 1 : batch;
665
666 if(batch != org){
667 g_info << "batch = " << org << " rowsize = " << bytes
668 << " -> rows/commit = " << batch << endl;
669 }
670
671 //Uint32 orgbatch = batch;
672 g_info << "|- Inserting records..." << endl;
673 for (int c=0 ; c<records; ){
674 bool closeTrans = true;
675
676 if(c + batch > records)
677 batch = records - c;
678
679 if (retryAttempt >= retryMax){
680 g_info << "Record " << c << " could not be inserted, has retried "
681 << retryAttempt << " times " << endl;
682 // Reset retry counters and continue with next record
683 retryAttempt = 0;
684 c++;
685 }
686 if (doSleep > 0)
687 NdbSleep_MilliSleep(doSleep);
688
689 // if (first_batch || !oneTrans) {
690 if (first_batch || !pTrans) {
691 first_batch = false;
692 pTrans = pNdb->startTransaction();
693 if (pTrans == NULL) {
694 const NdbError err = pNdb->getNdbError();
695
696 if (err.status == NdbError::TemporaryError){
697 NDB_ERR(err);
698 NdbSleep_MilliSleep(50);
699 retryAttempt++;
700 continue;
701 }
702 NDB_ERR(err);
703 setNdbError(err);
704 return NDBT_FAILED;
705 }
706 }
707
708 if(pkInsertRecord(pNdb, c + startFrom, batch, value) != NDBT_OK)
709 {
710 NDB_ERR(pTrans->getNdbError());
711 setNdbError(pTrans->getNdbError());
712 closeTransaction(pNdb);
713 return NDBT_FAILED;
714 }
715
716 // Execute the transaction and insert the record
717 if (!oneTrans || (c + batch) >= records) {
718 // closeTrans = true;
719 closeTrans = false;
720 if (!abort)
721 {
722 check = pTrans->execute(Commit, AbortOnError);
723 if(check != -1)
724 pTrans->getGCI(&m_latest_gci);
725 pTrans->restart();
726 }
727 else
728 {
729 check = pTrans->execute(NoCommit, AbortOnError);
730 if (check != -1)
731 {
732 check = pTrans->execute( Rollback );
733 closeTransaction(pNdb);
734 }
735 }
736 } else {
737 closeTrans = false;
738 check = pTrans->execute(NoCommit, AbortOnError);
739 }
740 if(check == -1 ) {
741 const NdbError err = pTrans->getNdbError();
742 closeTransaction(pNdb);
743 pTrans= 0;
744 switch(err.status){
745 case NdbError::Success:
746 NDB_ERR(err);
747 g_info << "ERROR: NdbError reports success when transcaction failed"
748 << endl;
749 setNdbError(err);
750 return NDBT_FAILED;
751 break;
752
753 case NdbError::TemporaryError:
754 if (abort_on_first_error)
755 {
756 return err.code;
757 }
758 NDB_ERR(err);
759 NdbSleep_MilliSleep(50);
760 retryAttempt++;
761 batch = 1;
762 continue;
763 break;
764
765 case NdbError::UnknownResult:
766 NDB_ERR(err);
767 setNdbError(err);
768 return NDBT_FAILED;
769 break;
770
771 case NdbError::PermanentError:
772 if (allowConstraintViolation == true){
773 switch (err.classification){
774 case NdbError::ConstraintViolation:
775 // Tuple already existed, OK but should be reported
776 g_info << c << ": " << err.code << " " << err.message << endl;
777 c++;
778 continue;
779 break;
780 default:
781 break;
782 }
783 }
784 NDB_ERR(err);
785 setNdbError(err);
786 return err.code;
787 break;
788 }
789 }
790 else{
791 if (closeTrans) {
792 closeTransaction(pNdb);
793 pTrans= 0;
794 }
795 }
796
797 // Step to next record
798 c = c+batch;
799 retryAttempt = 0;
800 }
801
802 if(pTrans)
803 closeTransaction(pNdb);
804 return NDBT_OK;
805 }
806
807 int
fillTable(Ndb * pNdb,int batch)808 HugoTransactions::fillTable(Ndb* pNdb,
809 int batch){
810 return fillTableStartFrom(pNdb, 0, batch);
811 }
812
813 int
fillTableStartFrom(Ndb * pNdb,int startFrom,int batch)814 HugoTransactions::fillTableStartFrom(Ndb* pNdb,
815 int startFrom,
816 int batch){
817 int check;
818 int retryFull = 0;
819 int retryAttempt = 0;
820 int retryMax = 5;
821
822 const int org = batch;
823 const int cols = tab.getNoOfColumns();
824 const int brow = tab.getRowSizeInBytes();
825 const int bytes = 12 + brow + 4 * cols;
826 batch = (batch * 256); // -> 512 -> 65536k per commit
827 batch = batch/bytes; //
828 batch = batch == 0 ? 1 : batch;
829
830 if(batch != org){
831 g_info << "batch = " << org << " rowsize = " << bytes
832 << " -> rows/commit = " << batch << endl;
833 }
834
835 for (int c=startFrom ; ; ){
836
837 if (retryAttempt >= retryMax){
838 g_info << "Record " << c << " could not be inserted, has retried "
839 << retryAttempt << " times " << endl;
840 // Reset retry counters and continue with next record
841 retryAttempt = 0;
842 c++;
843 }
844
845 pTrans = pNdb->startTransaction();
846 if (pTrans == NULL) {
847 const NdbError err = pNdb->getNdbError();
848
849 if (err.status == NdbError::TemporaryError){
850 NDB_ERR(err);
851 NdbSleep_MilliSleep(50);
852 retryAttempt++;
853 continue;
854 }
855 NDB_ERR(err);
856 setNdbError(err);
857 return NDBT_FAILED;
858 }
859
860 if(pkInsertRecord(pNdb, c, batch) != NDBT_OK)
861 {
862 NDB_ERR(pTrans->getNdbError());
863 setNdbError(pTrans->getNdbError());
864 closeTransaction(pNdb);
865 return NDBT_FAILED;
866 }
867
868 // Execute the transaction and insert the record
869 check = pTrans->execute(Commit, CommitAsMuchAsPossible);
870 const NdbError err = pTrans->getNdbError();
871 if(check == -1 || err.code != 0) {
872 closeTransaction(pNdb);
873
874 switch(err.status){
875 case NdbError::Success:
876 NDB_ERR(err);
877 setNdbError(err);
878 g_info << "ERROR: NdbError reports success when transcaction failed"
879 << endl;
880 return NDBT_FAILED;
881 break;
882
883 case NdbError::TemporaryError:
884 NDB_ERR(err);
885 NdbSleep_MilliSleep(50);
886 retryAttempt++;
887 continue;
888 break;
889
890 case NdbError::UnknownResult:
891 NDB_ERR(err);
892 setNdbError(err);
893 return NDBT_FAILED;
894 break;
895
896 case NdbError::PermanentError:
897 // if (allowConstraintViolation == true){
898 // switch (err.classification){
899 // case NdbError::ConstraintViolation:
900 // // Tuple already existed, OK but should be reported
901 // g_info << c << ": " << err.code << " " << err.message << endl;
902 // c++;
903 // continue;
904 // break;
905 // default:
906 // break;es
907 // }
908 // }
909
910 // Check if this is the "db full" error
911 if (err.classification==NdbError::InsufficientSpace){
912 // Datamemory might have been released by abort of
913 // batch insert. Retry fill with a smaller batch
914 // in order to ensure table is filled to last row.
915 if (batch > 1){
916 c = c+batch;
917 batch = batch/2;
918 continue;
919 }
920 // Only some datanodes might be full. Retry with
921 // another record until we are *really sure* that
922 // all datanodes are full.
923 if (retryFull < 64) {
924 retryFull++;
925 c++;
926 continue;
927 }
928
929 NDB_ERR(err);
930 return NDBT_OK;
931 }
932
933 if (err.classification == NdbError::ConstraintViolation){
934 NDB_ERR(err);
935 break;
936 }
937 NDB_ERR(err);
938 setNdbError(err);
939 return NDBT_FAILED;
940 break;
941 }
942 }
943 else{
944 pTrans->getGCI(&m_latest_gci);
945 closeTransaction(pNdb);
946 }
947
948 // Step to next record
949 c = c+batch;
950 retryAttempt = 0;
951 retryFull = 0;
952 }
953 return NDBT_OK;
954 }
955
956 int
pkReadRecords(Ndb * pNdb,int records,int batch,NdbOperation::LockMode lm,int _rand)957 HugoTransactions::pkReadRecords(Ndb* pNdb,
958 int records,
959 int batch,
960 NdbOperation::LockMode lm,
961 int _rand){
962 int reads = 0;
963 int r = 0;
964 int retryAttempt = 0;
965 int check;
966
967 if (batch == 0) {
968 g_err << "ERROR: Argument batch == 0 in pkReadRecords(). Not allowed.";
969 g_err << "Line: " << __LINE__ << endl;
970 return NDBT_FAILED;
971 }
972
973 while (r < records){
974 if(r + batch > records)
975 batch = records - r;
976
977 if (retryAttempt >= m_retryMax){
978 g_err << "ERROR: has retried this operation " << retryAttempt
979 << " times, failing!, line: " << __LINE__ << endl;
980 return NDBT_FAILED;
981 }
982
983 pTrans = pNdb->startTransaction();
984 if (pTrans == NULL) {
985 const NdbError err = pNdb->getNdbError();
986
987 if (err.status == NdbError::TemporaryError){
988 NDB_ERR(err);
989 NdbSleep_MilliSleep(500);
990 retryAttempt++;
991 continue;
992 }
993 NDB_ERR(err);
994 setNdbError(err);
995 return NDBT_FAILED;
996 }
997 retryAttempt = 0;
998
999 NDB_TICKS timer_start;
1000 NDB_TICKS timer_stop;
1001 bool timer_active =
1002 m_stats_latency != 0 &&
1003 r >= batch && // first batch is "warmup"
1004 r + batch != records; // last batch is usually partial
1005
1006 if (timer_active)
1007 timer_start = NdbTick_getCurrentTicks();
1008
1009 NdbOperation::LockMode lmused;
1010 if (_rand == 0)
1011 {
1012 if(pkReadRecord(pNdb, r, batch, lm, &lmused) != NDBT_OK)
1013 {
1014 NDB_ERR(pTrans->getNdbError());
1015 setNdbError(pTrans->getNdbError());
1016 closeTransaction(pNdb);
1017 return NDBT_FAILED;
1018 }
1019 }
1020 else
1021 {
1022 if(pkReadRandRecord(pNdb, records, batch, lm, &lmused) != NDBT_OK)
1023 {
1024 NDB_ERR(pTrans->getNdbError());
1025 setNdbError(pTrans->getNdbError());
1026 closeTransaction(pNdb);
1027 return NDBT_FAILED;
1028 }
1029 }
1030
1031 check = pTrans->execute(Commit, AbortOnError);
1032
1033 if (check != -1 && lmused == NdbOperation::LM_CommittedRead)
1034 {
1035 /**
1036 * LM_CommittedRead will not abort transaction
1037 * even if doing execute(AbortOnError);
1038 * so also check pTrans->getNdbError() in this case
1039 */
1040 if (pTrans->getNdbError().status != NdbError::Success)
1041 {
1042 check = -1;
1043 }
1044 }
1045
1046 if( check == -1 ) {
1047 const NdbError err = pTrans->getNdbError();
1048
1049 if (err.status == NdbError::TemporaryError){
1050 NDB_ERR(err);
1051 closeTransaction(pNdb);
1052 NdbSleep_MilliSleep(50);
1053 retryAttempt++;
1054 continue;
1055 }
1056 switch(err.code){
1057 case 626: // Tuple did not exist
1058 g_info << r << ": " << err.code << " " << err.message << endl;
1059 r++;
1060 break;
1061
1062 default:
1063 NDB_ERR(err);
1064 setNdbError(err);
1065 closeTransaction(pNdb);
1066 return NDBT_FAILED;
1067 }
1068 } else {
1069
1070 /**
1071 * Extra debug aid:
1072 * We do not (yet) expect any transaction or operation
1073 * errors if ::execute() does not return with error.
1074 */
1075 const NdbError err1 = pTrans->getNdbError();
1076 if (err1.code)
1077 {
1078 ndbout << "BEWARE: HugoTransactions::pkReadRecords"
1079 << ", execute succeeded with Trans error: " << err1.code
1080 << endl;
1081
1082 }
1083 const NdbOperation* pOp = pTrans->getNdbErrorOperation();
1084 if (pOp != NULL)
1085 {
1086 const NdbError err2 = pOp->getNdbError();
1087 ndbout << "BEWARE HugoTransactions::pkReadRecords"
1088 << ", NdbOperation error: " << err2.code
1089 << endl;
1090 }
1091
1092 retryAttempt = 0;
1093 if(indexScans.size() > 0)
1094 {
1095 /* Index scan used to read records....*/
1096 int rows_found = 0;
1097 for (Uint32 scanOp=0; scanOp < indexScans.size(); scanOp++)
1098 {
1099 while((check = indexScans[scanOp]->nextResult()) == 0)
1100 {
1101 rows_found++;
1102 if (calc.verifyRowValues(rows[0]) != 0){
1103 closeTransaction(pNdb);
1104 g_err << "Line: " << __LINE__ << " verify row failed" << endl;
1105 return NDBT_FAILED;
1106 }
1107 }
1108 }
1109 if(check != 1 || rows_found > batch)
1110 {
1111 closeTransaction(pNdb);
1112 g_err << "Line: " << __LINE__ << " check rows failed" << endl;
1113 return NDBT_FAILED;
1114 }
1115 else if(rows_found < batch)
1116 {
1117 if(batch == 1){
1118 g_info << r << ": not found" << endl; abort(); }
1119 else
1120 g_info << "Found " << rows_found << " of "
1121 << batch << " rows" << endl;
1122 }
1123 r += batch;
1124 reads += rows_found;
1125 }
1126 else
1127 {
1128 for (int b=0; (b<batch) && (r+b<records); b++){
1129 if (calc.verifyRowValues(rows[b]) != 0){
1130 closeTransaction(pNdb);
1131 g_err << "Line: " << __LINE__
1132 << " verify row failed"
1133 << ", record: " << r << " of: " << records
1134 << ", row: " << b << " in a batch of: " << batch
1135 << endl;
1136 return NDBT_FAILED;
1137 }
1138 reads++;
1139 r++;
1140 }
1141 }
1142 }
1143
1144 closeTransaction(pNdb);
1145
1146 if (timer_active) {
1147 timer_stop = NdbTick_getCurrentTicks();
1148 Uint64 elapsed = NdbTick_Elapsed(timer_start, timer_stop).microSec();
1149 m_stats_latency->addObservation((double)elapsed);
1150 }
1151 }
1152 deallocRows();
1153 indexScans.clear();
1154 g_info << reads << " records read" << endl;
1155 return NDBT_OK;
1156 }
1157
1158
1159
1160 int
pkUpdateRecords(Ndb * pNdb,int records,int batch,int doSleep)1161 HugoTransactions::pkUpdateRecords(Ndb* pNdb,
1162 int records,
1163 int batch,
1164 int doSleep){
1165 int updated = 0;
1166 int r = 0;
1167 int retryAttempt = 0;
1168 int check, b;
1169
1170 allocRows(batch);
1171
1172 g_info << "|- Updating records (batch=" << batch << ")..." << endl;
1173 int batch_no = 0;
1174 while (r < records){
1175 if(r + batch > records)
1176 batch = records - r;
1177
1178 if (m_thr_count != 0 && m_thr_no != batch_no % m_thr_count)
1179 {
1180 r += batch;
1181 batch_no++;
1182 continue;
1183 }
1184
1185 if (retryAttempt >= m_retryMax){
1186 g_err << "ERROR: has retried this operation " << retryAttempt
1187 << " times, failing!, line: " << __LINE__ << endl;
1188 return NDBT_FAILED;
1189 }
1190
1191 if (doSleep > 0)
1192 NdbSleep_MilliSleep(doSleep);
1193
1194 pTrans = pNdb->startTransaction();
1195 if (pTrans == NULL) {
1196 const NdbError err = pNdb->getNdbError();
1197
1198 if (err.status == NdbError::TemporaryError){
1199 NDB_ERR(err);
1200 NdbSleep_MilliSleep(50);
1201 retryAttempt++;
1202 continue;
1203 }
1204 NDB_ERR(err);
1205 setNdbError(err);
1206 return NDBT_FAILED;
1207 }
1208
1209 if(pkReadRecord(pNdb, r, batch, NdbOperation::LM_Exclusive) != NDBT_OK)
1210 {
1211 NDB_ERR(pTrans->getNdbError());
1212 setNdbError(pTrans->getNdbError());
1213 closeTransaction(pNdb);
1214 return NDBT_FAILED;
1215 }
1216
1217 check = pTrans->execute(NoCommit, AbortOnError);
1218 if( check == -1 ) {
1219 const NdbError err = pTrans->getNdbError();
1220
1221 if (err.status == NdbError::TemporaryError){
1222 NDB_ERR(err);
1223 closeTransaction(pNdb);
1224 NdbSleep_MilliSleep(50);
1225 retryAttempt++;
1226 continue;
1227 }
1228 NDB_ERR(err);
1229 setNdbError(err);
1230 closeTransaction(pNdb);
1231 return NDBT_FAILED;
1232 }
1233
1234 NDB_TICKS timer_start;
1235 NDB_TICKS timer_stop;
1236 bool timer_active =
1237 m_stats_latency != 0 &&
1238 r >= batch && // first batch is "warmup"
1239 r + batch != records; // last batch is usually partial
1240
1241 if (timer_active)
1242 timer_start = NdbTick_getCurrentTicks();
1243
1244 int rows_found = 0;
1245
1246 if(indexScans.size() > 0)
1247 {
1248 /* Index scans used to read records */
1249 for (Uint32 scanOp=0; scanOp < indexScans.size(); scanOp++)
1250 {
1251 while((check = indexScans[scanOp]->nextResult(true)) == 0)
1252 {
1253 do {
1254
1255 if (calc.verifyRowValues(rows[0]) != 0){
1256 g_err << "Row validation failure, line: " << __LINE__ << endl;
1257 closeTransaction(pNdb);
1258 return NDBT_FAILED;
1259 }
1260
1261 int updates = calc.getUpdatesValue(rows[0]) + (m_empty_update? 0 : 1);
1262
1263 /* Rows may not arrive in the order they were requested
1264 * (When multiple partitions scanned without ORDERBY)
1265 * therefore we use the id from the row to update it
1266 */
1267 const Uint32 rowId= calc.getIdValue(rows[0]);
1268 if(pkUpdateRecord(pNdb, rowId, 1, updates) != NDBT_OK)
1269 {
1270 NDB_ERR(pTrans->getNdbError());
1271 setNdbError(pTrans->getNdbError());
1272 closeTransaction(pNdb);
1273 return NDBT_FAILED;
1274 }
1275 rows_found++;
1276 } while((check = indexScans[scanOp]->nextResult(false)) == 0);
1277
1278 if(check != 2)
1279 break;
1280 if((check = pTrans->execute(NoCommit, AbortOnError)) != 0)
1281 break;
1282 } // Next fetch on this scan op...
1283
1284 if(check != 1)
1285 {
1286 g_err << "Check failed, line: " << __LINE__ << endl;
1287 closeTransaction(pNdb);
1288 return NDBT_FAILED;
1289 }
1290 } // Next scan op...
1291
1292 if (rows_found != batch)
1293 {
1294 g_err << "Incorrect num of rows found. Expected "
1295 << batch << ". Found " << rows_found << endl;
1296 g_err << "Line: " << __LINE__ << endl;
1297 closeTransaction(pNdb);
1298 return NDBT_FAILED;
1299 }
1300 }
1301 else
1302 {
1303 for(b = 0; b<batch && (b+r)<records; b++)
1304 {
1305 if (calc.verifyRowValues(rows[b]) != 0)
1306 {
1307 closeTransaction(pNdb);
1308 g_err << "Line: " << __LINE__ << " verify row failed" << endl;
1309 return NDBT_FAILED;
1310 }
1311
1312 int updates = calc.getUpdatesValue(rows[b]) + (m_empty_update? 0 : 1);
1313
1314 if(pkUpdateRecord(pNdb, r+b, 1, updates) != NDBT_OK)
1315 {
1316 NDB_ERR(pTrans->getNdbError());
1317 setNdbError(pTrans->getNdbError());
1318 closeTransaction(pNdb);
1319 return NDBT_FAILED;
1320 }
1321 }
1322 check = pTrans->execute(Commit, AbortOnError);
1323 }
1324 if( check == -1 ) {
1325 const NdbError err = pTrans->getNdbError();
1326
1327 if (err.status == NdbError::TemporaryError){
1328 NDB_ERR(err);
1329 closeTransaction(pNdb);
1330 NdbSleep_MilliSleep(50);
1331 retryAttempt++;
1332 continue;
1333 }
1334 NDB_ERR(err);
1335 setNdbError(err);
1336 ndbout << "r = " << r << endl;
1337 closeTransaction(pNdb);
1338 return NDBT_FAILED;
1339 }
1340 else{
1341 updated += batch;
1342 pTrans->getGCI(&m_latest_gci);
1343 }
1344
1345 closeTransaction(pNdb);
1346
1347 if (timer_active) {
1348 timer_stop = NdbTick_getCurrentTicks();
1349 Uint64 elapsed = NdbTick_Elapsed(timer_start, timer_stop).microSec();
1350 m_stats_latency->addObservation((double)elapsed);
1351 }
1352
1353 r += batch; // Read next record
1354 batch_no++;
1355 }
1356
1357 deallocRows();
1358 indexScans.clear();
1359 g_info << "|- " << updated << " records updated" << endl;
1360 return NDBT_OK;
1361 }
1362
1363 int
pkInterpretedUpdateRecords(Ndb * pNdb,int records,int batch)1364 HugoTransactions::pkInterpretedUpdateRecords(Ndb* pNdb,
1365 int records,
1366 int batch){
1367 int updated = 0;
1368 int r = 0;
1369 int retryAttempt = 0;
1370 int check, a;
1371
1372 while (r < records){
1373
1374 if (retryAttempt >= m_retryMax){
1375 g_err << "ERROR: has retried this operation " << retryAttempt
1376 << " times, failing!, line: " << __LINE__ << endl;
1377 return NDBT_FAILED;
1378 }
1379
1380 pTrans = pNdb->startTransaction();
1381 if (pTrans == NULL) {
1382 const NdbError err = pNdb->getNdbError();
1383
1384 if (err.status == NdbError::TemporaryError){
1385 NDB_ERR(err);
1386 NdbSleep_MilliSleep(50);
1387 retryAttempt++;
1388 continue;
1389 }
1390 NDB_ERR(err);
1391 setNdbError(err);
1392 return NDBT_FAILED;
1393 }
1394
1395 NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
1396 if (pOp == NULL) {
1397 NDB_ERR(pTrans->getNdbError());
1398 setNdbError(pTrans->getNdbError());
1399 closeTransaction(pNdb);
1400 return NDBT_FAILED;
1401 }
1402
1403 check = pOp->readTupleExclusive();
1404 if( check == -1 ) {
1405 NDB_ERR(pTrans->getNdbError());
1406 setNdbError(pTrans->getNdbError());
1407 closeTransaction(pNdb);
1408 return NDBT_FAILED;
1409 }
1410
1411 // Define primary keys
1412 if (equalForRow(pOp, r) != 0)
1413 {
1414 closeTransaction(pNdb);
1415 g_err << "Line: " << __LINE__ << " equal for row failed" << endl;
1416 return NDBT_FAILED;
1417 }
1418
1419 // Read update value
1420 for(a = 0; a<tab.getNoOfColumns(); a++){
1421 if (calc.isUpdateCol(a) == true){
1422 if((row.attributeStore(a) =
1423 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
1424 NDB_ERR(pTrans->getNdbError());
1425 setNdbError(pTrans->getNdbError());
1426 closeTransaction(pNdb);
1427 return NDBT_FAILED;
1428 }
1429 }
1430 }
1431
1432 check = pTrans->execute(NoCommit, AbortOnError);
1433 if( check == -1 ) {
1434 const NdbError err = pTrans->getNdbError();
1435
1436 if (err.status == NdbError::TemporaryError){
1437 NDB_ERR(err);
1438 closeTransaction(pNdb);
1439 NdbSleep_MilliSleep(50);
1440 retryAttempt++;
1441 continue;
1442 }
1443 NDB_ERR(err);
1444 setNdbError(err);
1445 closeTransaction(pNdb);
1446 return NDBT_FAILED;
1447 }
1448
1449 int updates = calc.getUpdatesValue(&row) + (m_empty_update? 0 : 1);
1450
1451 NdbOperation* pUpdOp;
1452 pUpdOp = pTrans->getNdbOperation(tab.getName());
1453 if (pUpdOp == NULL) {
1454 NDB_ERR(pTrans->getNdbError());
1455 setNdbError(pTrans->getNdbError());
1456 closeTransaction(pNdb);
1457 return NDBT_FAILED;
1458 }
1459
1460 check = pUpdOp->interpretedUpdateTuple();
1461 if( check == -1 ) {
1462 NDB_ERR(pTrans->getNdbError());
1463 setNdbError(pTrans->getNdbError());
1464 closeTransaction(pNdb);
1465 return NDBT_FAILED;
1466 }
1467
1468 // PKs
1469 if (equalForRow(pUpdOp, r) != 0)
1470 {
1471 closeTransaction(pNdb);
1472 g_err << "Line: " << __LINE__ << " equal for row failed" << endl;
1473 return NDBT_FAILED;
1474 }
1475
1476 // Update col
1477 for(a = 0; a<tab.getNoOfColumns(); a++){
1478 if ((tab.getColumn(a)->getPrimaryKey() == false) &&
1479 (calc.isUpdateCol(a) == true)){
1480
1481 // TODO switch for 32/64 bit
1482 const NdbDictionary::Column* attr = tab.getColumn(a);
1483 Uint32 valToIncWith = 1;
1484 check = pUpdOp->incValue(attr->getName(), valToIncWith);
1485 if( check == -1 ) {
1486 NDB_ERR(pTrans->getNdbError());
1487 setNdbError(pTrans->getNdbError());
1488 closeTransaction(pNdb);
1489 return NDBT_FAILED;
1490 }
1491 }
1492 }
1493
1494 // Remaining attributes
1495 for(a = 0; a<tab.getNoOfColumns(); a++){
1496 if ((tab.getColumn(a)->getPrimaryKey() == false) &&
1497 (calc.isUpdateCol(a) == false)){
1498 if(setValueForAttr(pUpdOp, a, r, updates ) != 0){
1499 NDB_ERR(pTrans->getNdbError());
1500 setNdbError(pTrans->getNdbError());
1501 closeTransaction(pNdb);
1502 return NDBT_FAILED;
1503 }
1504 }
1505 }
1506
1507
1508
1509 check = pTrans->execute(Commit, AbortOnError);
1510 if( check == -1 ) {
1511 const NdbError err = pTrans->getNdbError();
1512
1513 if (err.status == NdbError::TemporaryError){
1514 NDB_ERR(err);
1515 closeTransaction(pNdb);
1516 NdbSleep_MilliSleep(50);
1517 retryAttempt++;
1518 continue;
1519 }
1520 NDB_ERR(err);
1521 setNdbError(err);
1522 ndbout << "r = " << r << endl;
1523 closeTransaction(pNdb);
1524 return NDBT_FAILED;
1525 }
1526 else{
1527 updated++;
1528 pTrans->getGCI(&m_latest_gci);
1529 }
1530
1531
1532 closeTransaction(pNdb);
1533
1534 r++; // Read next record
1535
1536 }
1537
1538 g_info << "|- " << updated << " records updated" << endl;
1539 return NDBT_OK;
1540 }
1541
1542 int
pkDelRecords(Ndb * pNdb,int records,int batch,bool allowConstraintViolation,int doSleep)1543 HugoTransactions::pkDelRecords(Ndb* pNdb,
1544 int records,
1545 int batch,
1546 bool allowConstraintViolation,
1547 int doSleep){
1548 // TODO Batch is not implemented
1549 int deleted = 0;
1550 int r = 0;
1551 int retryAttempt = 0;
1552 int check;
1553
1554 g_info << "|- Deleting records..." << endl;
1555 int batch_no = 0;
1556 while (r < records){
1557 if(r + batch > records)
1558 batch = records - r;
1559
1560 if (m_thr_count != 0 && m_thr_no != batch_no % m_thr_count)
1561 {
1562 r += batch;
1563 batch_no++;
1564 continue;
1565 }
1566
1567 if (retryAttempt >= m_retryMax){
1568 g_err << "ERROR: has retried this operation " << retryAttempt
1569 << " times, failing!, line: " << __LINE__ << endl;
1570 return NDBT_FAILED;
1571 }
1572
1573 if (doSleep > 0)
1574 NdbSleep_MilliSleep(doSleep);
1575
1576 pTrans = pNdb->startTransaction();
1577 if (pTrans == NULL) {
1578 const NdbError err = pNdb->getNdbError();
1579
1580 if (err.status == NdbError::TemporaryError){
1581 NDB_ERR(err);
1582 NdbSleep_MilliSleep(50);
1583 retryAttempt++;
1584 continue;
1585 }
1586 NDB_ERR(err);
1587 setNdbError(err);
1588 return NDBT_FAILED;
1589 }
1590
1591 NDB_TICKS timer_start;
1592 NDB_TICKS timer_stop;
1593 bool timer_active =
1594 m_stats_latency != 0 &&
1595 r >= batch && // first batch is "warmup"
1596 r + batch != records; // last batch is usually partial
1597
1598 if (timer_active)
1599 timer_start = NdbTick_getCurrentTicks();
1600
1601 if(pkDeleteRecord(pNdb, r, batch) != NDBT_OK)
1602 {
1603 NDB_ERR(pTrans->getNdbError());
1604 setNdbError(pTrans->getNdbError());
1605 closeTransaction(pNdb);
1606 return NDBT_FAILED;
1607 }
1608
1609 check = pTrans->execute(Commit, AbortOnError);
1610 if( check == -1) {
1611 const NdbError err = pTrans->getNdbError();
1612
1613 switch(err.status){
1614 case NdbError::TemporaryError:
1615 NDB_ERR(err);
1616 closeTransaction(pNdb);
1617 NdbSleep_MilliSleep(50);
1618 retryAttempt++;
1619 continue;
1620 break;
1621
1622 case NdbError::PermanentError:
1623 if (allowConstraintViolation == true){
1624 switch (err.classification){
1625 case NdbError::ConstraintViolation:
1626 // Tuple did not exist, OK but should be reported
1627 g_info << r << ": " << err.code << " " << err.message << endl;
1628 continue;
1629 break;
1630 default:
1631 break;
1632 }
1633 }
1634 NDB_ERR(err);
1635 setNdbError(err);
1636 closeTransaction(pNdb);
1637 return NDBT_FAILED;
1638 break;
1639
1640 default:
1641 NDB_ERR(err);
1642 setNdbError(err);
1643 closeTransaction(pNdb);
1644 return NDBT_FAILED;
1645 }
1646 }
1647 else {
1648 deleted += batch;
1649 pTrans->getGCI(&m_latest_gci);
1650 }
1651 closeTransaction(pNdb);
1652
1653 if (timer_active) {
1654 timer_stop = NdbTick_getCurrentTicks();
1655 Uint64 elapsed = NdbTick_Elapsed(timer_start, timer_stop).microSec();
1656 m_stats_latency->addObservation((double)elapsed);
1657 }
1658
1659 r += batch; // Read next record
1660 batch_no++;
1661 }
1662
1663 g_info << "|- " << deleted << " records deleted" << endl;
1664 return NDBT_OK;
1665 }
1666
1667 int
pkRefreshRecords(Ndb * pNdb,int startFrom,int count,int batch)1668 HugoTransactions::pkRefreshRecords(Ndb* pNdb,
1669 int startFrom,
1670 int count,
1671 int batch)
1672 {
1673 int r = 0;
1674 int retryAttempt = 0;
1675
1676 g_info << "|- Refreshing records..." << startFrom << "-" << (startFrom+count)
1677 << " (batch=" << batch << ")" << endl;
1678
1679 while (r < count)
1680 {
1681 if(r + batch > count)
1682 batch = count - r;
1683
1684 if (retryAttempt >= m_retryMax)
1685 {
1686 g_err << "ERROR: has retried this operation " << retryAttempt
1687 << " times, failing!, line: " << __LINE__ << endl;
1688 return NDBT_FAILED;
1689 }
1690
1691 pTrans = pNdb->startTransaction();
1692 if (pTrans == NULL)
1693 {
1694 const NdbError err = pNdb->getNdbError();
1695
1696 if (err.status == NdbError::TemporaryError){
1697 NDB_ERR(err);
1698 NdbSleep_MilliSleep(50);
1699 retryAttempt++;
1700 continue;
1701 }
1702 NDB_ERR(err);
1703 return NDBT_FAILED;
1704 }
1705
1706 if (pkRefreshRecord(pNdb, r, batch) != NDBT_OK)
1707 {
1708 NDB_ERR(pTrans->getNdbError());
1709 closeTransaction(pNdb);
1710 return NDBT_FAILED;
1711 }
1712
1713 if (pTrans->execute(Commit, AbortOnError) == -1)
1714 {
1715 const NdbError err = pTrans->getNdbError();
1716
1717 switch(err.status){
1718 case NdbError::TemporaryError:
1719 NDB_ERR(err);
1720 closeTransaction(pNdb);
1721 NdbSleep_MilliSleep(50);
1722 retryAttempt++;
1723 continue;
1724 break;
1725
1726 default:
1727 NDB_ERR(err);
1728 closeTransaction(pNdb);
1729 return NDBT_FAILED;
1730 }
1731 }
1732
1733 closeTransaction(pNdb);
1734 r += batch; // Read next record
1735 }
1736
1737 return NDBT_OK;
1738 }
1739
1740 int
pkReadUnlockRecords(Ndb * pNdb,int records,int batch,NdbOperation::LockMode lm)1741 HugoTransactions::pkReadUnlockRecords(Ndb* pNdb,
1742 int records,
1743 int batch,
1744 NdbOperation::LockMode lm)
1745 {
1746 int reads = 0;
1747 int r = 0;
1748 int retryAttempt = 0;
1749 int check;
1750
1751 if (batch == 0) {
1752 g_err << "ERROR: Argument batch == 0 in pkReadRecords(). Not allowed.";
1753 g_err << " line: " << __LINE__ << endl;
1754 return NDBT_FAILED;
1755 }
1756
1757 if (idx != NULL) {
1758 g_err << "ERROR: Cannot call pkReadUnlockRecords for index";
1759 g_err << " line: " << __LINE__ << endl;
1760 return NDBT_FAILED;
1761 }
1762
1763 while (r < records){
1764 if(r + batch > records)
1765 batch = records - r;
1766
1767 if (retryAttempt >= m_retryMax){
1768 g_err << "ERROR: has retried this operation " << retryAttempt
1769 << " times, failing!, line: " << __LINE__ << endl;
1770 return NDBT_FAILED;
1771 }
1772
1773 pTrans = pNdb->startTransaction();
1774 if (pTrans == NULL) {
1775 const NdbError err = pNdb->getNdbError();
1776
1777 if (err.status == NdbError::TemporaryError){
1778 NDB_ERR(err);
1779 NdbSleep_MilliSleep(50);
1780 retryAttempt++;
1781 continue;
1782 }
1783 NDB_ERR(err);
1784 return NDBT_FAILED;
1785 }
1786
1787 NDB_TICKS timer_start;
1788 NDB_TICKS timer_stop;
1789 bool timer_active =
1790 m_stats_latency != 0 &&
1791 r >= batch && // first batch is "warmup"
1792 r + batch != records; // last batch is usually partial
1793
1794 if (timer_active)
1795 timer_start = NdbTick_getCurrentTicks();
1796
1797 Vector<const NdbLockHandle*> lockHandles;
1798
1799 NdbOperation::LockMode lmused;
1800 if(pkReadRecordLockHandle(pNdb, lockHandles, r, batch, lm, &lmused) != NDBT_OK)
1801 {
1802 NDB_ERR(pTrans->getNdbError());
1803 closeTransaction(pNdb);
1804 return NDBT_FAILED;
1805 }
1806
1807 check = pTrans->execute(NoCommit, AbortOnError);
1808
1809 if( check == -1 ) {
1810 const NdbError err = pTrans->getNdbError();
1811
1812 if (err.status == NdbError::TemporaryError){
1813 NDB_ERR(err);
1814 closeTransaction(pNdb);
1815 NdbSleep_MilliSleep(50);
1816 retryAttempt++;
1817 continue;
1818 }
1819 switch(err.code){
1820 case 626: // Tuple did not exist
1821 g_info << r << ": " << err.code << " " << err.message << endl;
1822 r++;
1823 break;
1824
1825 default:
1826 NDB_ERR(err);
1827 closeTransaction(pNdb);
1828 return NDBT_FAILED;
1829 }
1830 } else {
1831 /* Execute succeeded ok */
1832 for (int b=0; (b<batch) && (r+b<records); b++){
1833 if (calc.verifyRowValues(rows[b]) != 0){
1834 closeTransaction(pNdb);
1835 g_err << "Line: " << __LINE__ << " verify row failed" << endl;
1836 return NDBT_FAILED;
1837 }
1838 reads++;
1839 r++;
1840 }
1841
1842 if (pkUnlockRecord(pNdb,
1843 lockHandles) != NDBT_OK)
1844 {
1845 closeTransaction(pNdb);
1846 g_err << "Line: " << __LINE__ << " unlock row failed" << endl;
1847 return NDBT_FAILED;
1848 }
1849
1850 check = pTrans->execute(Commit, AbortOnError);
1851
1852 if (check == -1 )
1853 {
1854 const NdbError err = pTrans->getNdbError();
1855
1856 if (err.status == NdbError::TemporaryError){
1857 NDB_ERR(err);
1858 closeTransaction(pNdb);
1859 NdbSleep_MilliSleep(50);
1860 retryAttempt++;
1861 continue;
1862 }
1863 NDB_ERR(err);
1864 closeTransaction(pNdb);
1865 return NDBT_FAILED;
1866 }
1867 }
1868
1869 closeTransaction(pNdb);
1870
1871 if (timer_active) {
1872 timer_stop = NdbTick_getCurrentTicks();
1873 Uint64 elapsed = NdbTick_Elapsed(timer_start, timer_stop).microSec();
1874 m_stats_latency->addObservation((double)elapsed);
1875 }
1876 }
1877 deallocRows();
1878 g_info << reads << " records read" << endl;
1879 return NDBT_OK;
1880 }
1881
1882
1883 int
lockRecords(Ndb * pNdb,int records,int percentToLock,int lockTime)1884 HugoTransactions::lockRecords(Ndb* pNdb,
1885 int records,
1886 int percentToLock,
1887 int lockTime){
1888 // Place a lock on percentToLock% of the records in the Db
1889 // Keep the locks for lockTime ms, commit operation
1890 // and lock som other records
1891 int r = 0;
1892 int retryAttempt = 0;
1893 int check;
1894 NdbOperation::LockMode lm = NdbOperation::LM_Exclusive;
1895
1896 // Calculate how many records to lock in each batch
1897 if (percentToLock <= 0)
1898 percentToLock = 1;
1899 double percentVal = (double)percentToLock / 100;
1900 int lockBatch = (int)(records * percentVal);
1901 if (lockBatch <= 0)
1902 lockBatch = 1;
1903
1904 allocRows(lockBatch);
1905
1906 while (r < records){
1907 if(r + lockBatch > records)
1908 lockBatch = records - r;
1909
1910 g_info << "|- Locking " << lockBatch << " records..." << endl;
1911
1912 if (retryAttempt >= m_retryMax){
1913 g_err << "ERROR: has retried this operation " << retryAttempt
1914 << " times, failing!, line: " << __LINE__ << endl;
1915 return NDBT_FAILED;
1916 }
1917
1918 pTrans = pNdb->startTransaction();
1919 if (pTrans == NULL) {
1920 const NdbError err = pNdb->getNdbError();
1921
1922 if (err.status == NdbError::TemporaryError){
1923 NDB_ERR(err);
1924 NdbSleep_MilliSleep(50);
1925 retryAttempt++;
1926 continue;
1927 }
1928 NDB_ERR(err);
1929 setNdbError(err);
1930 return NDBT_FAILED;
1931 }
1932
1933 if(pkReadRecord(pNdb, r, lockBatch, lm) != NDBT_OK)
1934 {
1935 NDB_ERR(pTrans->getNdbError());
1936 setNdbError(pTrans->getNdbError());
1937 closeTransaction(pNdb);
1938 return NDBT_FAILED;
1939 }
1940
1941 // NoCommit lockTime times with 100 millis interval
1942 int sleepInterval = 50;
1943 int lockCount = lockTime / sleepInterval;
1944 int commitCount = 0;
1945 bool tempErr = false;
1946 do {
1947 check = pTrans->execute(NoCommit, AbortOnError);
1948 if( check == -1) {
1949 const NdbError err = pTrans->getNdbError();
1950
1951 if (err.status == NdbError::TemporaryError){
1952 NDB_ERR(err);
1953 closeTransaction(pNdb);
1954 NdbSleep_MilliSleep(50);
1955 tempErr = true;
1956 retryAttempt++;
1957 break;
1958 }
1959 NDB_ERR(err);
1960 setNdbError(err);
1961 closeTransaction(pNdb);
1962 return NDBT_FAILED;
1963 }
1964 for (int b=0; (b<lockBatch) && (r+b<records); b++){
1965 if (calc.verifyRowValues(rows[b]) != 0){
1966 closeTransaction(pNdb);
1967 g_err << "Line: " << __LINE__ << " verify row failed" << endl;
1968 return NDBT_FAILED;
1969 }
1970 }
1971 commitCount++;
1972 NdbSleep_MilliSleep(sleepInterval);
1973 } while (commitCount < lockCount);
1974
1975 if (tempErr)
1976 continue; /* Retry lock attempt */
1977
1978 // Really commit the trans, puuh!
1979 check = pTrans->execute(Commit, AbortOnError);
1980 if( check == -1) {
1981 const NdbError err = pTrans->getNdbError();
1982
1983 if (err.status == NdbError::TemporaryError){
1984 NDB_ERR(err);
1985 closeTransaction(pNdb);
1986 NdbSleep_MilliSleep(50);
1987 retryAttempt++;
1988 continue;
1989 }
1990 NDB_ERR(err);
1991 setNdbError(err);
1992 closeTransaction(pNdb);
1993 return NDBT_FAILED;
1994 }
1995 else{
1996 for (int b=0; (b<lockBatch) && (r<records); b++){
1997 if (calc.verifyRowValues(rows[b]) != 0){
1998 closeTransaction(pNdb);
1999 g_err << "Line: " << __LINE__ << " verify row failed" << endl;
2000 return NDBT_FAILED;
2001 }
2002 r++; // Read next record
2003 }
2004 }
2005
2006 closeTransaction(pNdb);
2007
2008 }
2009 deallocRows();
2010 g_info << "|- Record locking completed" << endl;
2011 return NDBT_OK;
2012 }
2013
2014 int
indexReadRecords(Ndb * pNdb,const char * idxName,int records,int batch)2015 HugoTransactions::indexReadRecords(Ndb* pNdb,
2016 const char * idxName,
2017 int records,
2018 int batch){
2019 int reads = 0;
2020 int r = 0;
2021 int retryAttempt = 0;
2022 int check, a;
2023 NdbOperation *pOp;
2024 NdbIndexScanOperation *sOp;
2025
2026 const NdbDictionary::Index* pIndex
2027 = pNdb->getDictionary()->getIndex(idxName, tab.getName());
2028
2029 if (pIndex == NULL)
2030 {
2031 g_info << "Index " << idxName << " not existing" << endl;
2032 return NDBT_FAILED;
2033 }
2034
2035 const bool ordered = (pIndex->getType()==NdbDictionary::Index::OrderedIndex);
2036
2037 if (batch == 0) {
2038 g_err << "ERROR: Argument batch == 0 in indexReadRecords(). "
2039 << "Not allowed, line: " << __LINE__ << endl;
2040 return NDBT_FAILED;
2041 }
2042
2043 if (ordered) {
2044 batch = 1;
2045 }
2046
2047 allocRows(batch);
2048
2049 while (r < records){
2050 if (retryAttempt >= m_retryMax){
2051 g_err << "ERROR: has retried this operation " << retryAttempt
2052 << " times, failing!, line: " << __LINE__ << endl;
2053 return NDBT_FAILED;
2054 }
2055
2056 pTrans = pNdb->startTransaction();
2057 if (pTrans == NULL) {
2058 const NdbError err = pNdb->getNdbError();
2059
2060 if (err.status == NdbError::TemporaryError){
2061 NDB_ERR(err);
2062 NdbSleep_MilliSleep(50);
2063 retryAttempt++;
2064 continue;
2065 }
2066 NDB_ERR(err);
2067 setNdbError(err);
2068 return NDBT_FAILED;
2069 }
2070
2071 for(int b=0; (b<batch) && (r+b < records); b++){
2072 if(!ordered){
2073 pOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
2074 if (pOp == NULL) {
2075 NDB_ERR(pTrans->getNdbError());
2076 setNdbError(pTrans->getNdbError());
2077 closeTransaction(pNdb);
2078 return NDBT_FAILED;
2079 }
2080 check = pOp->readTuple();
2081 } else {
2082 pOp = sOp = pTrans->getNdbIndexScanOperation(idxName, tab.getName());
2083 if (sOp == NULL) {
2084 NDB_ERR(pTrans->getNdbError());
2085 setNdbError(pTrans->getNdbError());
2086 closeTransaction(pNdb);
2087 return NDBT_FAILED;
2088 }
2089 check = sOp->readTuples();
2090 }
2091
2092 if( check == -1 ) {
2093 NDB_ERR(pTrans->getNdbError());
2094 setNdbError(pTrans->getNdbError());
2095 closeTransaction(pNdb);
2096 return NDBT_FAILED;
2097 }
2098
2099 // Define primary keys
2100 if (equalForRow(pOp, r+b) != 0)
2101 {
2102 closeTransaction(pNdb);
2103 g_err << "Line: " << __LINE__ << " equal for row failed" << endl;
2104 return NDBT_FAILED;
2105 }
2106
2107 // Define attributes to read
2108 for(a = 0; a<tab.getNoOfColumns(); a++){
2109 if((rows[b]->attributeStore(a) =
2110 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
2111 NDB_ERR(pTrans->getNdbError());
2112 setNdbError(pTrans->getNdbError());
2113 closeTransaction(pNdb);
2114 return NDBT_FAILED;
2115 }
2116 }
2117 }
2118
2119 check = pTrans->execute(Commit, AbortOnError);
2120 check = (check == -1 ? -1 : !ordered ? check : sOp->nextResult(true));
2121 if( check == -1 ) {
2122 const NdbError err = pTrans->getNdbError();
2123
2124 if (err.status == NdbError::TemporaryError){
2125 NDB_ERR(err);
2126 closeTransaction(pNdb);
2127 NdbSleep_MilliSleep(50);
2128 retryAttempt++;
2129 continue;
2130 }
2131 switch(err.code){
2132 case 626: // Tuple did not exist
2133 g_info << r << ": " << err.code << " " << err.message << endl;
2134 r++;
2135 break;
2136
2137 default:
2138 NDB_ERR(err);
2139 setNdbError(err);
2140 closeTransaction(pNdb);
2141 return NDBT_FAILED;
2142 }
2143 } else{
2144 for (int b=0; (b<batch) && (r+b<records); b++){
2145 if (calc.verifyRowValues(rows[b]) != 0){
2146 closeTransaction(pNdb);
2147 g_err << "Line: " << __LINE__
2148 << " verify row failed"
2149 << ", record: " << r << " of: " << records
2150 << ", row: " << b << " in a batch of: " << batch
2151 << endl;
2152 return NDBT_FAILED;
2153 }
2154 reads++;
2155 r++;
2156 }
2157 if(ordered && sOp->nextResult(true) == 0){
2158 ndbout << "Error when comparing records "
2159 << " - index op next_result to many" << endl;
2160 ndbout << "Line: " << __LINE__ << endl;
2161 closeTransaction(pNdb);
2162 return NDBT_FAILED;
2163 }
2164 }
2165 closeTransaction(pNdb);
2166 }
2167 deallocRows();
2168 g_info << reads << " records read" << endl;
2169 return NDBT_OK;
2170 }
2171
2172
2173
2174 int
indexUpdateRecords(Ndb * pNdb,const char * idxName,int records,int batch)2175 HugoTransactions::indexUpdateRecords(Ndb* pNdb,
2176 const char * idxName,
2177 int records,
2178 int batch){
2179
2180 int updated = 0;
2181 int r = 0;
2182 int retryAttempt = 0;
2183 int check, a, b;
2184 NdbOperation *pOp;
2185 NdbScanOperation * sOp;
2186
2187 const NdbDictionary::Index* pIndex
2188 = pNdb->getDictionary()->getIndex(idxName, tab.getName());
2189
2190 const bool ordered = (pIndex->getType()==NdbDictionary::Index::OrderedIndex);
2191 if (ordered){
2192 batch = 1;
2193 }
2194
2195 allocRows(batch);
2196
2197 while (r < records){
2198 if (retryAttempt >= m_retryMax){
2199 g_err << "ERROR: has retried this operation " << retryAttempt
2200 << " times, failing!, line: " << __LINE__ << endl;
2201 return NDBT_FAILED;
2202 }
2203
2204 pTrans = pNdb->startTransaction();
2205 if (pTrans == NULL) {
2206 const NdbError err = pNdb->getNdbError();
2207
2208 if (err.status == NdbError::TemporaryError){
2209 NDB_ERR(err);
2210 NdbSleep_MilliSleep(50);
2211 retryAttempt++;
2212 continue;
2213 }
2214 NDB_ERR(err);
2215 setNdbError(err);
2216 return NDBT_FAILED;
2217 }
2218
2219 for(b = 0; b<batch && (b+r)<records; b++){
2220 if(!ordered){
2221 pOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
2222 if (pOp == NULL) {
2223 NDB_ERR(pTrans->getNdbError());
2224 setNdbError(pTrans->getNdbError());
2225 closeTransaction(pNdb);
2226 return NDBT_FAILED;
2227 }
2228
2229 check = pOp->readTupleExclusive();
2230 if( check == -1 ) {
2231 NDB_ERR(pTrans->getNdbError());
2232 setNdbError(pTrans->getNdbError());
2233 closeTransaction(pNdb);
2234 return NDBT_FAILED;
2235 }
2236 } else {
2237 pOp = sOp = pTrans->getNdbIndexScanOperation(idxName, tab.getName());
2238 if (pOp == NULL) {
2239 NDB_ERR(pTrans->getNdbError());
2240 setNdbError(pTrans->getNdbError());
2241 closeTransaction(pNdb);
2242 return NDBT_FAILED;
2243 }
2244
2245 check = 0;
2246 sOp->readTuplesExclusive();
2247 }
2248
2249 // Define primary keys
2250 if (equalForRow(pOp, r+b) != 0)
2251 {
2252 closeTransaction(pNdb);
2253 g_err << "Line: " << __LINE__ << " equal for row failed" << endl;
2254 return NDBT_FAILED;
2255 }
2256
2257 // Define attributes to read
2258 for(a = 0; a<tab.getNoOfColumns(); a++){
2259 if((rows[b]->attributeStore(a) =
2260 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
2261 NDB_ERR(pTrans->getNdbError());
2262 setNdbError(pTrans->getNdbError());
2263 closeTransaction(pNdb);
2264 return NDBT_FAILED;
2265 }
2266 }
2267 }
2268
2269 check = pTrans->execute(NoCommit, AbortOnError);
2270 check = (check == -1 ? -1 : !ordered ? check : sOp->nextResult(true));
2271 if( check == -1 ) {
2272 const NdbError err = pTrans->getNdbError();
2273 NDB_ERR(err);
2274 closeTransaction(pNdb);
2275
2276 if (err.status == NdbError::TemporaryError){
2277 NdbSleep_MilliSleep(50);
2278 retryAttempt++;
2279 continue;
2280 }
2281 setNdbError(err);
2282 return NDBT_FAILED;
2283 }
2284
2285 if(ordered && check != 0){
2286 g_err << check << " - Row: " << r << " not found!!" << endl;
2287 closeTransaction(pNdb);
2288 return NDBT_FAILED;
2289 }
2290
2291 for(b = 0; b<batch && (b+r)<records; b++){
2292 if (calc.verifyRowValues(rows[b]) != 0){
2293 closeTransaction(pNdb);
2294 g_err << "Line: " << __LINE__ << " verify row failed" << endl;
2295 return NDBT_FAILED;
2296 }
2297
2298 int updates = calc.getUpdatesValue(rows[b]) + (m_empty_update? 0 : 1);
2299
2300 NdbOperation* pUpdOp;
2301 if(!ordered){
2302 pUpdOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
2303 check = (pUpdOp == 0 ? -1 : pUpdOp->updateTuple());
2304 } else {
2305 pUpdOp = sOp->updateCurrentTuple();
2306 }
2307
2308 if (pUpdOp == NULL) {
2309 NDB_ERR(pTrans->getNdbError());
2310 setNdbError(pTrans->getNdbError());
2311 closeTransaction(pNdb);
2312 return NDBT_FAILED;
2313 }
2314
2315 if( check == -1 ) {
2316 NDB_ERR(pTrans->getNdbError());
2317 setNdbError(pTrans->getNdbError());
2318 closeTransaction(pNdb);
2319 return NDBT_FAILED;
2320 }
2321
2322 if(!ordered)
2323 {
2324 if (equalForRow(pUpdOp, r+b) != 0)
2325 {
2326 closeTransaction(pNdb);
2327 g_err << "Line: " << __LINE__ << " equal for row failed" << endl;
2328 return NDBT_FAILED;
2329 }
2330 }
2331
2332 for(a = 0; a<tab.getNoOfColumns(); a++){
2333 if (tab.getColumn(a)->getPrimaryKey() == false){
2334 if(setValueForAttr(pUpdOp, a, r+b, updates ) != 0){
2335 NDB_ERR(pTrans->getNdbError());
2336 setNdbError(pTrans->getNdbError());
2337 closeTransaction(pNdb);
2338 return NDBT_FAILED;
2339 }
2340 }
2341 }
2342 }
2343
2344 check = pTrans->execute(Commit, AbortOnError);
2345 if( check == -1 ) {
2346 const NdbError err = pTrans->getNdbError();
2347 NDB_ERR(err);
2348 closeTransaction(pNdb);
2349
2350 if (err.status == NdbError::TemporaryError){
2351 NdbSleep_MilliSleep(50);
2352 retryAttempt++;
2353 continue;
2354 }
2355 ndbout << "r = " << r << endl;
2356 setNdbError(err);
2357 return NDBT_FAILED;
2358 } else {
2359 updated += batch;
2360 pTrans->getGCI(&m_latest_gci);
2361 }
2362
2363 closeTransaction(pNdb);
2364
2365 r+= batch; // Read next record
2366 }
2367
2368 g_info << "|- " << updated << " records updated" << endl;
2369 return NDBT_OK;
2370 }
2371
// Explicit instantiations of the Vector template for the two pointer element
// types below (presumably the types behind the `rows` and `indexScans`
// containers used in this file -- confirm against the class declaration).
2372 template class Vector<NDBT_ResultRow*>;
2373 template class Vector<NdbIndexScanOperation*>;
2374