1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "HugoTransactions.hpp"
26 #include <NDBT_Stats.hpp>
27 #include <NdbSleep.h>
28 #include <NdbTick.h>
29 
HugoTransactions(const NdbDictionary::Table & _tab,const NdbDictionary::Index * idx)30 HugoTransactions::HugoTransactions(const NdbDictionary::Table& _tab,
31 				   const NdbDictionary::Index* idx):
32   HugoOperations(_tab, idx),
33   row(_tab){
34 
35   m_defaultScanUpdateMethod = 3;
36   setRetryMax();
37   m_retryMaxReached = false;
38   m_stats_latency = 0;
39 
40   m_thr_count = 0;
41   m_thr_no = -1;
42 
43   m_empty_update = false;
44 }
45 
~HugoTransactions()46 HugoTransactions::~HugoTransactions(){
47   deallocRows();
48 }
49 
50 int
scanReadRecords(Ndb * pNdb,int records,int abortPercent,int parallelism,NdbOperation::LockMode lm,int scan_flags)51 HugoTransactions::scanReadRecords(Ndb* pNdb,
52 				  int records,
53 				  int abortPercent,
54 				  int parallelism,
55 				  NdbOperation::LockMode lm,
56                                   int scan_flags)
57 {
58 
59   int                  retryAttempt = 0;
60   int                  check, a;
61   NdbScanOperation	       *pOp;
62 
63   while (true){
64 
65     if (retryAttempt >= m_retryMax){
66       g_err << __LINE__ << " ERROR: has retried this operation "
67             << retryAttempt << " times, failing!, line: " << __LINE__ << endl;
68       return NDBT_FAILED;
69     }
70 
71     pTrans = pNdb->startTransaction();
72     if (pTrans == NULL) {
73       const NdbError err = pNdb->getNdbError();
74 
75       if (err.status == NdbError::TemporaryError){
76 	NDB_ERR(err);
77 	NdbSleep_MilliSleep(50);
78 	retryAttempt++;
79 	continue;
80       }
81       NDB_ERR(err);
82       setNdbError(err);
83       return NDBT_FAILED;
84     }
85 
86     pOp = getScanOperation(pTrans);
87     if (pOp == NULL) {
88       NDB_ERR(pTrans->getNdbError());
89       setNdbError(pTrans->getNdbError());
90       closeTransaction(pNdb);
91       return NDBT_FAILED;
92     }
93 
94     if( pOp ->readTuples(lm, scan_flags, parallelism) ) {
95       NDB_ERR(pTrans->getNdbError());
96       setNdbError(pTrans->getNdbError());
97       closeTransaction(pNdb);
98       return NDBT_FAILED;
99     }
100 
101     for(a = 0; a<tab.getNoOfColumns(); a++){
102       if((row.attributeStore(a) =
103 	  pOp->getValue(tab.getColumn(a)->getName())) == 0) {
104 	NDB_ERR(pTrans->getNdbError());
105 	setNdbError(pTrans->getNdbError());
106 	closeTransaction(pNdb);
107 	return NDBT_FAILED;
108       }
109     }
110 
111     check = pTrans->execute(NoCommit, AbortOnError);
112     if( check == -1 ) {
113       const NdbError err = pTrans->getNdbError();
114       if (err.status == NdbError::TemporaryError){
115 	NDB_ERR(err);
116 	closeTransaction(pNdb);
117 	NdbSleep_MilliSleep(50);
118 	retryAttempt++;
119 	continue;
120       }
121       NDB_ERR(err);
122       setNdbError(err);
123       closeTransaction(pNdb);
124       return NDBT_FAILED;
125     }
126 
127     // Abort after 1-100 or 1-records rows
128     int ranVal = rand();
129     int abortCount = ranVal % (records == 0 ? 100 : records);
130     bool abortTrans = false;
131     if (abortPercent > 0){
132       // Abort if abortCount is less then abortPercent
133       if (abortCount < abortPercent)
134 	abortTrans = true;
135     }
136 
137     int eof;
138     int rows = 0;
139     while((eof = pOp->nextResult(true)) == 0){
140       rows++;
141       if (calc.verifyRowValues(&row) != 0){
142 	closeTransaction(pNdb);
143         g_err << "Line: " << __LINE__ << " verify row failed" << endl;
144 	return NDBT_FAILED;
145       }
146 
147       if (abortCount == rows && abortTrans == true){
148 	ndbout << "Scan is aborted" << endl;
149 	g_info << "Scan is aborted" << endl;
150 	pOp->close();
151 	if( check == -1 ) {
152 	  NDB_ERR(pTrans->getNdbError());
153 	  setNdbError(pTrans->getNdbError());
154 	  closeTransaction(pNdb);
155 	  return NDBT_FAILED;
156 	}
157 
158 	closeTransaction(pNdb);
159 	return NDBT_OK;
160       }
161     }
162     if (eof == -1) {
163       const NdbError err = pTrans->getNdbError();
164 
165       if (err.status == NdbError::TemporaryError){
166 	NDB_ERR_INFO(err);
167 	closeTransaction(pNdb);
168 	NdbSleep_MilliSleep(50);
169 	switch (err.code){
170 	case 488:
171 	case 245:
172 	case 490:
173 	  // Too many active scans, no limit on number of retry attempts
174 	  break;
175 	default:
176           if (err.classification == NdbError::TimeoutExpired)
177           {
178             if (retryAttempt >= (m_retryMax / 10) &&
179                 (parallelism == 0 || parallelism > 1))
180             {
181               /**
182                * decrease parallelism
183                */
184               parallelism = 1;
185               ndbout_c("decrease parallelism");
186             }
187           }
188 	  retryAttempt++;
189 	}
190 	continue;
191       }
192       NDB_ERR(err);
193       setNdbError(err);
194       closeTransaction(pNdb);
195       return NDBT_FAILED;
196     }
197 
198     closeTransaction(pNdb);
199 
200     g_info << rows << " rows have been read" << endl;
201     if (records != 0 && rows != records){
202       g_err << "Check expected number of records failed" << endl
203 	    << "  expected=" << records <<", " << endl
204 	    << "  read=" << rows << endl;
205       return NDBT_FAILED;
206     }
207 
208     return NDBT_OK;
209   }
210   abort(); /* Should never happen */
211   return NDBT_FAILED;
212 }
213 
214 int
scanReadRecords(Ndb * pNdb,const NdbDictionary::Index * pIdx,int records,int abortPercent,int parallelism,NdbOperation::LockMode lm,int scan_flags,int bound_cnt,const HugoBound * bound_arr)215 HugoTransactions::scanReadRecords(Ndb* pNdb,
216 				  const NdbDictionary::Index * pIdx,
217 				  int records,
218 				  int abortPercent,
219 				  int parallelism,
220 				  NdbOperation::LockMode lm,
221                                   int scan_flags,
222                                   int bound_cnt, const HugoBound* bound_arr)
223 {
224 
225   int                  retryAttempt = 0;
226   int                  check, a;
227   NdbScanOperation     *pOp;
228   NdbIndexScanOperation  *pIxOp;
229 
230   while (true){
231 
232     if (retryAttempt >= m_retryMax){
233       g_err << __LINE__ << " ERROR: has retried this operation "
234             << retryAttempt  << " times, failing!" << endl;
235       g_err << "lm: " << Uint32(lm) << " flags: H'" << hex << scan_flags
236             << endl;
237       return NDBT_FAILED;
238     }
239 
240     pTrans = pNdb->startTransaction();
241     if (pTrans == NULL) {
242       const NdbError err = pNdb->getNdbError();
243 
244       if (err.status == NdbError::TemporaryError){
245 	NDB_ERR(err);
246 	NdbSleep_MilliSleep(50);
247 	retryAttempt++;
248 	continue;
249       }
250       NDB_ERR(err);
251       setNdbError(err);
252       return NDBT_FAILED;
253     }
254 
255     if (pIdx != NULL) {
256       pOp = pIxOp = pTrans->getNdbIndexScanOperation(pIdx->getName(), tab.getName());
257     } else {
258       pOp = pTrans->getNdbScanOperation(tab.getName());
259       pIxOp = NULL;
260     }
261 
262     if (pOp == NULL) {
263       NDB_ERR(pTrans->getNdbError());
264       setNdbError(pTrans->getNdbError());
265       closeTransaction(pNdb);
266       return NDBT_FAILED;
267     }
268 
269     if( pOp ->readTuples(lm, scan_flags, parallelism) ) {
270       NDB_ERR(pTrans->getNdbError());
271       setNdbError(pTrans->getNdbError());
272       closeTransaction(pNdb);
273       return NDBT_FAILED;
274     }
275 
276     for (int i = 0; i < bound_cnt; i++) {
277       const HugoBound& b = bound_arr[i];
278       if (pIxOp->setBound(b.attr, b.type, b.value) != 0) {
279         NDB_ERR(pIxOp->getNdbError());
280         setNdbError(pIxOp->getNdbError());
281         return NDBT_FAILED;
282       }
283     }
284 
285     for(a = 0; a<tab.getNoOfColumns(); a++){
286       if((row.attributeStore(a) =
287 	  pOp->getValue(tab.getColumn(a)->getName())) == 0) {
288 	NDB_ERR(pTrans->getNdbError());
289 	setNdbError(pTrans->getNdbError());
290 	closeTransaction(pNdb);
291 	return NDBT_FAILED;
292       }
293     }
294 
295     check = pTrans->execute(NoCommit, AbortOnError);
296     if( check == -1 ) {
297       const NdbError err = pTrans->getNdbError();
298       if (err.status == NdbError::TemporaryError){
299 	NDB_ERR(err);
300 	closeTransaction(pNdb);
301 	NdbSleep_MilliSleep(50);
302 	retryAttempt++;
303 	continue;
304       }
305       NDB_ERR(err);
306       setNdbError(err);
307       closeTransaction(pNdb);
308       return NDBT_FAILED;
309     }
310 
311     // Abort after 1-100 or 1-records rows
312     int ranVal = rand();
313     int abortCount = ranVal % (records == 0 ? 100 : records);
314     bool abortTrans = false;
315     if (abortPercent > 0){
316       // Abort if abortCount is less then abortPercent
317       if (abortCount < abortPercent)
318 	abortTrans = true;
319     }
320 
321     int eof;
322     int rows = 0;
323     while((eof = pOp->nextResult(true)) == 0){
324       rows++;
325       if (calc.verifyRowValues(&row) != 0){
326 	closeTransaction(pNdb);
327         g_err << "Line: " << __LINE__ << " verify row failed" << endl;
328 	return NDBT_FAILED;
329       }
330 
331       if (abortCount == rows && abortTrans == true){
332 	ndbout << "Scan is aborted" << endl;
333 	g_info << "Scan is aborted" << endl;
334 	pOp->close();
335 	if( check == -1 ) {
336 	  NDB_ERR(pTrans->getNdbError());
337 	  setNdbError(pTrans->getNdbError());
338 	  closeTransaction(pNdb);
339 	  return NDBT_FAILED;
340 	}
341 
342 	closeTransaction(pNdb);
343 	return NDBT_OK;
344       }
345     }
346     if (eof == -1) {
347       const NdbError err = pTrans->getNdbError();
348 
349       if (err.status == NdbError::TemporaryError){
350 	NDB_ERR_INFO(err);
351 	closeTransaction(pNdb);
352 	NdbSleep_MilliSleep(50);
353 	switch (err.code){
354 	case 488:
355 	case 245:
356 	case 490:
357 	  // Too many active scans, no limit on number of retry attempts
358 	  break;
359 	default:
360           if (err.classification == NdbError::TimeoutExpired)
361           {
362             if (retryAttempt >= (m_retryMax / 10) &&
363                 (parallelism == 0 || parallelism > 1))
364             {
365               /**
366                * decrease parallelism
367                */
368               parallelism = 1;
369               ndbout_c("decrease parallelism");
370             }
371             else if (retryAttempt >= (m_retryMax / 5) &&
372                      (lm != NdbOperation::LM_CommittedRead))
373             {
374               lm = NdbOperation::LM_CommittedRead;
375               ndbout_c("switch to LM_CommittedRead");
376             }
377             else if (retryAttempt >= (m_retryMax / 4) &&
378                      (pIdx != 0))
379             {
380               pIdx = NULL;
381               bound_cnt = 0;
382               scan_flags |= NdbScanOperation::SF_TupScan;
383               ndbout_c("switch to table-scan (SF_TupScan) from index-scan");
384             }
385           }
386 	  retryAttempt++;
387 	}
388 	continue;
389       }
390       NDB_ERR(err);
391       setNdbError(err);
392       closeTransaction(pNdb);
393       return NDBT_FAILED;
394     }
395 
396     closeTransaction(pNdb);
397 
398     g_info << rows << " rows have been read"
399            << ", number of index bounds " << bound_cnt << endl;
400     // TODO verify expected number of records with index bounds
401     if (records != 0 && rows != records && bound_cnt == 0){
402       g_err << "Check expected number of records failed" << endl
403 	    << "  expected=" << records <<", " << endl
404 	    << "  read=" << rows << endl;
405       return NDBT_FAILED;
406     }
407 
408     return NDBT_OK;
409   }
410   abort(); /* Should never happen */
411   return NDBT_FAILED;
412 }
413 
414 
415 #define RESTART_SCAN 99
416 
417 int
scanUpdateRecords(Ndb * pNdb,NdbScanOperation::ScanFlag flags,int records,int abortPercent,int parallelism)418 HugoTransactions::scanUpdateRecords(Ndb* pNdb,
419                                     NdbScanOperation::ScanFlag flags,
420                                     int records,
421                                     int abortPercent,
422                                     int parallelism){
423   int retryAttempt = 0;
424   int check, a;
425   NdbScanOperation *pOp;
426   m_retryMaxReached = false;
427 
428   while (true){
429 restart:
430     if (retryAttempt++ >= m_retryMax){
431       g_err << "ERROR: has retried this operation " << retryAttempt
432 	     << " times, failing!, line: " << __LINE__ << endl;
433       m_retryMaxReached = true;
434       return NDBT_FAILED;
435     }
436 
437     pTrans = pNdb->startTransaction();
438     if (pTrans == NULL) {
439       const NdbError err = pNdb->getNdbError();
440       NDB_ERR(err);
441       if (err.status == NdbError::TemporaryError){
442 	NdbSleep_MilliSleep(50);
443 	continue;
444       }
445       setNdbError(err);
446       return NDBT_FAILED;
447     }
448 
449     pOp = getScanOperation(pTrans);
450     if (pOp == NULL)
451     {
452       const NdbError err = pTrans->getNdbError();
453       NDB_ERR(err);
454       closeTransaction(pNdb);
455       if (err.status == NdbError::TemporaryError)
456       {
457         NdbSleep_MilliSleep(50);
458         continue;
459       }
460       setNdbError(err);
461       return NDBT_FAILED;
462     }
463 
464     if( pOp->readTuples(NdbOperation::LM_Exclusive, flags,
465                         parallelism))
466     {
467       NDB_ERR(pOp->getNdbError());
468       setNdbError(pOp->getNdbError());
469       closeTransaction(pNdb);
470       return NDBT_FAILED;
471     }
472 
473     // Read all attributes from this table
474     for(a=0; a<tab.getNoOfColumns(); a++){
475       if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == NULL){
476 	NDB_ERR(pTrans->getNdbError());
477 	setNdbError(pTrans->getNdbError());
478 	closeTransaction(pNdb);
479 	return NDBT_FAILED;
480       }
481     }
482 
483     check = pTrans->execute(NoCommit, AbortOnError);
484     if( check == -1 ) {
485       const NdbError err = pTrans->getNdbError();
486       NDB_ERR(err);
487       closeTransaction(pNdb);
488       if (err.status == NdbError::TemporaryError){
489 	NdbSleep_MilliSleep(50);
490 	continue;
491       }
492       setNdbError(err);
493       return NDBT_FAILED;
494     }
495 
496     // Abort after 1-100 or 1-records rows
497     int ranVal = rand();
498     int abortCount = ranVal % (records == 0 ? 100 : records);
499     bool abortTrans = false;
500     if (abortPercent > 0){
501       // Abort if abortCount is less then abortPercent
502       if (abortCount < abortPercent)
503 	abortTrans = true;
504     }
505 
506     int rows = 0;
507     while((check = pOp->nextResult(true)) == 0){
508       do {
509 	rows++;
510 	NdbOperation* pUp = pOp->updateCurrentTuple();
511 	if(pUp == 0){
512 	  NDB_ERR(pTrans->getNdbError());
513 	  setNdbError(pTrans->getNdbError());
514 	  closeTransaction(pNdb);
515 	  return NDBT_FAILED;
516 	}
517 	const int updates = calc.getUpdatesValue(&row) + (m_empty_update? 0 : 1);
518 	const int r = calc.getIdValue(&row);
519 
520   	for(a = 0; a<tab.getNoOfColumns(); a++){
521 	  if (tab.getColumn(a)->getPrimaryKey() == false){
522 	    if(setValueForAttr(pUp, a, r, updates ) != 0){
523 	      NDB_ERR(pTrans->getNdbError());
524 	      setNdbError(pTrans->getNdbError());
525 	      closeTransaction(pNdb);
526 	      return NDBT_FAILED;
527 	    }
528 	  }
529 	}
530 
531 	if (rows == abortCount && abortTrans == true){
532 	  g_info << "Scan is aborted" << endl;
533 	  // This scan should be aborted
534 	  closeTransaction(pNdb);
535 	  return NDBT_OK;
536 	}
537       } while((check = pOp->nextResult(false)) == 0);
538 
539       if(check != -1){
540 	check = pTrans->execute(Commit, AbortOnError);
541 	if(check != -1)
542 	  m_latest_gci = pTrans->getGCI();
543 	pTrans->restart();
544       }
545 
546       const NdbError err = pTrans->getNdbError();
547       if( check == -1 ) {
548 	closeTransaction(pNdb);
549 	NDB_ERR(err);
550 	if (err.status == NdbError::TemporaryError){
551 	  NdbSleep_MilliSleep(50);
552 	  goto restart;
553 	}
554 	setNdbError(err);
555 	return NDBT_FAILED;
556       }
557     }
558 
559     const NdbError err = pTrans->getNdbError();
560     if( check == -1 ) {
561       closeTransaction(pNdb);
562       NDB_ERR(err);
563       if (err.status == NdbError::TemporaryError){
564 	NdbSleep_MilliSleep(50);
565 	goto restart;
566       }
567       setNdbError(err);
568       return NDBT_FAILED;
569     }
570 
571     closeTransaction(pNdb);
572 
573     g_info << rows << " rows have been updated" << endl;
574     return NDBT_OK;
575   }
576   abort(); /* Should never happen */
577   return NDBT_FAILED;
578 }
579 
580 int
scanUpdateRecords(Ndb * pNdb,int records,int abortPercent,int parallelism)581 HugoTransactions::scanUpdateRecords(Ndb* pNdb,
582 				    int records,
583 				    int abortPercent,
584 				    int parallelism){
585 
586   return scanUpdateRecords(pNdb,
587                            (NdbScanOperation::ScanFlag)0,
588                            records, abortPercent, parallelism);
589 }
590 
591 // Scan all records exclusive and update
592 // them one by one
593 int
scanUpdateRecords1(Ndb * pNdb,int records,int abortPercent,int parallelism)594 HugoTransactions::scanUpdateRecords1(Ndb* pNdb,
595 				     int records,
596 				     int abortPercent,
597 				     int parallelism){
598   return scanUpdateRecords(pNdb,
599                            (NdbScanOperation::ScanFlag)0,
600                            records, abortPercent, 1);
601 }
602 
603 // Scan all records exclusive and update
604 // them batched by asking nextScanResult to
605 // give us all cached records before fetching new
606 // records from db
607 int
scanUpdateRecords2(Ndb * pNdb,int records,int abortPercent,int parallelism)608 HugoTransactions::scanUpdateRecords2(Ndb* pNdb,
609 				     int records,
610 				     int abortPercent,
611 				     int parallelism){
612   return scanUpdateRecords(pNdb, (NdbScanOperation::ScanFlag)0,
613                            records, abortPercent, parallelism);
614 }
615 
616 int
scanUpdateRecords3(Ndb * pNdb,int records,int abortPercent,int parallelism)617 HugoTransactions::scanUpdateRecords3(Ndb* pNdb,
618 				     int records,
619 				     int abortPercent,
620 				     int parallelism)
621 {
622   return scanUpdateRecords(pNdb, (NdbScanOperation::ScanFlag)0,
623                            records, abortPercent, parallelism);
624 }
625 
626 int
loadTable(Ndb * pNdb,int records,int batch,bool allowConstraintViolation,int doSleep,bool oneTrans,int value,bool abort,bool abort_on_first_error)627 HugoTransactions::loadTable(Ndb* pNdb,
628 			    int records,
629 			    int batch,
630 			    bool allowConstraintViolation,
631 			    int doSleep,
632                             bool oneTrans,
633 			    int value,
634 			    bool abort,
635                             bool abort_on_first_error)
636 {
637   return loadTableStartFrom(pNdb, 0, records, batch, allowConstraintViolation,
638                             doSleep, oneTrans, value, abort,
639                             abort_on_first_error);
640 }
641 
642 int
loadTableStartFrom(Ndb * pNdb,int startFrom,int records,int batch,bool allowConstraintViolation,int doSleep,bool oneTrans,int value,bool abort,bool abort_on_first_error)643 HugoTransactions::loadTableStartFrom(Ndb* pNdb,
644                                      int startFrom,
645                                      int records,
646                                      int batch,
647                                      bool allowConstraintViolation,
648                                      int doSleep,
649                                      bool oneTrans,
650                                      int value,
651                                      bool abort,
652                                      bool abort_on_first_error){
653   int             check;
654   int             retryAttempt = 0;
655   int             retryMax = 5;
656   bool            first_batch = true;
657 
658   const int org = batch;
659   const int cols = tab.getNoOfColumns();
660   const int brow = tab.getRowSizeInBytes();
661   const int bytes = 12 + brow + 4 * cols;
662   batch = (batch * 256); // -> 512 -> 65536k per commit
663   batch = batch/bytes;   //
664   batch = batch == 0 ? 1 : batch;
665 
666   if(batch != org){
667     g_info << "batch = " << org << " rowsize = " << bytes
668 	   << " -> rows/commit = " << batch << endl;
669   }
670 
671   //Uint32 orgbatch = batch;
672   g_info << "|- Inserting records..." << endl;
673   for (int c=0 ; c<records; ){
674     bool closeTrans = true;
675 
676     if(c + batch > records)
677       batch = records - c;
678 
679     if (retryAttempt >= retryMax){
680       g_info << "Record " << c << " could not be inserted, has retried "
681 	     << retryAttempt << " times " << endl;
682       // Reset retry counters and continue with next record
683       retryAttempt = 0;
684       c++;
685     }
686     if (doSleep > 0)
687       NdbSleep_MilliSleep(doSleep);
688 
689     //    if (first_batch || !oneTrans) {
690     if (first_batch || !pTrans) {
691       first_batch = false;
692       pTrans = pNdb->startTransaction();
693       if (pTrans == NULL) {
694         const NdbError err = pNdb->getNdbError();
695 
696         if (err.status == NdbError::TemporaryError){
697           NDB_ERR(err);
698 	  NdbSleep_MilliSleep(50);
699 	  retryAttempt++;
700 	  continue;
701         }
702         NDB_ERR(err);
703         setNdbError(err);
704         return NDBT_FAILED;
705       }
706     }
707 
708     if(pkInsertRecord(pNdb, c + startFrom, batch, value) != NDBT_OK)
709     {
710       NDB_ERR(pTrans->getNdbError());
711       setNdbError(pTrans->getNdbError());
712       closeTransaction(pNdb);
713       return NDBT_FAILED;
714     }
715 
716     // Execute the transaction and insert the record
717     if (!oneTrans || (c + batch) >= records) {
718       //      closeTrans = true;
719       closeTrans = false;
720       if (!abort)
721       {
722 	check = pTrans->execute(Commit, AbortOnError);
723 	if(check != -1)
724 	  pTrans->getGCI(&m_latest_gci);
725 	pTrans->restart();
726       }
727       else
728       {
729 	check = pTrans->execute(NoCommit, AbortOnError);
730 	if (check != -1)
731 	{
732 	  check = pTrans->execute( Rollback );
733 	  closeTransaction(pNdb);
734 	}
735       }
736     } else {
737       closeTrans = false;
738       check = pTrans->execute(NoCommit, AbortOnError);
739     }
740     if(check == -1 ) {
741       const NdbError err = pTrans->getNdbError();
742       closeTransaction(pNdb);
743       pTrans= 0;
744       switch(err.status){
745       case NdbError::Success:
746 	NDB_ERR(err);
747 	g_info << "ERROR: NdbError reports success when transcaction failed"
748 	       << endl;
749 	setNdbError(err);
750 	return NDBT_FAILED;
751 	break;
752 
753       case NdbError::TemporaryError:
754         if (abort_on_first_error)
755         {
756           return err.code;
757         }
758 	NDB_ERR(err);
759 	NdbSleep_MilliSleep(50);
760 	retryAttempt++;
761         batch = 1;
762 	continue;
763 	break;
764 
765       case NdbError::UnknownResult:
766 	NDB_ERR(err);
767 	setNdbError(err);
768 	return NDBT_FAILED;
769 	break;
770 
771       case NdbError::PermanentError:
772 	if (allowConstraintViolation == true){
773 	  switch (err.classification){
774 	  case NdbError::ConstraintViolation:
775 	    // Tuple already existed, OK but should be reported
776 	    g_info << c << ": " << err.code << " " << err.message << endl;
777 	    c++;
778 	    continue;
779 	    break;
780 	  default:
781 	    break;
782 	  }
783 	}
784 	NDB_ERR(err);
785 	setNdbError(err);
786 	return err.code;
787 	break;
788       }
789     }
790     else{
791       if (closeTrans) {
792         closeTransaction(pNdb);
793 	pTrans= 0;
794       }
795     }
796 
797     // Step to next record
798     c = c+batch;
799     retryAttempt = 0;
800   }
801 
802   if(pTrans)
803     closeTransaction(pNdb);
804   return NDBT_OK;
805 }
806 
807 int
fillTable(Ndb * pNdb,int batch)808 HugoTransactions::fillTable(Ndb* pNdb,
809                                      int batch){
810   return fillTableStartFrom(pNdb, 0, batch);
811 }
812 
813 int
fillTableStartFrom(Ndb * pNdb,int startFrom,int batch)814 HugoTransactions::fillTableStartFrom(Ndb* pNdb,
815                                      int startFrom,
816                                      int batch){
817   int             check;
818   int             retryFull = 0;
819   int             retryAttempt = 0;
820   int             retryMax = 5;
821 
822   const int org = batch;
823   const int cols = tab.getNoOfColumns();
824   const int brow = tab.getRowSizeInBytes();
825   const int bytes = 12 + brow + 4 * cols;
826   batch = (batch * 256); // -> 512 -> 65536k per commit
827   batch = batch/bytes;   //
828   batch = batch == 0 ? 1 : batch;
829 
830   if(batch != org){
831     g_info << "batch = " << org << " rowsize = " << bytes
832 	   << " -> rows/commit = " << batch << endl;
833   }
834 
835   for (int c=startFrom ; ; ){
836 
837     if (retryAttempt >= retryMax){
838       g_info << "Record " << c << " could not be inserted, has retried "
839 	     << retryAttempt << " times " << endl;
840       // Reset retry counters and continue with next record
841       retryAttempt = 0;
842       c++;
843     }
844 
845     pTrans = pNdb->startTransaction();
846     if (pTrans == NULL) {
847       const NdbError err = pNdb->getNdbError();
848 
849       if (err.status == NdbError::TemporaryError){
850 	NDB_ERR(err);
851 	NdbSleep_MilliSleep(50);
852 	retryAttempt++;
853 	continue;
854       }
855       NDB_ERR(err);
856       setNdbError(err);
857       return NDBT_FAILED;
858     }
859 
860     if(pkInsertRecord(pNdb, c, batch) != NDBT_OK)
861     {
862       NDB_ERR(pTrans->getNdbError());
863       setNdbError(pTrans->getNdbError());
864       closeTransaction(pNdb);
865       return NDBT_FAILED;
866     }
867 
868     // Execute the transaction and insert the record
869     check = pTrans->execute(Commit, CommitAsMuchAsPossible);
870     const NdbError err = pTrans->getNdbError();
871     if(check == -1 || err.code != 0) {
872       closeTransaction(pNdb);
873 
874       switch(err.status){
875       case NdbError::Success:
876 	NDB_ERR(err);
877 	setNdbError(err);
878 	g_info << "ERROR: NdbError reports success when transcaction failed"
879 	       << endl;
880 	return NDBT_FAILED;
881 	break;
882 
883       case NdbError::TemporaryError:
884 	NDB_ERR(err);
885 	NdbSleep_MilliSleep(50);
886 	retryAttempt++;
887 	continue;
888 	break;
889 
890       case NdbError::UnknownResult:
891 	NDB_ERR(err);
892 	setNdbError(err);
893 	return NDBT_FAILED;
894 	break;
895 
896       case NdbError::PermanentError:
897 	//  if (allowConstraintViolation == true){
898 	//    switch (err.classification){
899 	//    case NdbError::ConstraintViolation:
900 	//      // Tuple already existed, OK but should be reported
901 	//      g_info << c << ": " << err.code << " " << err.message << endl;
902 	//      c++;
903 	//      continue;
904 	//      break;
905 	//    default:
906 	//      break;es
907 	//     }
908 	//   }
909 
910 	// Check if this is the "db full" error
911 	if (err.classification==NdbError::InsufficientSpace){
912           // Datamemory might have been released by abort of
913           // batch insert. Retry fill with a smaller batch
914           // in order to ensure table is filled to last row.
915           if (batch > 1){
916             c = c+batch;
917             batch = batch/2;
918             continue;
919           }
920           // Only some datanodes might be full. Retry with
921           // another record until we are *really sure* that
922           // all datanodes are full.
923           if (retryFull < 64) {
924             retryFull++;
925             c++;
926 	     continue;
927           }
928 
929 	  NDB_ERR(err);
930 	  return NDBT_OK;
931 	}
932 
933 	if (err.classification == NdbError::ConstraintViolation){
934 	  NDB_ERR(err);
935 	  break;
936 	}
937 	NDB_ERR(err);
938 	setNdbError(err);
939 	return NDBT_FAILED;
940 	break;
941       }
942     }
943     else{
944       pTrans->getGCI(&m_latest_gci);
945       closeTransaction(pNdb);
946     }
947 
948     // Step to next record
949     c = c+batch;
950     retryAttempt = 0;
951     retryFull = 0;
952   }
953   return NDBT_OK;
954 }
955 
956 int
pkReadRecords(Ndb * pNdb,int records,int batch,NdbOperation::LockMode lm,int _rand)957 HugoTransactions::pkReadRecords(Ndb* pNdb,
958 				int records,
959 				int batch,
960 				NdbOperation::LockMode lm,
961                                 int _rand){
962   int                  reads = 0;
963   int                  r = 0;
964   int                  retryAttempt = 0;
965   int                  check;
966 
967   if (batch == 0) {
968     g_err << "ERROR: Argument batch == 0 in pkReadRecords(). Not allowed.";
969     g_err << "Line: " << __LINE__ << endl;
970     return NDBT_FAILED;
971   }
972 
973   while (r < records){
974     if(r + batch > records)
975       batch = records - r;
976 
977     if (retryAttempt >= m_retryMax){
978       g_err << "ERROR: has retried this operation " << retryAttempt
979 	     << " times, failing!, line: " << __LINE__ << endl;
980       return NDBT_FAILED;
981     }
982 
983     pTrans = pNdb->startTransaction();
984     if (pTrans == NULL) {
985       const NdbError err = pNdb->getNdbError();
986 
987       if (err.status == NdbError::TemporaryError){
988 	NDB_ERR(err);
989 	NdbSleep_MilliSleep(500);
990 	retryAttempt++;
991 	continue;
992       }
993       NDB_ERR(err);
994       setNdbError(err);
995       return NDBT_FAILED;
996     }
997     retryAttempt = 0;
998 
999     NDB_TICKS timer_start;
1000     NDB_TICKS timer_stop;
1001     bool timer_active =
1002       m_stats_latency != 0 &&
1003       r >= batch &&             // first batch is "warmup"
1004       r + batch != records;     // last batch is usually partial
1005 
1006     if (timer_active)
1007       timer_start = NdbTick_getCurrentTicks();
1008 
1009     NdbOperation::LockMode lmused;
1010     if (_rand == 0)
1011     {
1012       if(pkReadRecord(pNdb, r, batch, lm, &lmused) != NDBT_OK)
1013       {
1014         NDB_ERR(pTrans->getNdbError());
1015         setNdbError(pTrans->getNdbError());
1016         closeTransaction(pNdb);
1017         return NDBT_FAILED;
1018       }
1019     }
1020     else
1021     {
1022       if(pkReadRandRecord(pNdb, records, batch, lm, &lmused) != NDBT_OK)
1023       {
1024         NDB_ERR(pTrans->getNdbError());
1025         setNdbError(pTrans->getNdbError());
1026         closeTransaction(pNdb);
1027         return NDBT_FAILED;
1028       }
1029     }
1030 
1031     check = pTrans->execute(Commit, AbortOnError);
1032 
1033     if (check != -1 && lmused == NdbOperation::LM_CommittedRead)
1034     {
1035       /**
1036        * LM_CommittedRead will not abort transaction
1037        *   even if doing execute(AbortOnError);
1038        *   so also check pTrans->getNdbError() in this case
1039        */
1040       if (pTrans->getNdbError().status != NdbError::Success)
1041       {
1042         check = -1;
1043       }
1044     }
1045 
1046     if( check == -1 ) {
1047       const NdbError err = pTrans->getNdbError();
1048 
1049       if (err.status == NdbError::TemporaryError){
1050 	NDB_ERR(err);
1051 	closeTransaction(pNdb);
1052 	NdbSleep_MilliSleep(50);
1053 	retryAttempt++;
1054 	continue;
1055       }
1056       switch(err.code){
1057       case 626: // Tuple did not exist
1058 	g_info << r << ": " << err.code << " " << err.message << endl;
1059 	r++;
1060 	break;
1061 
1062       default:
1063 	NDB_ERR(err);
1064 	setNdbError(err);
1065 	closeTransaction(pNdb);
1066 	return NDBT_FAILED;
1067       }
1068     } else {
1069 
1070       /**
1071        * Extra debug aid:
1072        * We do not (yet) expect any transaction or operation
1073        * errors if ::execute() does not return with error.
1074        */
1075       const NdbError err1 = pTrans->getNdbError();
1076       if (err1.code)
1077       {
1078         ndbout << "BEWARE: HugoTransactions::pkReadRecords"
1079                << ", execute succeeded with Trans error: " << err1.code
1080                << endl;
1081 
1082       }
1083       const NdbOperation* pOp = pTrans->getNdbErrorOperation();
1084       if (pOp != NULL)
1085       {
1086         const NdbError err2 = pOp->getNdbError();
1087         ndbout << "BEWARE HugoTransactions::pkReadRecords"
1088              << ", NdbOperation error: " << err2.code
1089              << endl;
1090       }
1091 
1092       retryAttempt = 0;
1093       if(indexScans.size() > 0)
1094       {
1095         /* Index scan used to read records....*/
1096 	int rows_found = 0;
1097         for (Uint32 scanOp=0; scanOp < indexScans.size(); scanOp++)
1098         {
1099           while((check = indexScans[scanOp]->nextResult()) == 0)
1100           {
1101             rows_found++;
1102             if (calc.verifyRowValues(rows[0]) != 0){
1103               closeTransaction(pNdb);
1104               g_err << "Line: " << __LINE__ << " verify row failed" << endl;
1105               return NDBT_FAILED;
1106             }
1107           }
1108         }
1109 	if(check != 1 || rows_found > batch)
1110 	{
1111 	  closeTransaction(pNdb);
1112           g_err << "Line: " << __LINE__ << " check rows failed" << endl;
1113 	  return NDBT_FAILED;
1114 	}
1115 	else if(rows_found < batch)
1116 	{
1117 	  if(batch == 1){
1118 	    g_info << r << ": not found" << endl; abort(); }
1119 	  else
1120 	    g_info << "Found " << rows_found << " of "
1121 		   << batch << " rows" << endl;
1122 	}
1123 	r += batch;
1124 	reads += rows_found;
1125       }
1126       else
1127       {
1128 	for (int b=0; (b<batch) && (r+b<records); b++){
1129 	  if (calc.verifyRowValues(rows[b]) != 0){
1130 	    closeTransaction(pNdb);
1131             g_err << "Line: " << __LINE__
1132                   << " verify row failed"
1133                   << ", record: " << r << " of: " << records
1134                   << ", row: " << b << " in a batch of: " << batch
1135                   << endl;
1136 	    return NDBT_FAILED;
1137 	  }
1138 	  reads++;
1139 	  r++;
1140 	}
1141       }
1142     }
1143 
1144     closeTransaction(pNdb);
1145 
1146     if (timer_active) {
1147       timer_stop = NdbTick_getCurrentTicks();
1148       Uint64 elapsed = NdbTick_Elapsed(timer_start, timer_stop).microSec();
1149       m_stats_latency->addObservation((double)elapsed);
1150     }
1151   }
1152   deallocRows();
1153   indexScans.clear();
1154   g_info << reads << " records read" << endl;
1155   return NDBT_OK;
1156 }
1157 
1158 
1159 
1160 int
pkUpdateRecords(Ndb * pNdb,int records,int batch,int doSleep)1161 HugoTransactions::pkUpdateRecords(Ndb* pNdb,
1162 				  int records,
1163 				  int batch,
1164 				  int doSleep){
1165   int updated = 0;
1166   int                  r = 0;
1167   int                  retryAttempt = 0;
1168   int                  check, b;
1169 
1170   allocRows(batch);
1171 
1172   g_info << "|- Updating records (batch=" << batch << ")..." << endl;
1173   int batch_no = 0;
1174   while (r < records){
1175     if(r + batch > records)
1176       batch = records - r;
1177 
1178     if (m_thr_count != 0 && m_thr_no != batch_no % m_thr_count)
1179     {
1180       r += batch;
1181       batch_no++;
1182       continue;
1183     }
1184 
1185     if (retryAttempt >= m_retryMax){
1186       g_err << "ERROR: has retried this operation " << retryAttempt
1187 	     << " times, failing!, line: " << __LINE__ << endl;
1188       return NDBT_FAILED;
1189     }
1190 
1191     if (doSleep > 0)
1192       NdbSleep_MilliSleep(doSleep);
1193 
1194     pTrans = pNdb->startTransaction();
1195     if (pTrans == NULL) {
1196       const NdbError err = pNdb->getNdbError();
1197 
1198       if (err.status == NdbError::TemporaryError){
1199 	NDB_ERR(err);
1200 	NdbSleep_MilliSleep(50);
1201 	retryAttempt++;
1202 	continue;
1203       }
1204       NDB_ERR(err);
1205       setNdbError(err);
1206       return NDBT_FAILED;
1207     }
1208 
1209     if(pkReadRecord(pNdb, r, batch, NdbOperation::LM_Exclusive) != NDBT_OK)
1210     {
1211       NDB_ERR(pTrans->getNdbError());
1212       setNdbError(pTrans->getNdbError());
1213       closeTransaction(pNdb);
1214       return NDBT_FAILED;
1215     }
1216 
1217     check = pTrans->execute(NoCommit, AbortOnError);
1218     if( check == -1 ) {
1219       const NdbError err = pTrans->getNdbError();
1220 
1221       if (err.status == NdbError::TemporaryError){
1222 	NDB_ERR(err);
1223 	closeTransaction(pNdb);
1224 	NdbSleep_MilliSleep(50);
1225 	retryAttempt++;
1226 	continue;
1227       }
1228       NDB_ERR(err);
1229       setNdbError(err);
1230       closeTransaction(pNdb);
1231       return NDBT_FAILED;
1232     }
1233 
1234     NDB_TICKS timer_start;
1235     NDB_TICKS timer_stop;
1236     bool timer_active =
1237       m_stats_latency != 0 &&
1238       r >= batch &&             // first batch is "warmup"
1239       r + batch != records;     // last batch is usually partial
1240 
1241     if (timer_active)
1242       timer_start = NdbTick_getCurrentTicks();
1243 
1244     int rows_found = 0;
1245 
1246     if(indexScans.size() > 0)
1247     {
1248       /* Index scans used to read records */
1249       for (Uint32 scanOp=0; scanOp < indexScans.size(); scanOp++)
1250       {
1251         while((check = indexScans[scanOp]->nextResult(true)) == 0)
1252         {
1253           do {
1254 
1255             if (calc.verifyRowValues(rows[0]) != 0){
1256               g_err << "Row validation failure, line: " << __LINE__ << endl;
1257               closeTransaction(pNdb);
1258               return NDBT_FAILED;
1259             }
1260 
1261             int updates = calc.getUpdatesValue(rows[0]) + (m_empty_update? 0 : 1);
1262 
1263             /* Rows may not arrive in the order they were requested
1264              * (When multiple partitions scanned without ORDERBY)
1265              * therefore we use the id from the row to update it
1266              */
1267             const Uint32 rowId= calc.getIdValue(rows[0]);
1268             if(pkUpdateRecord(pNdb, rowId, 1, updates) != NDBT_OK)
1269             {
1270               NDB_ERR(pTrans->getNdbError());
1271               setNdbError(pTrans->getNdbError());
1272               closeTransaction(pNdb);
1273               return NDBT_FAILED;
1274             }
1275             rows_found++;
1276           } while((check = indexScans[scanOp]->nextResult(false)) == 0);
1277 
1278           if(check != 2)
1279             break;
1280           if((check = pTrans->execute(NoCommit, AbortOnError)) != 0)
1281             break;
1282         } // Next fetch on this scan op...
1283 
1284         if(check != 1)
1285         {
1286           g_err << "Check failed, line: " << __LINE__ << endl;
1287           closeTransaction(pNdb);
1288           return NDBT_FAILED;
1289         }
1290       } // Next scan op...
1291 
1292       if (rows_found != batch)
1293       {
1294         g_err << "Incorrect num of rows found.  Expected "
1295                << batch << ". Found " << rows_found << endl;
1296         g_err << "Line: " << __LINE__ << endl;
1297         closeTransaction(pNdb);
1298         return NDBT_FAILED;
1299       }
1300     }
1301     else
1302     {
1303       for(b = 0; b<batch && (b+r)<records; b++)
1304       {
1305 	if (calc.verifyRowValues(rows[b]) != 0)
1306 	{
1307 	  closeTransaction(pNdb);
1308           g_err << "Line: " << __LINE__ << " verify row failed" << endl;
1309 	  return NDBT_FAILED;
1310 	}
1311 
1312 	int updates = calc.getUpdatesValue(rows[b]) + (m_empty_update? 0 : 1);
1313 
1314 	if(pkUpdateRecord(pNdb, r+b, 1, updates) != NDBT_OK)
1315 	{
1316 	  NDB_ERR(pTrans->getNdbError());
1317 	  setNdbError(pTrans->getNdbError());
1318 	  closeTransaction(pNdb);
1319 	  return NDBT_FAILED;
1320 	}
1321       }
1322       check = pTrans->execute(Commit, AbortOnError);
1323     }
1324     if( check == -1 ) {
1325       const NdbError err = pTrans->getNdbError();
1326 
1327       if (err.status == NdbError::TemporaryError){
1328 	NDB_ERR(err);
1329 	closeTransaction(pNdb);
1330 	NdbSleep_MilliSleep(50);
1331 	retryAttempt++;
1332 	continue;
1333       }
1334       NDB_ERR(err);
1335       setNdbError(err);
1336       ndbout << "r = " << r << endl;
1337       closeTransaction(pNdb);
1338       return NDBT_FAILED;
1339     }
1340     else{
1341       updated += batch;
1342       pTrans->getGCI(&m_latest_gci);
1343     }
1344 
1345     closeTransaction(pNdb);
1346 
1347     if (timer_active) {
1348       timer_stop = NdbTick_getCurrentTicks();
1349       Uint64 elapsed = NdbTick_Elapsed(timer_start, timer_stop).microSec();
1350       m_stats_latency->addObservation((double)elapsed);
1351     }
1352 
1353     r += batch; // Read next record
1354     batch_no++;
1355   }
1356 
1357   deallocRows();
1358   indexScans.clear();
1359   g_info << "|- " << updated << " records updated" << endl;
1360   return NDBT_OK;
1361 }
1362 
1363 int
pkInterpretedUpdateRecords(Ndb * pNdb,int records,int batch)1364 HugoTransactions::pkInterpretedUpdateRecords(Ndb* pNdb,
1365 					     int records,
1366 					     int batch){
1367   int updated = 0;
1368   int r = 0;
1369   int retryAttempt = 0;
1370   int check, a;
1371 
1372   while (r < records){
1373 
1374     if (retryAttempt >= m_retryMax){
1375       g_err << "ERROR: has retried this operation " << retryAttempt
1376 	     << " times, failing!, line: " << __LINE__ << endl;
1377       return NDBT_FAILED;
1378     }
1379 
1380     pTrans = pNdb->startTransaction();
1381     if (pTrans == NULL) {
1382       const NdbError err = pNdb->getNdbError();
1383 
1384       if (err.status == NdbError::TemporaryError){
1385 	NDB_ERR(err);
1386 	NdbSleep_MilliSleep(50);
1387 	retryAttempt++;
1388 	continue;
1389       }
1390       NDB_ERR(err);
1391       setNdbError(err);
1392       return NDBT_FAILED;
1393     }
1394 
1395    NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
1396    if (pOp == NULL) {
1397      NDB_ERR(pTrans->getNdbError());
1398      setNdbError(pTrans->getNdbError());
1399      closeTransaction(pNdb);
1400      return NDBT_FAILED;
1401    }
1402 
1403    check = pOp->readTupleExclusive();
1404    if( check == -1 ) {
1405      NDB_ERR(pTrans->getNdbError());
1406      setNdbError(pTrans->getNdbError());
1407      closeTransaction(pNdb);
1408      return NDBT_FAILED;
1409    }
1410 
1411    // Define primary keys
1412    if (equalForRow(pOp, r) != 0)
1413    {
1414      closeTransaction(pNdb);
1415      g_err << "Line: " << __LINE__ << " equal for row failed" << endl;
1416      return NDBT_FAILED;
1417    }
1418 
1419    // Read update value
1420    for(a = 0; a<tab.getNoOfColumns(); a++){
1421      if (calc.isUpdateCol(a) == true){
1422        if((row.attributeStore(a) =
1423 	   pOp->getValue(tab.getColumn(a)->getName())) == 0) {
1424 	 NDB_ERR(pTrans->getNdbError());
1425 	 setNdbError(pTrans->getNdbError());
1426 	 closeTransaction(pNdb);
1427 	 return NDBT_FAILED;
1428        }
1429      }
1430    }
1431 
1432     check = pTrans->execute(NoCommit, AbortOnError);
1433     if( check == -1 ) {
1434       const NdbError err = pTrans->getNdbError();
1435 
1436       if (err.status == NdbError::TemporaryError){
1437 	NDB_ERR(err);
1438 	closeTransaction(pNdb);
1439 	NdbSleep_MilliSleep(50);
1440 	retryAttempt++;
1441 	continue;
1442       }
1443       NDB_ERR(err);
1444       setNdbError(err);
1445       closeTransaction(pNdb);
1446       return NDBT_FAILED;
1447     }
1448 
1449     int updates = calc.getUpdatesValue(&row) + (m_empty_update? 0 : 1);
1450 
1451     NdbOperation* pUpdOp;
1452     pUpdOp = pTrans->getNdbOperation(tab.getName());
1453     if (pUpdOp == NULL) {
1454       NDB_ERR(pTrans->getNdbError());
1455       setNdbError(pTrans->getNdbError());
1456       closeTransaction(pNdb);
1457       return NDBT_FAILED;
1458     }
1459 
1460     check = pUpdOp->interpretedUpdateTuple();
1461     if( check == -1 ) {
1462       NDB_ERR(pTrans->getNdbError());
1463       setNdbError(pTrans->getNdbError());
1464       closeTransaction(pNdb);
1465       return NDBT_FAILED;
1466     }
1467 
1468     // PKs
1469     if (equalForRow(pUpdOp, r) != 0)
1470     {
1471       closeTransaction(pNdb);
1472        g_err << "Line: " << __LINE__ << " equal for row failed" << endl;
1473       return NDBT_FAILED;
1474     }
1475 
1476     // Update col
1477     for(a = 0; a<tab.getNoOfColumns(); a++){
1478       if ((tab.getColumn(a)->getPrimaryKey() == false) &&
1479 	  (calc.isUpdateCol(a) == true)){
1480 
1481 	// TODO switch for 32/64 bit
1482 	const NdbDictionary::Column* attr = tab.getColumn(a);
1483 	Uint32 valToIncWith = 1;
1484 	check = pUpdOp->incValue(attr->getName(), valToIncWith);
1485 	if( check == -1 ) {
1486 	  NDB_ERR(pTrans->getNdbError());
1487 	  setNdbError(pTrans->getNdbError());
1488 	  closeTransaction(pNdb);
1489 	  return NDBT_FAILED;
1490 	}
1491       }
1492     }
1493 
1494     // Remaining attributes
1495     for(a = 0; a<tab.getNoOfColumns(); a++){
1496       if ((tab.getColumn(a)->getPrimaryKey() == false) &&
1497 	  (calc.isUpdateCol(a) == false)){
1498 	if(setValueForAttr(pUpdOp, a, r, updates ) != 0){
1499 	  NDB_ERR(pTrans->getNdbError());
1500 	  setNdbError(pTrans->getNdbError());
1501 	  closeTransaction(pNdb);
1502 	  return NDBT_FAILED;
1503 	}
1504       }
1505     }
1506 
1507 
1508 
1509     check = pTrans->execute(Commit, AbortOnError);
1510     if( check == -1 ) {
1511       const NdbError err = pTrans->getNdbError();
1512 
1513       if (err.status == NdbError::TemporaryError){
1514 	NDB_ERR(err);
1515 	closeTransaction(pNdb);
1516 	NdbSleep_MilliSleep(50);
1517 	retryAttempt++;
1518 	continue;
1519       }
1520       NDB_ERR(err);
1521       setNdbError(err);
1522       ndbout << "r = " << r << endl;
1523       closeTransaction(pNdb);
1524       return NDBT_FAILED;
1525     }
1526     else{
1527       updated++;
1528       pTrans->getGCI(&m_latest_gci);
1529     }
1530 
1531 
1532     closeTransaction(pNdb);
1533 
1534     r++; // Read next record
1535 
1536   }
1537 
1538   g_info << "|- " << updated << " records updated" << endl;
1539   return NDBT_OK;
1540 }
1541 
1542 int
pkDelRecords(Ndb * pNdb,int records,int batch,bool allowConstraintViolation,int doSleep)1543 HugoTransactions::pkDelRecords(Ndb* pNdb,
1544 			       int records,
1545 			       int batch,
1546 			       bool allowConstraintViolation,
1547 			       int doSleep){
1548   // TODO Batch is not implemented
1549   int deleted = 0;
1550   int                  r = 0;
1551   int                  retryAttempt = 0;
1552   int                  check;
1553 
1554   g_info << "|- Deleting records..." << endl;
1555   int batch_no = 0;
1556   while (r < records){
1557     if(r + batch > records)
1558       batch = records - r;
1559 
1560     if (m_thr_count != 0 && m_thr_no != batch_no % m_thr_count)
1561     {
1562       r += batch;
1563       batch_no++;
1564       continue;
1565     }
1566 
1567     if (retryAttempt >= m_retryMax){
1568       g_err << "ERROR: has retried this operation " << retryAttempt
1569 	     << " times, failing!, line: " << __LINE__ << endl;
1570       return NDBT_FAILED;
1571     }
1572 
1573     if (doSleep > 0)
1574       NdbSleep_MilliSleep(doSleep);
1575 
1576     pTrans = pNdb->startTransaction();
1577     if (pTrans == NULL) {
1578       const NdbError err = pNdb->getNdbError();
1579 
1580       if (err.status == NdbError::TemporaryError){
1581 	NDB_ERR(err);
1582 	NdbSleep_MilliSleep(50);
1583 	retryAttempt++;
1584 	continue;
1585       }
1586       NDB_ERR(err);
1587       setNdbError(err);
1588       return NDBT_FAILED;
1589     }
1590 
1591     NDB_TICKS timer_start;
1592     NDB_TICKS timer_stop;
1593     bool timer_active =
1594       m_stats_latency != 0 &&
1595       r >= batch &&             // first batch is "warmup"
1596       r + batch != records;     // last batch is usually partial
1597 
1598     if (timer_active)
1599       timer_start = NdbTick_getCurrentTicks();
1600 
1601     if(pkDeleteRecord(pNdb, r, batch) != NDBT_OK)
1602     {
1603       NDB_ERR(pTrans->getNdbError());
1604       setNdbError(pTrans->getNdbError());
1605       closeTransaction(pNdb);
1606       return NDBT_FAILED;
1607     }
1608 
1609     check = pTrans->execute(Commit, AbortOnError);
1610     if( check == -1) {
1611       const NdbError err = pTrans->getNdbError();
1612 
1613       switch(err.status){
1614       case NdbError::TemporaryError:
1615 	NDB_ERR(err);
1616 	closeTransaction(pNdb);
1617 	NdbSleep_MilliSleep(50);
1618 	retryAttempt++;
1619 	continue;
1620 	break;
1621 
1622       case NdbError::PermanentError:
1623 	if (allowConstraintViolation == true){
1624 	  switch (err.classification){
1625 	  case NdbError::ConstraintViolation:
1626 	    // Tuple did not exist, OK but should be reported
1627 	    g_info << r << ": " << err.code << " " << err.message << endl;
1628 	    continue;
1629 	    break;
1630 	  default:
1631 	    break;
1632 	  }
1633 	}
1634 	NDB_ERR(err);
1635 	setNdbError(err);
1636 	closeTransaction(pNdb);
1637 	return NDBT_FAILED;
1638 	break;
1639 
1640       default:
1641 	NDB_ERR(err);
1642 	setNdbError(err);
1643 	closeTransaction(pNdb);
1644 	return NDBT_FAILED;
1645       }
1646     }
1647     else {
1648       deleted += batch;
1649       pTrans->getGCI(&m_latest_gci);
1650     }
1651     closeTransaction(pNdb);
1652 
1653     if (timer_active) {
1654       timer_stop = NdbTick_getCurrentTicks();
1655       Uint64 elapsed = NdbTick_Elapsed(timer_start, timer_stop).microSec();
1656       m_stats_latency->addObservation((double)elapsed);
1657     }
1658 
1659     r += batch; // Read next record
1660     batch_no++;
1661   }
1662 
1663   g_info << "|- " << deleted << " records deleted" << endl;
1664   return NDBT_OK;
1665 }
1666 
1667 int
pkRefreshRecords(Ndb * pNdb,int startFrom,int count,int batch)1668 HugoTransactions::pkRefreshRecords(Ndb* pNdb,
1669                                    int startFrom,
1670                                    int count,
1671                                    int batch)
1672 {
1673   int r = 0;
1674   int retryAttempt = 0;
1675 
1676   g_info << "|- Refreshing records..." << startFrom << "-" << (startFrom+count)
1677          << " (batch=" << batch << ")" << endl;
1678 
1679   while (r < count)
1680   {
1681     if(r + batch > count)
1682       batch = count - r;
1683 
1684     if (retryAttempt >= m_retryMax)
1685     {
1686       g_err << "ERROR: has retried this operation " << retryAttempt
1687 	     << " times, failing!, line: " << __LINE__ << endl;
1688       return NDBT_FAILED;
1689     }
1690 
1691     pTrans = pNdb->startTransaction();
1692     if (pTrans == NULL)
1693     {
1694       const NdbError err = pNdb->getNdbError();
1695 
1696       if (err.status == NdbError::TemporaryError){
1697 	NDB_ERR(err);
1698 	NdbSleep_MilliSleep(50);
1699 	retryAttempt++;
1700 	continue;
1701       }
1702       NDB_ERR(err);
1703       return NDBT_FAILED;
1704     }
1705 
1706     if (pkRefreshRecord(pNdb, r, batch) != NDBT_OK)
1707     {
1708       NDB_ERR(pTrans->getNdbError());
1709       closeTransaction(pNdb);
1710       return NDBT_FAILED;
1711     }
1712 
1713     if (pTrans->execute(Commit, AbortOnError) == -1)
1714     {
1715       const NdbError err = pTrans->getNdbError();
1716 
1717       switch(err.status){
1718       case NdbError::TemporaryError:
1719 	NDB_ERR(err);
1720 	closeTransaction(pNdb);
1721 	NdbSleep_MilliSleep(50);
1722 	retryAttempt++;
1723 	continue;
1724 	break;
1725 
1726       default:
1727 	NDB_ERR(err);
1728 	closeTransaction(pNdb);
1729 	return NDBT_FAILED;
1730       }
1731     }
1732 
1733     closeTransaction(pNdb);
1734     r += batch; // Read next record
1735   }
1736 
1737   return NDBT_OK;
1738 }
1739 
1740 int
pkReadUnlockRecords(Ndb * pNdb,int records,int batch,NdbOperation::LockMode lm)1741 HugoTransactions::pkReadUnlockRecords(Ndb* pNdb,
1742                                       int records,
1743                                       int batch,
1744                                       NdbOperation::LockMode lm)
1745 {
1746   int                  reads = 0;
1747   int                  r = 0;
1748   int                  retryAttempt = 0;
1749   int                  check;
1750 
1751   if (batch == 0) {
1752     g_err << "ERROR: Argument batch == 0 in pkReadRecords(). Not allowed.";
1753     g_err << " line: " << __LINE__ << endl;
1754     return NDBT_FAILED;
1755   }
1756 
1757   if (idx != NULL) {
1758     g_err << "ERROR: Cannot call pkReadUnlockRecords for index";
1759     g_err << " line: " << __LINE__ << endl;
1760     return NDBT_FAILED;
1761   }
1762 
1763   while (r < records){
1764     if(r + batch > records)
1765       batch = records - r;
1766 
1767     if (retryAttempt >= m_retryMax){
1768       g_err << "ERROR: has retried this operation " << retryAttempt
1769 	     << " times, failing!, line: " << __LINE__ << endl;
1770       return NDBT_FAILED;
1771     }
1772 
1773     pTrans = pNdb->startTransaction();
1774     if (pTrans == NULL) {
1775       const NdbError err = pNdb->getNdbError();
1776 
1777       if (err.status == NdbError::TemporaryError){
1778 	NDB_ERR(err);
1779 	NdbSleep_MilliSleep(50);
1780 	retryAttempt++;
1781 	continue;
1782       }
1783       NDB_ERR(err);
1784       return NDBT_FAILED;
1785     }
1786 
1787     NDB_TICKS timer_start;
1788     NDB_TICKS timer_stop;
1789     bool timer_active =
1790       m_stats_latency != 0 &&
1791       r >= batch &&             // first batch is "warmup"
1792       r + batch != records;     // last batch is usually partial
1793 
1794     if (timer_active)
1795       timer_start = NdbTick_getCurrentTicks();
1796 
1797     Vector<const NdbLockHandle*> lockHandles;
1798 
1799     NdbOperation::LockMode lmused;
1800     if(pkReadRecordLockHandle(pNdb, lockHandles, r, batch, lm, &lmused) != NDBT_OK)
1801     {
1802       NDB_ERR(pTrans->getNdbError());
1803       closeTransaction(pNdb);
1804       return NDBT_FAILED;
1805     }
1806 
1807     check = pTrans->execute(NoCommit, AbortOnError);
1808 
1809     if( check == -1 ) {
1810       const NdbError err = pTrans->getNdbError();
1811 
1812       if (err.status == NdbError::TemporaryError){
1813 	NDB_ERR(err);
1814 	closeTransaction(pNdb);
1815 	NdbSleep_MilliSleep(50);
1816 	retryAttempt++;
1817 	continue;
1818       }
1819       switch(err.code){
1820       case 626: // Tuple did not exist
1821 	g_info << r << ": " << err.code << " " << err.message << endl;
1822 	r++;
1823 	break;
1824 
1825       default:
1826 	NDB_ERR(err);
1827 	closeTransaction(pNdb);
1828 	return NDBT_FAILED;
1829       }
1830     } else {
1831       /* Execute succeeded ok */
1832       for (int b=0; (b<batch) && (r+b<records); b++){
1833         if (calc.verifyRowValues(rows[b]) != 0){
1834           closeTransaction(pNdb);
1835           g_err << "Line: " << __LINE__ << " verify row failed" << endl;
1836           return NDBT_FAILED;
1837         }
1838         reads++;
1839         r++;
1840       }
1841 
1842       if (pkUnlockRecord(pNdb,
1843                          lockHandles) != NDBT_OK)
1844       {
1845         closeTransaction(pNdb);
1846         g_err << "Line: " << __LINE__ << " unlock row failed" << endl;
1847         return NDBT_FAILED;
1848       }
1849 
1850       check = pTrans->execute(Commit, AbortOnError);
1851 
1852       if (check == -1 )
1853       {
1854         const NdbError err = pTrans->getNdbError();
1855 
1856         if (err.status == NdbError::TemporaryError){
1857           NDB_ERR(err);
1858           closeTransaction(pNdb);
1859           NdbSleep_MilliSleep(50);
1860           retryAttempt++;
1861           continue;
1862         }
1863         NDB_ERR(err);
1864         closeTransaction(pNdb);
1865         return NDBT_FAILED;
1866       }
1867     }
1868 
1869     closeTransaction(pNdb);
1870 
1871     if (timer_active) {
1872       timer_stop = NdbTick_getCurrentTicks();
1873       Uint64 elapsed = NdbTick_Elapsed(timer_start, timer_stop).microSec();
1874       m_stats_latency->addObservation((double)elapsed);
1875     }
1876   }
1877   deallocRows();
1878   g_info << reads << " records read" << endl;
1879   return NDBT_OK;
1880 }
1881 
1882 
1883 int
lockRecords(Ndb * pNdb,int records,int percentToLock,int lockTime)1884 HugoTransactions::lockRecords(Ndb* pNdb,
1885 			      int records,
1886 			      int percentToLock,
1887 			      int lockTime){
1888   // Place a lock on percentToLock% of the records in the Db
1889   // Keep the locks for lockTime ms, commit operation
1890   // and lock som other records
1891   int                  r = 0;
1892   int                  retryAttempt = 0;
1893   int                  check;
1894   NdbOperation::LockMode lm = NdbOperation::LM_Exclusive;
1895 
1896   // Calculate how many records to lock in each batch
1897   if (percentToLock <= 0)
1898     percentToLock = 1;
1899   double percentVal = (double)percentToLock / 100;
1900   int lockBatch = (int)(records * percentVal);
1901   if (lockBatch <= 0)
1902     lockBatch = 1;
1903 
1904   allocRows(lockBatch);
1905 
1906   while (r < records){
1907     if(r + lockBatch > records)
1908       lockBatch = records - r;
1909 
1910     g_info << "|- Locking " << lockBatch << " records..." << endl;
1911 
1912     if (retryAttempt >= m_retryMax){
1913       g_err << "ERROR: has retried this operation " << retryAttempt
1914 	     << " times, failing!, line: " << __LINE__ << endl;
1915       return NDBT_FAILED;
1916     }
1917 
1918     pTrans = pNdb->startTransaction();
1919     if (pTrans == NULL) {
1920       const NdbError err = pNdb->getNdbError();
1921 
1922       if (err.status == NdbError::TemporaryError){
1923 	NDB_ERR(err);
1924 	NdbSleep_MilliSleep(50);
1925 	retryAttempt++;
1926 	continue;
1927       }
1928       NDB_ERR(err);
1929       setNdbError(err);
1930       return NDBT_FAILED;
1931     }
1932 
1933     if(pkReadRecord(pNdb, r, lockBatch, lm) != NDBT_OK)
1934     {
1935       NDB_ERR(pTrans->getNdbError());
1936       setNdbError(pTrans->getNdbError());
1937       closeTransaction(pNdb);
1938       return NDBT_FAILED;
1939     }
1940 
1941     // NoCommit lockTime times with 100 millis interval
1942     int sleepInterval = 50;
1943     int lockCount = lockTime / sleepInterval;
1944     int commitCount = 0;
1945     bool tempErr = false;
1946     do {
1947       check = pTrans->execute(NoCommit, AbortOnError);
1948       if( check == -1) {
1949 	const NdbError err = pTrans->getNdbError();
1950 
1951 	if (err.status == NdbError::TemporaryError){
1952 	  NDB_ERR(err);
1953 	  closeTransaction(pNdb);
1954 	  NdbSleep_MilliSleep(50);
1955 	  tempErr = true;
1956           retryAttempt++;
1957           break;
1958 	}
1959 	NDB_ERR(err);
1960 	setNdbError(err);
1961 	closeTransaction(pNdb);
1962 	return NDBT_FAILED;
1963       }
1964       for (int b=0; (b<lockBatch) && (r+b<records); b++){
1965 	if (calc.verifyRowValues(rows[b]) != 0){
1966 	  closeTransaction(pNdb);
1967           g_err << "Line: " << __LINE__ << " verify row failed" << endl;
1968 	  return NDBT_FAILED;
1969 	}
1970       }
1971       commitCount++;
1972       NdbSleep_MilliSleep(sleepInterval);
1973     } while (commitCount < lockCount);
1974 
1975     if (tempErr)
1976       continue; /* Retry lock attempt */
1977 
1978     // Really commit the trans, puuh!
1979     check = pTrans->execute(Commit, AbortOnError);
1980     if( check == -1) {
1981       const NdbError err = pTrans->getNdbError();
1982 
1983       if (err.status == NdbError::TemporaryError){
1984 	NDB_ERR(err);
1985 	closeTransaction(pNdb);
1986 	NdbSleep_MilliSleep(50);
1987 	retryAttempt++;
1988 	continue;
1989       }
1990       NDB_ERR(err);
1991       setNdbError(err);
1992       closeTransaction(pNdb);
1993       return NDBT_FAILED;
1994     }
1995     else{
1996       for (int b=0; (b<lockBatch) && (r<records); b++){
1997 	if (calc.verifyRowValues(rows[b]) != 0){
1998 	  closeTransaction(pNdb);
1999           g_err << "Line: " << __LINE__ << " verify row failed" << endl;
2000 	  return NDBT_FAILED;
2001 	}
2002 	r++; // Read next record
2003       }
2004     }
2005 
2006     closeTransaction(pNdb);
2007 
2008   }
2009   deallocRows();
2010   g_info << "|- Record locking completed" << endl;
2011   return NDBT_OK;
2012 }
2013 
2014 int
indexReadRecords(Ndb * pNdb,const char * idxName,int records,int batch)2015 HugoTransactions::indexReadRecords(Ndb* pNdb,
2016 				   const char * idxName,
2017 				   int records,
2018 				   int batch){
2019   int                  reads = 0;
2020   int                  r = 0;
2021   int                  retryAttempt = 0;
2022   int                  check, a;
2023   NdbOperation *pOp;
2024   NdbIndexScanOperation *sOp;
2025 
2026   const NdbDictionary::Index* pIndex
2027     = pNdb->getDictionary()->getIndex(idxName, tab.getName());
2028 
2029   if (pIndex == NULL)
2030   {
2031     g_info << "Index " << idxName << " not existing" << endl;
2032     return NDBT_FAILED;
2033   }
2034 
2035   const bool ordered = (pIndex->getType()==NdbDictionary::Index::OrderedIndex);
2036 
2037   if (batch == 0) {
2038     g_err << "ERROR: Argument batch == 0 in indexReadRecords(). "
2039 	   << "Not allowed, line: " << __LINE__ << endl;
2040     return NDBT_FAILED;
2041   }
2042 
2043   if (ordered) {
2044     batch = 1;
2045   }
2046 
2047   allocRows(batch);
2048 
2049   while (r < records){
2050     if (retryAttempt >= m_retryMax){
2051       g_err << "ERROR: has retried this operation " << retryAttempt
2052 	     << " times, failing!, line: " << __LINE__ << endl;
2053       return NDBT_FAILED;
2054     }
2055 
2056     pTrans = pNdb->startTransaction();
2057     if (pTrans == NULL) {
2058       const NdbError err = pNdb->getNdbError();
2059 
2060       if (err.status == NdbError::TemporaryError){
2061 	NDB_ERR(err);
2062 	NdbSleep_MilliSleep(50);
2063 	retryAttempt++;
2064 	continue;
2065       }
2066       NDB_ERR(err);
2067       setNdbError(err);
2068       return NDBT_FAILED;
2069     }
2070 
2071     for(int b=0; (b<batch) && (r+b < records); b++){
2072       if(!ordered){
2073 	pOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
2074 	if (pOp == NULL) {
2075 	  NDB_ERR(pTrans->getNdbError());
2076 	  setNdbError(pTrans->getNdbError());
2077 	  closeTransaction(pNdb);
2078 	  return NDBT_FAILED;
2079 	}
2080 	check = pOp->readTuple();
2081       } else {
2082 	pOp = sOp = pTrans->getNdbIndexScanOperation(idxName, tab.getName());
2083 	if (sOp == NULL) {
2084 	  NDB_ERR(pTrans->getNdbError());
2085 	  setNdbError(pTrans->getNdbError());
2086 	  closeTransaction(pNdb);
2087 	  return NDBT_FAILED;
2088 	}
2089 	check = sOp->readTuples();
2090       }
2091 
2092       if( check == -1 ) {
2093 	NDB_ERR(pTrans->getNdbError());
2094 	setNdbError(pTrans->getNdbError());
2095 	closeTransaction(pNdb);
2096 	return NDBT_FAILED;
2097       }
2098 
2099       // Define primary keys
2100       if (equalForRow(pOp, r+b) != 0)
2101       {
2102         closeTransaction(pNdb);
2103         g_err << "Line: " << __LINE__ << " equal for row failed" << endl;
2104         return NDBT_FAILED;
2105       }
2106 
2107       // Define attributes to read
2108       for(a = 0; a<tab.getNoOfColumns(); a++){
2109 	if((rows[b]->attributeStore(a) =
2110 	    pOp->getValue(tab.getColumn(a)->getName())) == 0) {
2111 	  NDB_ERR(pTrans->getNdbError());
2112 	  setNdbError(pTrans->getNdbError());
2113 	  closeTransaction(pNdb);
2114 	  return NDBT_FAILED;
2115 	}
2116       }
2117     }
2118 
2119     check = pTrans->execute(Commit, AbortOnError);
2120     check = (check == -1 ? -1 : !ordered ? check : sOp->nextResult(true));
2121     if( check == -1 ) {
2122       const NdbError err = pTrans->getNdbError();
2123 
2124       if (err.status == NdbError::TemporaryError){
2125 	NDB_ERR(err);
2126 	closeTransaction(pNdb);
2127 	NdbSleep_MilliSleep(50);
2128 	retryAttempt++;
2129 	continue;
2130       }
2131       switch(err.code){
2132       case 626: // Tuple did not exist
2133 	  g_info << r << ": " << err.code << " " << err.message << endl;
2134 	  r++;
2135 	  break;
2136 
2137       default:
2138 	NDB_ERR(err);
2139 	setNdbError(err);
2140 	closeTransaction(pNdb);
2141 	return NDBT_FAILED;
2142       }
2143     } else{
2144       for (int b=0; (b<batch) && (r+b<records); b++){
2145 	if (calc.verifyRowValues(rows[b]) != 0){
2146 	  closeTransaction(pNdb);
2147           g_err << "Line: " << __LINE__
2148                 << " verify row failed"
2149                 << ", record: " << r << " of: " << records
2150                 << ", row: " << b << " in a batch of: " << batch
2151                 << endl;
2152 	  return NDBT_FAILED;
2153 	}
2154 	reads++;
2155 	r++;
2156       }
2157       if(ordered && sOp->nextResult(true) == 0){
2158 	ndbout << "Error when comparing records "
2159 	       << " - index op next_result to many" << endl;
2160         ndbout << "Line: " << __LINE__ << endl;
2161 	closeTransaction(pNdb);
2162 	return NDBT_FAILED;
2163       }
2164     }
2165     closeTransaction(pNdb);
2166   }
2167   deallocRows();
2168   g_info << reads << " records read" << endl;
2169   return NDBT_OK;
2170 }
2171 
2172 
2173 
2174 int
indexUpdateRecords(Ndb * pNdb,const char * idxName,int records,int batch)2175 HugoTransactions::indexUpdateRecords(Ndb* pNdb,
2176 				     const char * idxName,
2177 				     int records,
2178 				     int batch){
2179 
2180   int updated = 0;
2181   int                  r = 0;
2182   int                  retryAttempt = 0;
2183   int                  check, a, b;
2184   NdbOperation *pOp;
2185   NdbScanOperation * sOp;
2186 
2187   const NdbDictionary::Index* pIndex
2188     = pNdb->getDictionary()->getIndex(idxName, tab.getName());
2189 
2190   const bool ordered = (pIndex->getType()==NdbDictionary::Index::OrderedIndex);
2191   if (ordered){
2192     batch = 1;
2193   }
2194 
2195   allocRows(batch);
2196 
2197   while (r < records){
2198     if (retryAttempt >= m_retryMax){
2199       g_err << "ERROR: has retried this operation " << retryAttempt
2200 	     << " times, failing!, line: " << __LINE__ << endl;
2201       return NDBT_FAILED;
2202     }
2203 
2204     pTrans = pNdb->startTransaction();
2205     if (pTrans == NULL) {
2206       const NdbError err = pNdb->getNdbError();
2207 
2208       if (err.status == NdbError::TemporaryError){
2209 	NDB_ERR(err);
2210 	NdbSleep_MilliSleep(50);
2211 	retryAttempt++;
2212 	continue;
2213       }
2214       NDB_ERR(err);
2215       setNdbError(err);
2216       return NDBT_FAILED;
2217     }
2218 
2219     for(b = 0; b<batch && (b+r)<records; b++){
2220       if(!ordered){
2221 	pOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
2222 	if (pOp == NULL) {
2223 	  NDB_ERR(pTrans->getNdbError());
2224 	  setNdbError(pTrans->getNdbError());
2225 	  closeTransaction(pNdb);
2226 	  return NDBT_FAILED;
2227 	}
2228 
2229 	check = pOp->readTupleExclusive();
2230 	if( check == -1 ) {
2231 	  NDB_ERR(pTrans->getNdbError());
2232 	  setNdbError(pTrans->getNdbError());
2233 	  closeTransaction(pNdb);
2234 	  return NDBT_FAILED;
2235 	}
2236       } else {
2237 	pOp = sOp = pTrans->getNdbIndexScanOperation(idxName, tab.getName());
2238 	if (pOp == NULL) {
2239 	  NDB_ERR(pTrans->getNdbError());
2240 	  setNdbError(pTrans->getNdbError());
2241 	  closeTransaction(pNdb);
2242 	  return NDBT_FAILED;
2243 	}
2244 
2245 	check = 0;
2246 	sOp->readTuplesExclusive();
2247       }
2248 
2249       // Define primary keys
2250       if (equalForRow(pOp, r+b) != 0)
2251       {
2252         closeTransaction(pNdb);
2253         g_err << "Line: " << __LINE__ << " equal for row failed" << endl;
2254         return NDBT_FAILED;
2255       }
2256 
2257       // Define attributes to read
2258       for(a = 0; a<tab.getNoOfColumns(); a++){
2259 	if((rows[b]->attributeStore(a) =
2260 	    pOp->getValue(tab.getColumn(a)->getName())) == 0) {
2261 	  NDB_ERR(pTrans->getNdbError());
2262 	  setNdbError(pTrans->getNdbError());
2263 	  closeTransaction(pNdb);
2264 	  return NDBT_FAILED;
2265 	}
2266       }
2267     }
2268 
2269     check = pTrans->execute(NoCommit, AbortOnError);
2270     check = (check == -1 ? -1 : !ordered ? check : sOp->nextResult(true));
2271     if( check == -1 ) {
2272       const NdbError err = pTrans->getNdbError();
2273       NDB_ERR(err);
2274       closeTransaction(pNdb);
2275 
2276       if (err.status == NdbError::TemporaryError){
2277 	NdbSleep_MilliSleep(50);
2278 	retryAttempt++;
2279 	continue;
2280       }
2281       setNdbError(err);
2282       return NDBT_FAILED;
2283     }
2284 
2285     if(ordered && check != 0){
2286       g_err << check << " - Row: " << r << " not found!!" << endl;
2287       closeTransaction(pNdb);
2288       return NDBT_FAILED;
2289     }
2290 
2291     for(b = 0; b<batch && (b+r)<records; b++){
2292       if (calc.verifyRowValues(rows[b]) != 0){
2293 	closeTransaction(pNdb);
2294         g_err << "Line: " << __LINE__ << " verify row failed" << endl;
2295 	return NDBT_FAILED;
2296       }
2297 
2298       int updates = calc.getUpdatesValue(rows[b]) + (m_empty_update? 0 : 1);
2299 
2300       NdbOperation* pUpdOp;
2301       if(!ordered){
2302 	pUpdOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
2303 	check = (pUpdOp == 0 ? -1 : pUpdOp->updateTuple());
2304       } else {
2305 	pUpdOp = sOp->updateCurrentTuple();
2306       }
2307 
2308       if (pUpdOp == NULL) {
2309 	NDB_ERR(pTrans->getNdbError());
2310 	setNdbError(pTrans->getNdbError());
2311 	closeTransaction(pNdb);
2312 	return NDBT_FAILED;
2313       }
2314 
2315       if( check == -1 ) {
2316 	NDB_ERR(pTrans->getNdbError());
2317 	setNdbError(pTrans->getNdbError());
2318 	closeTransaction(pNdb);
2319 	return NDBT_FAILED;
2320       }
2321 
2322       if(!ordered)
2323       {
2324         if (equalForRow(pUpdOp, r+b) != 0)
2325         {
2326           closeTransaction(pNdb);
2327           g_err << "Line: " << __LINE__ << " equal for row failed" << endl;
2328           return NDBT_FAILED;
2329         }
2330       }
2331 
2332       for(a = 0; a<tab.getNoOfColumns(); a++){
2333 	if (tab.getColumn(a)->getPrimaryKey() == false){
2334 	  if(setValueForAttr(pUpdOp, a, r+b, updates ) != 0){
2335 	    NDB_ERR(pTrans->getNdbError());
2336 	    setNdbError(pTrans->getNdbError());
2337 	    closeTransaction(pNdb);
2338 	    return NDBT_FAILED;
2339 	  }
2340 	}
2341       }
2342     }
2343 
2344     check = pTrans->execute(Commit, AbortOnError);
2345     if( check == -1 ) {
2346       const NdbError err = pTrans->getNdbError();
2347       NDB_ERR(err);
2348       closeTransaction(pNdb);
2349 
2350       if (err.status == NdbError::TemporaryError){
2351 	NdbSleep_MilliSleep(50);
2352 	retryAttempt++;
2353 	continue;
2354       }
2355       ndbout << "r = " << r << endl;
2356       setNdbError(err);
2357       return NDBT_FAILED;
2358     } else {
2359       updated += batch;
2360       pTrans->getGCI(&m_latest_gci);
2361     }
2362 
2363     closeTransaction(pNdb);
2364 
2365     r+= batch; // Read next record
2366   }
2367 
2368   g_info << "|- " << updated << " records updated" << endl;
2369   return NDBT_OK;
2370 }
2371 
// Explicit instantiations of the Vector template for the pointer element
// types listed below.
// NOTE(review): presumably required so that the Vector member definitions
// for these types are emitted in this translation unit - confirm against
// the build setup before removing.
template class Vector<NDBT_ResultRow*>;
template class Vector<NdbIndexScanOperation*>;
2374