1 /*
2    Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <HugoOperations.hpp>
26 
/*
 * Local re-definition of ERR: print an NdbError through ERR_OUT on g_err,
 * unless m_quiet (looked up at the point of use) is set.
 *
 * The argument is evaluated exactly once and bound to a const reference.
 * The body is wrapped in do { } while (0) so that "ERR(x);" expands to a
 * single statement and therefore stays valid inside an un-braced
 * if/else branch.  Every use must be terminated with ';'.
 */
#undef ERR
#define ERR(error) \
do { \
  const NdbError &_error= (error); \
  if (!m_quiet) ERR_OUT(g_err, _error); \
} while (0)
33 
startTransaction(Ndb * pNdb,const NdbDictionary::Table * table,const char * keyData,Uint32 keyLen)34 int HugoOperations::startTransaction(Ndb* pNdb,
35                                      const NdbDictionary::Table *table,
36                                      const char  *keyData, Uint32 keyLen){
37 
38   if (pTrans != NULL){
39     ndbout << "HugoOperations::startTransaction, pTrans != NULL" << endl;
40     return NDBT_FAILED;
41   }
42   pTrans = pNdb->startTransaction(table, keyData, keyLen);
43   if (pTrans == NULL) {
44     const NdbError err = pNdb->getNdbError();
45     ERR(err);
46     setNdbError(err);
47     return NDBT_FAILED;
48   }
49   return NDBT_OK;
50 }
51 
setTransaction(NdbTransaction * new_trans,bool not_null_ok)52 int HugoOperations::setTransaction(NdbTransaction* new_trans, bool not_null_ok){
53 
54   if (pTrans != NULL && !not_null_ok){
55     ndbout << "HugoOperations::startTransaction, pTrans != NULL" << endl;
56     return NDBT_FAILED;
57   }
58   pTrans = new_trans;
59   if (pTrans == NULL) {
60     return NDBT_FAILED;
61   }
62   return NDBT_OK;
63 }
64 
65 void
setTransactionId(Uint64 id)66 HugoOperations::setTransactionId(Uint64 id){
67   if (pTrans != NULL){
68     pTrans->setTransactionId(id);
69   }
70 }
71 
closeTransaction(Ndb * pNdb)72 int HugoOperations::closeTransaction(Ndb* pNdb){
73 
74   UtilTransactions::closeTransaction(pNdb);
75 
76   m_result_sets.clear();
77   m_executed_result_sets.clear();
78 
79   return NDBT_OK;
80 }
81 
getTransaction()82 NdbConnection* HugoOperations::getTransaction(){
83   return pTrans;
84 }
85 
pkReadRecord(Ndb * pNdb,int recordNo,int numRecords,NdbOperation::LockMode lm,NdbOperation::LockMode * lmused)86 int HugoOperations::pkReadRecord(Ndb* pNdb,
87 				 int recordNo,
88 				 int numRecords,
89 				 NdbOperation::LockMode lm,
90                                  NdbOperation::LockMode *lmused){
91   int a;
92   allocRows(numRecords);
93   indexScans.clear();
94   int check;
95 
96   NdbOperation* pOp = 0;
97   pIndexScanOp = 0;
98 
99   for(int r=0; r < numRecords; r++){
100 
101     if(pOp == 0)
102     {
103       pOp = getOperation(pTrans, NdbOperation::ReadRequest);
104     }
105     if (pOp == NULL) {
106       ERR(pTrans->getNdbError());
107       setNdbError(pTrans->getNdbError());
108       return NDBT_FAILED;
109     }
110 
111 rand_lock_mode:
112     switch(lm){
113     case NdbOperation::LM_Read:
114     case NdbOperation::LM_Exclusive:
115     case NdbOperation::LM_CommittedRead:
116     case NdbOperation::LM_SimpleRead:
117       if (lmused)
118         * lmused = lm;
119       if(idx && idx->getType() == NdbDictionary::Index::OrderedIndex)
120       {
121         if (pIndexScanOp == 0)
122         {
123           pIndexScanOp = ((NdbIndexScanOperation*)pOp);
124           bool mrrScan= (numRecords > 1);
125           Uint32 flags= mrrScan? NdbScanOperation::SF_MultiRange : 0;
126           check = pIndexScanOp->readTuples(lm, flags);
127           /* Record NdbIndexScanOperation ptr for later... */
128           indexScans.push_back(pIndexScanOp);
129         }
130       }
131       else
132 	check = pOp->readTuple(lm);
133       break;
134     default:
135       lm = (NdbOperation::LockMode)((rand() >> 16) & 3);
136       goto rand_lock_mode;
137     }
138 
139     if( check == -1 ) {
140       ERR(pTrans->getNdbError());
141       setNdbError(pTrans->getNdbError());
142       return NDBT_FAILED;
143     }
144 
145     // Define primary keys
146     if (equalForRow(pOp, r+recordNo) != 0)
147       return NDBT_FAILED;
148 
149     Uint32 partId;
150     /* Do we need to set the partitionId for this operation? */
151     if (getPartIdForRow(pOp, r+recordNo, partId))
152     {
153       g_info << "Setting operation partition Id" << endl;
154       pOp->setPartitionId(partId);
155     }
156 
157     if(pIndexScanOp)
158       pIndexScanOp->end_of_bound(r);
159 
160     if(r == 0 || pIndexScanOp == 0)
161     {
162       // Define attributes to read
163       for(a = 0; a<tab.getNoOfColumns(); a++){
164 	if((rows[r]->attributeStore(a) =
165 	    pOp->getValue(tab.getColumn(a)->getName())) == 0) {
166 	  ERR(pTrans->getNdbError());
167           setNdbError(pTrans->getNdbError());
168 	  return NDBT_FAILED;
169 	}
170       }
171     }
172     /* Note pIndexScanOp will point to the 'last' index scan op
173      * we used.  The full list is in the indexScans vector
174      */
175     pOp = pIndexScanOp;
176   }
177   return NDBT_OK;
178 }
179 
pkReadRandRecord(Ndb * pNdb,int records,int numRecords,NdbOperation::LockMode lm,NdbOperation::LockMode * lmused)180 int HugoOperations::pkReadRandRecord(Ndb* pNdb,
181                                      int records,
182                                      int numRecords,
183                                      NdbOperation::LockMode lm,
184                                      NdbOperation::LockMode *lmused){
185   int a;
186   allocRows(numRecords);
187   indexScans.clear();
188   int check;
189 
190   NdbOperation* pOp = 0;
191   pIndexScanOp = 0;
192 
193   for(int r=0; r < numRecords; r++){
194 
195     if(pOp == 0)
196     {
197       pOp = getOperation(pTrans, NdbOperation::ReadRequest);
198     }
199     if (pOp == NULL) {
200       ERR(pTrans->getNdbError());
201       setNdbError(pTrans->getNdbError());
202       return NDBT_FAILED;
203     }
204 
205 rand_lock_mode:
206     switch(lm){
207     case NdbOperation::LM_Read:
208     case NdbOperation::LM_Exclusive:
209     case NdbOperation::LM_CommittedRead:
210     case NdbOperation::LM_SimpleRead:
211       if (lmused)
212         * lmused = lm;
213       if(idx && idx->getType() == NdbDictionary::Index::OrderedIndex &&
214 	 pIndexScanOp == 0)
215       {
216 	pIndexScanOp = ((NdbIndexScanOperation*)pOp);
217 	check = pIndexScanOp->readTuples(lm);
218         /* Record NdbIndexScanOperation ptr for later... */
219         indexScans.push_back(pIndexScanOp);
220       }
221       else
222 	check = pOp->readTuple(lm);
223       break;
224     default:
225       lm = (NdbOperation::LockMode)((rand() >> 16) & 3);
226       goto rand_lock_mode;
227     }
228 
229     if( check == -1 ) {
230       ERR(pTrans->getNdbError());
231       setNdbError(pTrans->getNdbError());
232       return NDBT_FAILED;
233     }
234 
235     int rowid= rand() % records;
236 
237     // Define primary keys
238     if (equalForRow(pOp, rowid) != 0)
239       return NDBT_FAILED;
240 
241     Uint32 partId;
242     /* Do we need to set the partitionId for this operation? */
243     if (getPartIdForRow(pOp, rowid, partId))
244     {
245       g_info << "Setting operation partition Id" << endl;
246       pOp->setPartitionId(partId);
247     }
248 
249     if(pIndexScanOp)
250       pIndexScanOp->end_of_bound(r);
251 
252     if(r == 0 || pIndexScanOp == 0)
253     {
254       // Define attributes to read
255       for(a = 0; a<tab.getNoOfColumns(); a++){
256 	if((rows[r]->attributeStore(a) =
257 	    pOp->getValue(tab.getColumn(a)->getName())) == 0) {
258 	  ERR(pTrans->getNdbError());
259           setNdbError(pTrans->getNdbError());
260 	  return NDBT_FAILED;
261 	}
262       }
263     }
264     /* Note pIndexScanOp will point to the 'last' index scan op
265      * we used.  The full list is in the indexScans vector
266      */
267     pOp = pIndexScanOp;
268   }
269   return NDBT_OK;
270 }
271 
pkReadRecordLockHandle(Ndb * pNdb,Vector<const NdbLockHandle * > & lockHandles,int recordNo,int numRecords,NdbOperation::LockMode lm,NdbOperation::LockMode * lmused)272 int HugoOperations::pkReadRecordLockHandle(Ndb* pNdb,
273                                            Vector<const NdbLockHandle*>& lockHandles,
274                                            int recordNo,
275                                            int numRecords,
276                                            NdbOperation::LockMode lm,
277                                            NdbOperation::LockMode *lmused){
278   if (idx)
279   {
280     g_err << "ERROR : Cannot call pkReadRecordLockHandle on an index"
281           << endl;
282     return NDBT_FAILED;
283   }
284 
285   /* If something other than LM_Read or LM_Exclusive is
286    * passed in then we'll choose, and PkReadRecord
287    * will update lm_used
288    */
289   while (lm != NdbOperation::LM_Read &&
290          lm != NdbOperation::LM_Exclusive)
291   {
292     lm = (NdbOperation::LockMode)((rand() >> 16) & 1);
293   }
294 
295   const NdbOperation* prevOp = pTrans->getLastDefinedOperation();
296 
297   int readRc = pkReadRecord(pNdb,
298                             recordNo,
299                             numRecords,
300                             lm,
301                             lmused);
302 
303   if (readRc == NDBT_OK)
304   {
305     /* Now traverse operations added, requesting
306      * LockHandles
307      */
308     const NdbOperation* definedOp = (prevOp)? prevOp->next() :
309       pTrans->getFirstDefinedOperation();
310 
311     while (definedOp)
312     {
313       /* Look away now */
314       const NdbLockHandle* lh =
315         (const_cast<NdbOperation*>(definedOp))->getLockHandle();
316 
317       if (lh == NULL)
318       {
319         ERR(definedOp->getNdbError());
320         setNdbError(definedOp->getNdbError());
321         return NDBT_FAILED;
322       }
323 
324       lockHandles.push_back(lh);
325       definedOp = definedOp->next();
326     }
327   }
328 
329   return readRc;
330 }
331 
pkUnlockRecord(Ndb * pNdb,Vector<const NdbLockHandle * > & lockHandles,int offset,int numRecords,NdbOperation::AbortOption ao)332 int HugoOperations::pkUnlockRecord(Ndb* pNdb,
333                                    Vector<const NdbLockHandle*>& lockHandles,
334                                    int offset,
335                                    int numRecords,
336                                    NdbOperation::AbortOption ao)
337 {
338   if (numRecords == ~(0))
339   {
340     numRecords = lockHandles.size() - offset;
341   }
342 
343   if (lockHandles.size() < (unsigned)(offset + numRecords))
344   {
345     g_err << "ERROR : LockHandles size is "
346           << lockHandles.size()
347           << " offset ("
348           << offset
349           << ") and/or numRecords ("
350           << numRecords
351           << ") too large." << endl;
352     return NDBT_FAILED;
353   }
354 
355   for (int i = 0; i < numRecords; i++)
356   {
357     const NdbLockHandle* lh = lockHandles[offset + i];
358     if (lh != NULL)
359     {
360       const NdbOperation* unlockOp = pTrans->unlock(lh, ao);
361 
362       if (unlockOp == NULL)
363       {
364         ERR(pTrans->getNdbError());
365         setNdbError(pTrans->getNdbError());
366         return NDBT_FAILED;
367       }
368     }
369     else
370     {
371       g_err << "ERROR : LockHandle number "
372             << i+offset << " is NULL. "
373             << " offset is " << offset << endl;
374       return NDBT_FAILED;
375     }
376   }
377 
378   return NDBT_OK;
379 }
380 
pkUpdateRecord(Ndb * pNdb,int recordNo,int numRecords,int updatesValue)381 int HugoOperations::pkUpdateRecord(Ndb* pNdb,
382 				   int recordNo,
383 				   int numRecords,
384 				   int updatesValue){
385   allocRows(numRecords);
386   int check;
387   for(int r=0; r < numRecords; r++){
388     NdbOperation* pOp = getOperation(pTrans, NdbOperation::UpdateRequest);
389     if (pOp == NULL) {
390       ERR(pTrans->getNdbError());
391       setNdbError(pTrans->getNdbError());
392       return NDBT_FAILED;
393     }
394 
395     check = pOp->updateTuple();
396     if( check == -1 ) {
397       ERR(pTrans->getNdbError());
398       setNdbError(pTrans->getNdbError());
399       return NDBT_FAILED;
400     }
401 
402     if(setValues(pOp, r+recordNo, updatesValue) != NDBT_OK)
403     {
404       return NDBT_FAILED;
405     }
406 
407     Uint32 partId;
408     if(getPartIdForRow(pOp, r+recordNo, partId))
409       pOp->setPartitionId(partId);
410 
411     pOp->setAnyValue(getAnyValueForRowUpd(r+recordNo, updatesValue));
412 
413   }
414   return NDBT_OK;
415 }
416 
417 int
setValues(NdbOperation * pOp,int rowId,int updateId)418 HugoOperations::setValues(NdbOperation* pOp, int rowId, int updateId)
419 {
420   // Define primary keys
421   int a;
422   if (equalForRow(pOp, rowId) != 0)
423     return NDBT_FAILED;
424 
425   for(a = 0; a<tab.getNoOfColumns(); a++){
426     if (tab.getColumn(a)->getPrimaryKey() == false){
427       if(setValueForAttr(pOp, a, rowId, updateId ) != 0){
428 	ERR(pTrans->getNdbError());
429         setNdbError(pTrans->getNdbError());
430 	return NDBT_FAILED;
431       }
432     }
433   }
434 
435   return NDBT_OK;
436 }
437 
pkInsertRecord(Ndb * pNdb,int recordNo,int numRecords,int updatesValue)438 int HugoOperations::pkInsertRecord(Ndb* pNdb,
439 				   int recordNo,
440 				   int numRecords,
441 				   int updatesValue){
442 
443   int check;
444   for(int r=0; r < numRecords; r++){
445     NdbOperation* pOp = getOperation(pTrans, NdbOperation::InsertRequest);
446     if (pOp == NULL) {
447       ERR(pTrans->getNdbError());
448       setNdbError(pTrans->getNdbError());
449       return NDBT_FAILED;
450     }
451 
452     check = pOp->insertTuple();
453     if( check == -1 ) {
454       ERR(pTrans->getNdbError());
455       setNdbError(pTrans->getNdbError());
456       return NDBT_FAILED;
457     }
458 
459     if(setValues(pOp, r+recordNo, updatesValue) != NDBT_OK)
460     {
461       m_error.code = pTrans->getNdbError().code;
462       return NDBT_FAILED;
463     }
464 
465     Uint32 partId;
466     if(getPartIdForRow(pOp, r+recordNo, partId))
467       pOp->setPartitionId(partId);
468 
469   }
470   return NDBT_OK;
471 }
472 
pkWriteRecord(Ndb * pNdb,int recordNo,int numRecords,int updatesValue)473 int HugoOperations::pkWriteRecord(Ndb* pNdb,
474 				  int recordNo,
475 				  int numRecords,
476 				  int updatesValue){
477   int a, check;
478   for(int r=0; r < numRecords; r++){
479     NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
480     if (pOp == NULL) {
481       ERR(pTrans->getNdbError());
482       setNdbError(pTrans->getNdbError());
483       return NDBT_FAILED;
484     }
485 
486     check = pOp->writeTuple();
487     if( check == -1 ) {
488       ERR(pTrans->getNdbError());
489       setNdbError(pTrans->getNdbError());
490       return NDBT_FAILED;
491     }
492 
493     // Define primary keys
494     if (equalForRow(pOp, r+recordNo) != 0)
495       return NDBT_FAILED;
496 
497     Uint32 partId;
498     if(getPartIdForRow(pOp, r+recordNo, partId))
499       pOp->setPartitionId(partId);
500 
501 
502     // Define attributes to update
503     for(a = 0; a<tab.getNoOfColumns(); a++){
504       if (tab.getColumn(a)->getPrimaryKey() == false){
505 	if(setValueForAttr(pOp, a, recordNo+r, updatesValue ) != 0){
506 	  ERR(pTrans->getNdbError());
507           setNdbError(pTrans->getNdbError());
508 	  return NDBT_FAILED;
509 	}
510       }
511     }
512   }
513   return NDBT_OK;
514 }
515 
pkWritePartialRecord(Ndb * pNdb,int recordNo,int numRecords)516 int HugoOperations::pkWritePartialRecord(Ndb* pNdb,
517 					 int recordNo,
518 					 int numRecords){
519 
520   int check;
521   for(int r=0; r < numRecords; r++){
522     NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
523     if (pOp == NULL) {
524       ERR(pTrans->getNdbError());
525       setNdbError(pTrans->getNdbError());
526       return NDBT_FAILED;
527     }
528 
529     check = pOp->writeTuple();
530     if( check == -1 ) {
531       ERR(pTrans->getNdbError());
532       setNdbError(pTrans->getNdbError());
533       return NDBT_FAILED;
534     }
535 
536     // Define primary keys
537     if (equalForRow(pOp, r+recordNo) != 0)
538       return NDBT_FAILED;
539 
540     Uint32 partId;
541     if(getPartIdForRow(pOp, r+recordNo, partId))
542       pOp->setPartitionId(partId);
543 
544   }
545   return NDBT_OK;
546 }
547 
pkDeleteRecord(Ndb * pNdb,int recordNo,int numRecords)548 int HugoOperations::pkDeleteRecord(Ndb* pNdb,
549 				   int recordNo,
550 				   int numRecords){
551 
552   int check;
553   for(int r=0; r < numRecords; r++){
554     NdbOperation* pOp = getOperation(pTrans, NdbOperation::DeleteRequest);
555     if (pOp == NULL) {
556       ERR(pTrans->getNdbError());
557       setNdbError(pTrans->getNdbError());
558       return NDBT_FAILED;
559     }
560 
561     check = pOp->deleteTuple();
562     if( check == -1 ) {
563       ERR(pTrans->getNdbError());
564       setNdbError(pTrans->getNdbError());
565       return NDBT_FAILED;
566     }
567 
568     // Define primary keys
569     if (equalForRow(pOp, r+recordNo) != 0)
570       return NDBT_FAILED;
571 
572     Uint32 partId;
573     if(getPartIdForRow(pOp, r+recordNo, partId))
574       pOp->setPartitionId(partId);
575   }
576   return NDBT_OK;
577 }
578 
pkRefreshRecord(Ndb * pNdb,int recordNo,int numRecords,int anyValueInfo)579 int HugoOperations::pkRefreshRecord(Ndb* pNdb,
580                                     int recordNo,
581                                     int numRecords,
582                                     int anyValueInfo){
583 
584   char buffer[NDB_MAX_TUPLE_SIZE];
585   const NdbDictionary::Table * pTab =
586     pNdb->getDictionary()->getTable(tab.getName());
587 
588   if (pTab == 0)
589   {
590     return NDBT_FAILED;
591   }
592 
593   const NdbRecord * record = pTab->getDefaultRecord();
594   NdbOperation::OperationOptions opts;
595   opts.optionsPresent = NdbOperation::OperationOptions::OO_ANYVALUE;
596   for(int r=0; r < numRecords; r++)
597   {
598     bzero(buffer, sizeof(buffer));
599     if (calc.equalForRow((Uint8*)buffer, record, r + recordNo))
600     {
601       return NDBT_FAILED;
602     }
603 
604     opts.anyValue = anyValueInfo?
605       (anyValueInfo << 16) | (r+recordNo) :
606       0;
607 
608     const NdbOperation* pOp = pTrans->refreshTuple(record, buffer,
609                                                    &opts, sizeof(opts));
610     if (pOp == NULL)
611     {
612       ERR(pTrans->getNdbError());
613       setNdbError(pTrans->getNdbError());
614       return NDBT_FAILED;
615     }
616   }
617   return NDBT_OK;
618 }
619 
execute_Commit(Ndb * pNdb,AbortOption eao)620 int HugoOperations::execute_Commit(Ndb* pNdb,
621 				   AbortOption eao){
622 
623   int check = 0;
624   check = pTrans->execute(Commit, eao);
625 
626   const NdbError err = pTrans->getNdbError();
627   if( check == -1 || err.code) {
628     ERR(err);
629     setNdbError(err);
630     NdbOperation* pOp = pTrans->getNdbErrorOperation();
631     if (pOp != NULL){
632       const NdbError err2 = pOp->getNdbError();
633       ERR(err2);
634       setNdbError(err2);
635     }
636     if (err.code == 0)
637       return NDBT_FAILED;
638     return err.code;
639   }
640 
641   for(unsigned int i = 0; i<m_result_sets.size(); i++){
642     m_executed_result_sets.push_back(m_result_sets[i]);
643 
644     int rows = m_result_sets[i].records;
645     NdbScanOperation* rs = m_result_sets[i].m_result_set;
646     int res = rs->nextResult();
647     switch(res){
648     case 1:
649       return 626;
650     case -1:
651       const NdbError err = pTrans->getNdbError();
652       ERR(err);
653       setNdbError(err);
654       return (err.code > 0 ? err.code : NDBT_FAILED);
655     }
656 
657     // A row found
658 
659     switch(rows){
660     case 0:
661       return 4000;
662     default:
663       m_result_sets[i].records--;
664       break;
665     }
666   }
667 
668   m_result_sets.clear();
669 
670   return NDBT_OK;
671 }
672 
execute_NoCommit(Ndb * pNdb,AbortOption eao)673 int HugoOperations::execute_NoCommit(Ndb* pNdb, AbortOption eao){
674 
675   int check;
676   check = pTrans->execute(NoCommit, eao);
677 
678   const NdbError err = pTrans->getNdbError();
679   if( check == -1 || err.code) {
680     ERR(err);
681     setNdbError(err);
682     const NdbOperation* pOp = pTrans->getNdbErrorOperation();
683     while (pOp != NULL)
684     {
685       const NdbError err2 = pOp->getNdbError();
686       if (err2.code)
687       {
688 	ERR(err2);
689         setNdbError(err2);
690       }
691       pOp = pTrans->getNextCompletedOperation(pOp);
692     }
693     if (err.code == 0)
694       return NDBT_FAILED;
695     return err.code;
696   }
697 
698   for(unsigned int i = 0; i<m_result_sets.size(); i++){
699     m_executed_result_sets.push_back(m_result_sets[i]);
700 
701     int rows = m_result_sets[i].records;
702     NdbScanOperation* rs = m_result_sets[i].m_result_set;
703     int res = rs->nextResult();
704     switch(res){
705     case 1:
706       return 626;
707     case -1:
708       const NdbError err = pTrans->getNdbError();
709       ERR(err);
710       setNdbError(err);
711       return (err.code > 0 ? err.code : NDBT_FAILED);
712     }
713 
714     // A row found
715 
716     switch(rows){
717     case 0:
718       return 4000;
719     default:
720     case 1:
721       break;
722     }
723   }
724 
725   m_result_sets.clear();
726 
727   return NDBT_OK;
728 }
729 
execute_Rollback(Ndb * pNdb)730 int HugoOperations::execute_Rollback(Ndb* pNdb){
731   int check;
732   check = pTrans->execute(Rollback);
733   if( check == -1 ) {
734     const NdbError err = pTrans->getNdbError();
735     ERR(err);
736     setNdbError(err);
737     return NDBT_FAILED;
738   }
739   return NDBT_OK;
740 }
741 
742 void
HugoOperations_async_callback(int res,NdbTransaction * pCon,void * ho)743 HugoOperations_async_callback(int res, NdbTransaction* pCon, void* ho)
744 {
745   ((HugoOperations*)ho)->callback(res, pCon);
746 }
747 
748 void
callback(int res,NdbTransaction * pCon)749 HugoOperations::callback(int res, NdbTransaction* pCon)
750 {
751   assert(pCon == pTrans);
752   m_async_reply= 1;
753   if(res)
754   {
755     m_async_return = pCon->getNdbError().code;
756   }
757   else
758   {
759     m_async_return = 0;
760   }
761 }
762 
763 int
execute_async(Ndb * pNdb,NdbTransaction::ExecType et,NdbOperation::AbortOption eao)764 HugoOperations::execute_async(Ndb* pNdb, NdbTransaction::ExecType et,
765 			      NdbOperation::AbortOption eao){
766 
767   m_async_reply= 0;
768   pTrans->executeAsynchPrepare(et,
769 			       HugoOperations_async_callback,
770 			       this,
771 			       eao);
772 
773   pNdb->sendPreparedTransactions();
774 
775   return NDBT_OK;
776 }
777 
778 int
execute_async_prepare(Ndb * pNdb,NdbTransaction::ExecType et,NdbOperation::AbortOption eao)779 HugoOperations::execute_async_prepare(Ndb* pNdb, NdbTransaction::ExecType et,
780 				      NdbOperation::AbortOption eao){
781 
782   m_async_reply= 0;
783   pTrans->executeAsynchPrepare(et,
784 			       HugoOperations_async_callback,
785 			       this,
786 			       eao);
787 
788   return NDBT_OK;
789 }
790 
791 int
wait_async(Ndb * pNdb,int timeout)792 HugoOperations::wait_async(Ndb* pNdb, int timeout)
793 {
794   volatile int * wait = &m_async_reply;
795   while (!* wait)
796   {
797     pNdb->sendPollNdb(1000);
798 
799     if(* wait)
800     {
801       if(m_async_return)
802 	ndbout << "ERROR: " << pNdb->getNdbError(m_async_return) << endl;
803       return m_async_return;
804     }
805   }
806   ndbout_c("wait returned nothing...");
807   return -1;
808 }
809 
HugoOperations(const NdbDictionary::Table & _tab,const NdbDictionary::Index * idx)810 HugoOperations::HugoOperations(const NdbDictionary::Table& _tab,
811 			       const NdbDictionary::Index* idx):
812   UtilTransactions(_tab, idx),
813   pIndexScanOp(NULL),
814   calc(_tab),
815   m_quiet(false),
816   avCallback(NULL)
817 {
818 }
819 
~HugoOperations()820 HugoOperations::~HugoOperations(){
821   deallocRows();
822   if (pTrans != NULL)
823   {
824     pTrans->close();
825     pTrans = NULL;
826   }
827 }
828 
829 int
equalForRow(NdbOperation * pOp,int row)830 HugoOperations::equalForRow(NdbOperation* pOp, int row)
831 {
832   for(int a = 0; a<tab.getNoOfColumns(); a++)
833   {
834     if (tab.getColumn(a)->getPrimaryKey() == true)
835     {
836       if(equalForAttr(pOp, a, row) != 0)
837       {
838         ERR(pOp->getNdbError());
839         setNdbError(pOp->getNdbError());
840         return NDBT_FAILED;
841       }
842     }
843   }
844   return NDBT_OK;
845 }
846 
getPartIdForRow(const NdbOperation * pOp,int rowid,Uint32 & partId)847 bool HugoOperations::getPartIdForRow(const NdbOperation* pOp,
848                                      int rowid,
849                                      Uint32& partId)
850 {
851   if (tab.getFragmentType() == NdbDictionary::Object::UserDefined)
852   {
853     /* Primary keys and Ordered indexes are partitioned according
854      * to the row number
855      * PartitionId must be set for PK access.  Ordered indexes
856      * can scan all partitions.
857      */
858     if (pOp->getType() == NdbOperation::PrimaryKeyAccess)
859     {
860       /* Need to set the partitionId for this op
861        * For Hugo, we use 'HASH' partitioning, which is probably
862        * better called 'MODULO' partitioning with
863        * FragId == rowNum % NumPartitions
864        * This gives a good balance with the normal Hugo data, but different
865        * row to partition assignments than normal key partitioning.
866        */
867       const Uint32 numFrags= tab.getFragmentCount();
868       partId= rowid % numFrags;
869       g_info << "Returning partition Id of " << partId << endl;
870       return true;
871     }
872   }
873   partId= ~0;
874   return false;
875 }
876 
equalForAttr(NdbOperation * pOp,int attrId,int rowId)877 int HugoOperations::equalForAttr(NdbOperation* pOp,
878 				   int attrId,
879 				   int rowId){
880   const NdbDictionary::Column* attr = tab.getColumn(attrId);
881   if (attr->getPrimaryKey() == false){
882     g_info << "Can't call equalForAttr on non PK attribute" << endl;
883     return NDBT_FAILED;
884   }
885 
886   int len = attr->getSizeInBytes();
887   char buf[NDB_MAX_TUPLE_SIZE];
888   memset(buf, 0, sizeof(buf));
889   Uint32 real_len;
890   const char * value = calc.calcValue(rowId, attrId, 0, buf, len, &real_len);
891   return pOp->equal( attr->getName(), value, real_len);
892 }
893 
setValueForAttr(NdbOperation * pOp,int attrId,int rowId,int updateId)894 int HugoOperations::setValueForAttr(NdbOperation* pOp,
895 				      int attrId,
896 				      int rowId,
897 				      int updateId){
898   const NdbDictionary::Column* attr = tab.getColumn(attrId);
899 
900   if (! (attr->getType() == NdbDictionary::Column::Blob))
901   {
902     int len = attr->getSizeInBytes();
903     char buf[NDB_MAX_TUPLE_SIZE];
904     memset(buf, 0, sizeof(buf));
905     Uint32 real_len;
906     const char * value = calc.calcValue(rowId, attrId,
907                                         updateId, buf, len, &real_len);
908     return pOp->setValue( attr->getName(), value, real_len);
909   }
910   else
911   {
912     char buf[32000];
913     int len = (int)sizeof(buf);
914     Uint32 real_len;
915     const char * value = calc.calcValue(rowId, attrId,
916                                         updateId, buf, len, &real_len);
917     NdbBlob * b = pOp->getBlobHandle(attrId);
918     if (b == 0)
919       return -1;
920 
921     if (real_len == 0)
922       return b->setNull();
923     else
924       return b->setValue(value, real_len);
925   }
926 }
927 
928 int
verifyUpdatesValue(int updatesValue,int _numRows)929 HugoOperations::verifyUpdatesValue(int updatesValue, int _numRows){
930   _numRows = (_numRows == 0 ? rows.size() : _numRows);
931 
932   int result = NDBT_OK;
933 
934   for(int i = 0; i<_numRows; i++){
935     if(calc.verifyRowValues(rows[i]) != NDBT_OK){
936       g_err << "Inconsistent row"
937 	    << endl << "\t" << rows[i]->c_str().c_str() << endl;
938       result = NDBT_FAILED;
939       continue;
940     }
941 
942     if(calc.getUpdatesValue(rows[i]) != updatesValue){
943       result = NDBT_FAILED;
944       g_err << "Invalid updates value for row " << i << endl
945 	    << " updatesValue: " << updatesValue << endl
946 	    << " calc.getUpdatesValue: " << calc.getUpdatesValue(rows[i]) << endl
947 	    << rows[i]->c_str().c_str() << endl;
948       continue;
949     }
950   }
951 
952   if(_numRows == 0){
953     g_err << "No rows -> Invalid updates value" << endl;
954     return NDBT_FAILED;
955   }
956 
957   return result;
958 }
959 
allocRows(int _numRows)960 void HugoOperations::allocRows(int _numRows){
961   if(_numRows <= 0){
962     g_info << "Illegal value for num rows : " << _numRows << endl;
963     abort();
964   }
965 
966   for(int b=rows.size(); b<_numRows; b++){
967     rows.push_back(new NDBT_ResultRow(tab));
968   }
969 }
970 
deallocRows()971 void HugoOperations::deallocRows(){
972   while(rows.size() > 0){
973     delete rows.back();
974     rows.erase(rows.size() - 1);
975   }
976 }
977 
saveCopyOfRecord(int numRecords)978 int HugoOperations::saveCopyOfRecord(int numRecords ){
979 
980   if (numRecords > (int)rows.size())
981     return NDBT_FAILED;
982 
983   for (int i = 0; i < numRecords; i++){
984     savedRecords.push_back(rows[i]->c_str());
985   }
986   return NDBT_OK;
987 }
988 
getRecordStr(int recordNum)989 BaseString HugoOperations::getRecordStr(int recordNum){
990   if (recordNum > (int)rows.size())
991     return NULL;
992   return rows[recordNum]->c_str();
993 }
994 
getRecordGci(int recordNum)995 int HugoOperations::getRecordGci(int recordNum){
996   return pTrans->getGCI();
997 }
998 
999 
compareRecordToCopy(int numRecords)1000 int HugoOperations::compareRecordToCopy(int numRecords ){
1001   if (numRecords > (int)rows.size())
1002     return NDBT_FAILED;
1003   if ((unsigned)numRecords > savedRecords.size())
1004     return NDBT_FAILED;
1005 
1006   int result = NDBT_OK;
1007   for (int i = 0; i < numRecords; i++){
1008     BaseString str = rows[i]->c_str();
1009     ndbout << "row["<<i<<"]: " << str << endl;
1010     ndbout << "sav["<<i<<"]: " << savedRecords[i] << endl;
1011     if (savedRecords[i] == str){
1012       ;
1013     } else {
1014       result = NDBT_FAILED;
1015     }
1016   }
1017   return result;
1018 }
1019 
1020 void
refresh()1021 HugoOperations::refresh() {
1022   NdbTransaction * t = getTransaction();
1023   if(t)
1024     t->refresh();
1025 }
1026 
indexReadRecords(Ndb *,const char * idxName,int recordNo,bool exclusive,int numRecords)1027 int HugoOperations::indexReadRecords(Ndb*, const char * idxName, int recordNo,
1028 				     bool exclusive,
1029 				     int numRecords){
1030 
1031   int a;
1032   allocRows(numRecords);
1033   int check;
1034   for(int r=0; r < numRecords; r++){
1035     NdbOperation* pOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
1036     if (pOp == NULL) {
1037       ERR(pTrans->getNdbError());
1038       setNdbError(pTrans->getNdbError());
1039       return NDBT_FAILED;
1040     }
1041 
1042     if (exclusive == true)
1043       check = pOp->readTupleExclusive();
1044     else
1045       check = pOp->readTuple();
1046     if( check == -1 ) {
1047       ERR(pTrans->getNdbError());
1048       setNdbError(pTrans->getNdbError());
1049       return NDBT_FAILED;
1050     }
1051 
1052     // Define primary keys
1053     if (equalForRow(pOp, r+recordNo) != 0)
1054       return NDBT_FAILED;
1055 
1056     // Define attributes to read
1057     for(a = 0; a<tab.getNoOfColumns(); a++){
1058       if((rows[r]->attributeStore(a) =
1059 	  pOp->getValue(tab.getColumn(a)->getName())) == 0) {
1060 	ERR(pTrans->getNdbError());
1061         setNdbError(pTrans->getNdbError());
1062 	return NDBT_FAILED;
1063       }
1064     }
1065   }
1066   return NDBT_OK;
1067 }
1068 
1069 int
indexUpdateRecord(Ndb *,const char * idxName,int recordNo,int numRecords,int updatesValue)1070 HugoOperations::indexUpdateRecord(Ndb*,
1071 				  const char * idxName,
1072 				  int recordNo,
1073 				  int numRecords,
1074 				  int updatesValue){
1075   int a;
1076   allocRows(numRecords);
1077   int check;
1078   for(int r=0; r < numRecords; r++){
1079     NdbOperation* pOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
1080     if (pOp == NULL) {
1081       ERR(pTrans->getNdbError());
1082       setNdbError(pTrans->getNdbError());
1083       return NDBT_FAILED;
1084     }
1085 
1086     check = pOp->updateTuple();
1087     if( check == -1 ) {
1088       ERR(pTrans->getNdbError());
1089       setNdbError(pTrans->getNdbError());
1090       return NDBT_FAILED;
1091     }
1092 
1093     // Define primary keys
1094     if (equalForRow(pOp, r+recordNo) != 0)
1095       return NDBT_FAILED;
1096 
1097     // Define attributes to update
1098     for(a = 0; a<tab.getNoOfColumns(); a++){
1099       if (tab.getColumn(a)->getPrimaryKey() == false){
1100 	if(setValueForAttr(pOp, a, recordNo+r, updatesValue ) != 0){
1101 	  ERR(pTrans->getNdbError());
1102           setNdbError(pTrans->getNdbError());
1103 	  return NDBT_FAILED;
1104 	}
1105       }
1106     }
1107   }
1108   return NDBT_OK;
1109 }
1110 
1111 int
scanReadRecords(Ndb * pNdb,NdbScanOperation::LockMode lm,int records)1112 HugoOperations::scanReadRecords(Ndb* pNdb, NdbScanOperation::LockMode lm,
1113 				int records){
1114 
1115   allocRows(records);
1116   NdbScanOperation * pOp = pTrans->getNdbScanOperation(tab.getName());
1117 
1118   if(!pOp)
1119     return -1;
1120 
1121   if(pOp->readTuples(lm, 0, 1)){
1122     return -1;
1123   }
1124 
1125   for(int a = 0; a<tab.getNoOfColumns(); a++){
1126     if((rows[0]->attributeStore(a) =
1127 	pOp->getValue(tab.getColumn(a)->getName())) == 0) {
1128       ERR(pTrans->getNdbError());
1129       setNdbError(pTrans->getNdbError());
1130       return NDBT_FAILED;
1131     }
1132   }
1133 
1134   RsPair p = {pOp, records};
1135   m_result_sets.push_back(p);
1136 
1137   return 0;
1138 }
1139 
1140 int
releaseLockHandles(Ndb * pNdb,Vector<const NdbLockHandle * > & lockHandles,int offset,int numRecords)1141 HugoOperations::releaseLockHandles(Ndb* pNdb,
1142                                    Vector<const NdbLockHandle*>& lockHandles,
1143                                    int offset,
1144                                    int numRecords)
1145 {
1146   int totalSize = lockHandles.size();
1147   if (numRecords == ~(0))
1148   {
1149     numRecords = totalSize - offset;
1150   }
1151 
1152   if (totalSize < (offset + numRecords))
1153   {
1154     g_err << "ERROR : LockHandles size is "
1155           << lockHandles.size()
1156           << " offset ("
1157           << offset
1158           << ") and/or numRecords ("
1159           << numRecords
1160           << ") too large." << endl;
1161     return NDBT_FAILED;
1162   }
1163 
1164   for (int i = 0; i < numRecords; i++)
1165   {
1166     const NdbLockHandle* lh = lockHandles[offset + i];
1167     if (lh != NULL)
1168     {
1169       if (pTrans->releaseLockHandle(lh) != 0)
1170       {
1171         ERR(pTrans->getNdbError());
1172         setNdbError(pTrans->getNdbError());
1173         return NDBT_FAILED;
1174       }
1175       const NdbLockHandle* nullPtr = NULL;
1176       (void)nullPtr; // ??
1177       //lockHandles.set(nullPtr, offset + i, nullPtr);
1178     }
1179     else
1180     {
1181       g_err << "ERROR : LockHandle number "
1182             << i+offset << " is NULL. "
1183             << " offset is " << offset << endl;
1184       return NDBT_FAILED;
1185     }
1186   }
1187 
1188   return NDBT_OK;
1189 
1190 }
1191 
1192 static void
update(const NdbError & _err)1193 update(const NdbError & _err)
1194 {
1195   NdbError & error = (NdbError &) _err;
1196   ndberror_struct ndberror = (ndberror_struct)error;
1197   ndberror_update(&ndberror);
1198   error = NdbError(ndberror);
1199 }
1200 
1201 const NdbError &
getNdbError() const1202 HugoOperations::getNdbError() const
1203 {
1204   update(m_error);
1205   return m_error;
1206 }
1207 
1208 void
setNdbError(const NdbError & error)1209 HugoOperations::setNdbError(const NdbError& error)
1210 {
1211   m_error.code = error.code ? error.code : 1;
1212 }
1213 
1214 void
setAnyValueCallback(AnyValueCallback avc)1215 HugoOperations::setAnyValueCallback(AnyValueCallback avc)
1216 {
1217   avCallback = avc;
1218 }
1219 
1220 Uint32
getAnyValueForRowUpd(int row,int update)1221 HugoOperations::getAnyValueForRowUpd(int row, int update)
1222 {
1223   if (avCallback == NULL)
1224     return 0;
1225 
1226   return (avCallback)(pTrans->getNdb(), pTrans,
1227                       row, update);
1228 }
1229 
// Explicit instantiations of the Vector template for the element types
// used in this file: scan result-set pairs (RsPair) and lock handle pointers.
template class Vector<HugoOperations::RsPair>;
template class Vector<const NdbLockHandle*>;
1232