1 /* Copyright (c) 2003-2007 MySQL AB
2 Use is subject to license terms
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16
17 #include <HugoOperations.hpp>
18
startTransaction(Ndb * pNdb,const NdbDictionary::Table * table,const char * keyData,Uint32 keyLen)19 int HugoOperations::startTransaction(Ndb* pNdb,
20 const NdbDictionary::Table *table,
21 const char *keyData, Uint32 keyLen){
22
23 if (pTrans != NULL){
24 ndbout << "HugoOperations::startTransaction, pTrans != NULL" << endl;
25 return NDBT_FAILED;
26 }
27 pTrans = pNdb->startTransaction(table, keyData, keyLen);
28 if (pTrans == NULL) {
29 const NdbError err = pNdb->getNdbError();
30 ERR(err);
31 return NDBT_FAILED;
32 }
33 return NDBT_OK;
34 }
35
setTransaction(NdbTransaction * new_trans,bool not_null_ok)36 int HugoOperations::setTransaction(NdbTransaction* new_trans, bool not_null_ok){
37
38 if (pTrans != NULL && !not_null_ok){
39 ndbout << "HugoOperations::startTransaction, pTrans != NULL" << endl;
40 return NDBT_FAILED;
41 }
42 pTrans = new_trans;
43 if (pTrans == NULL) {
44 return NDBT_FAILED;
45 }
46 return NDBT_OK;
47 }
48
49 void
setTransactionId(Uint64 id)50 HugoOperations::setTransactionId(Uint64 id){
51 if (pTrans != NULL){
52 pTrans->setTransactionId(id);
53 }
54 }
55
closeTransaction(Ndb * pNdb)56 int HugoOperations::closeTransaction(Ndb* pNdb){
57
58 UtilTransactions::closeTransaction(pNdb);
59
60 m_result_sets.clear();
61 m_executed_result_sets.clear();
62
63 return NDBT_OK;
64 }
65
getTransaction()66 NdbConnection* HugoOperations::getTransaction(){
67 return pTrans;
68 }
69
pkReadRecord(Ndb * pNdb,int recordNo,int numRecords,NdbOperation::LockMode lm)70 int HugoOperations::pkReadRecord(Ndb* pNdb,
71 int recordNo,
72 int numRecords,
73 NdbOperation::LockMode lm){
74 int a;
75 allocRows(numRecords);
76 int check;
77
78 NdbOperation* pOp = 0;
79 pIndexScanOp = 0;
80
81 for(int r=0; r < numRecords; r++){
82
83 if(pOp == 0)
84 {
85 pOp = getOperation(pTrans, NdbOperation::ReadRequest);
86 }
87 if (pOp == NULL) {
88 ERR(pTrans->getNdbError());
89 return NDBT_FAILED;
90 }
91
92 rand_lock_mode:
93 switch(lm){
94 case NdbOperation::LM_Read:
95 case NdbOperation::LM_Exclusive:
96 case NdbOperation::LM_CommittedRead:
97 case NdbOperation::LM_SimpleRead:
98 if(idx && idx->getType() == NdbDictionary::Index::OrderedIndex &&
99 pIndexScanOp == 0)
100 {
101 pIndexScanOp = ((NdbIndexScanOperation*)pOp);
102 check = pIndexScanOp->readTuples(lm);
103 }
104 else
105 check = pOp->readTuple(lm);
106 break;
107 default:
108 lm = (NdbOperation::LockMode)((rand() >> 16) & 3);
109 goto rand_lock_mode;
110 }
111
112 if( check == -1 ) {
113 ERR(pTrans->getNdbError());
114 return NDBT_FAILED;
115 }
116
117 // Define primary keys
118 if (equalForRow(pOp, r+recordNo) != 0)
119 return NDBT_FAILED;
120
121 if(pIndexScanOp)
122 pIndexScanOp->end_of_bound(r);
123
124 if(r == 0 || pIndexScanOp == 0)
125 {
126 // Define attributes to read
127 for(a = 0; a<tab.getNoOfColumns(); a++){
128 if((rows[r]->attributeStore(a) =
129 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
130 ERR(pTrans->getNdbError());
131 return NDBT_FAILED;
132 }
133 }
134 }
135 pOp = pIndexScanOp;
136 }
137 return NDBT_OK;
138 }
139
pkUpdateRecord(Ndb * pNdb,int recordNo,int numRecords,int updatesValue)140 int HugoOperations::pkUpdateRecord(Ndb* pNdb,
141 int recordNo,
142 int numRecords,
143 int updatesValue){
144 allocRows(numRecords);
145 int check;
146 for(int r=0; r < numRecords; r++){
147 NdbOperation* pOp = getOperation(pTrans, NdbOperation::UpdateRequest);
148 if (pOp == NULL) {
149 ERR(pTrans->getNdbError());
150 return NDBT_FAILED;
151 }
152
153 check = pOp->updateTuple();
154 if( check == -1 ) {
155 ERR(pTrans->getNdbError());
156 return NDBT_FAILED;
157 }
158
159 if(setValues(pOp, r+recordNo, updatesValue) != NDBT_OK)
160 {
161 return NDBT_FAILED;
162 }
163 }
164 return NDBT_OK;
165 }
166
167 int
setValues(NdbOperation * pOp,int rowId,int updateId)168 HugoOperations::setValues(NdbOperation* pOp, int rowId, int updateId)
169 {
170 // Define primary keys
171 int a;
172 if (equalForRow(pOp, rowId) != 0)
173 return NDBT_FAILED;
174
175 for(a = 0; a<tab.getNoOfColumns(); a++){
176 if (tab.getColumn(a)->getPrimaryKey() == false){
177 if(setValueForAttr(pOp, a, rowId, updateId ) != 0){
178 ERR(pTrans->getNdbError());
179 return NDBT_FAILED;
180 }
181 }
182 }
183
184 return NDBT_OK;
185 }
186
pkInsertRecord(Ndb * pNdb,int recordNo,int numRecords,int updatesValue)187 int HugoOperations::pkInsertRecord(Ndb* pNdb,
188 int recordNo,
189 int numRecords,
190 int updatesValue){
191
192 int check;
193 for(int r=0; r < numRecords; r++){
194 NdbOperation* pOp = getOperation(pTrans, NdbOperation::InsertRequest);
195 if (pOp == NULL) {
196 ERR(pTrans->getNdbError());
197 return NDBT_FAILED;
198 }
199
200 check = pOp->insertTuple();
201 if( check == -1 ) {
202 ERR(pTrans->getNdbError());
203 return NDBT_FAILED;
204 }
205
206 if(setValues(pOp, r+recordNo, updatesValue) != NDBT_OK)
207 {
208 return NDBT_FAILED;
209 }
210 }
211 return NDBT_OK;
212 }
213
pkWriteRecord(Ndb * pNdb,int recordNo,int numRecords,int updatesValue)214 int HugoOperations::pkWriteRecord(Ndb* pNdb,
215 int recordNo,
216 int numRecords,
217 int updatesValue){
218
219 int a, check;
220 for(int r=0; r < numRecords; r++){
221 NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
222 if (pOp == NULL) {
223 ERR(pTrans->getNdbError());
224 return NDBT_FAILED;
225 }
226
227 check = pOp->writeTuple();
228 if( check == -1 ) {
229 ERR(pTrans->getNdbError());
230 return NDBT_FAILED;
231 }
232
233 // Define primary keys
234 if (equalForRow(pOp, r+recordNo) != 0)
235 return NDBT_FAILED;
236
237 // Define attributes to update
238 for(a = 0; a<tab.getNoOfColumns(); a++){
239 if (tab.getColumn(a)->getPrimaryKey() == false){
240 if(setValueForAttr(pOp, a, recordNo+r, updatesValue ) != 0){
241 ERR(pTrans->getNdbError());
242 return NDBT_FAILED;
243 }
244 }
245 }
246 }
247 return NDBT_OK;
248 }
249
pkWritePartialRecord(Ndb * pNdb,int recordNo,int numRecords)250 int HugoOperations::pkWritePartialRecord(Ndb* pNdb,
251 int recordNo,
252 int numRecords){
253
254 int check;
255 for(int r=0; r < numRecords; r++){
256 NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
257 if (pOp == NULL) {
258 ERR(pTrans->getNdbError());
259 return NDBT_FAILED;
260 }
261
262 check = pOp->writeTuple();
263 if( check == -1 ) {
264 ERR(pTrans->getNdbError());
265 return NDBT_FAILED;
266 }
267
268 // Define primary keys
269 if (equalForRow(pOp, r+recordNo) != 0)
270 return NDBT_FAILED;
271 }
272 return NDBT_OK;
273 }
274
pkDeleteRecord(Ndb * pNdb,int recordNo,int numRecords)275 int HugoOperations::pkDeleteRecord(Ndb* pNdb,
276 int recordNo,
277 int numRecords){
278
279 int check;
280 for(int r=0; r < numRecords; r++){
281 NdbOperation* pOp = getOperation(pTrans, NdbOperation::DeleteRequest);
282 if (pOp == NULL) {
283 ERR(pTrans->getNdbError());
284 return NDBT_FAILED;
285 }
286
287 check = pOp->deleteTuple();
288 if( check == -1 ) {
289 ERR(pTrans->getNdbError());
290 return NDBT_FAILED;
291 }
292
293 // Define primary keys
294 if (equalForRow(pOp, r+recordNo) != 0)
295 return NDBT_FAILED;
296 }
297 return NDBT_OK;
298 }
299
execute_Commit(Ndb * pNdb,AbortOption eao)300 int HugoOperations::execute_Commit(Ndb* pNdb,
301 AbortOption eao){
302
303 int check = 0;
304 check = pTrans->execute(Commit, eao);
305
306 const NdbError err = pTrans->getNdbError();
307 if( check == -1 || err.code) {
308 ERR(err);
309 NdbOperation* pOp = pTrans->getNdbErrorOperation();
310 if (pOp != NULL){
311 const NdbError err2 = pOp->getNdbError();
312 ERR(err2);
313 }
314 if (err.code == 0)
315 return NDBT_FAILED;
316 return err.code;
317 }
318
319 for(int i = 0; i<m_result_sets.size(); i++){
320 m_executed_result_sets.push_back(m_result_sets[i]);
321
322 int rows = m_result_sets[i].records;
323 NdbScanOperation* rs = m_result_sets[i].m_result_set;
324 int res = rs->nextResult();
325 switch(res){
326 case 1:
327 return 626;
328 case -1:
329 const NdbError err = pTrans->getNdbError();
330 ERR(err);
331 return (err.code > 0 ? err.code : NDBT_FAILED);
332 }
333
334 // A row found
335
336 switch(rows){
337 case 0:
338 return 4000;
339 default:
340 m_result_sets[i].records--;
341 break;
342 }
343 }
344
345 m_result_sets.clear();
346
347 return NDBT_OK;
348 }
349
execute_NoCommit(Ndb * pNdb,AbortOption eao)350 int HugoOperations::execute_NoCommit(Ndb* pNdb, AbortOption eao){
351
352 int check;
353 check = pTrans->execute(NoCommit, eao);
354
355 const NdbError err = pTrans->getNdbError();
356 if( check == -1 || err.code) {
357 ERR(err);
358 const NdbOperation* pOp = pTrans->getNdbErrorOperation();
359 while (pOp != NULL)
360 {
361 const NdbError err2 = pOp->getNdbError();
362 if (err2.code)
363 ERR(err2);
364 pOp = pTrans->getNextCompletedOperation(pOp);
365 }
366 if (err.code == 0)
367 return NDBT_FAILED;
368 return err.code;
369 }
370
371 for(int i = 0; i<m_result_sets.size(); i++){
372 m_executed_result_sets.push_back(m_result_sets[i]);
373
374 int rows = m_result_sets[i].records;
375 NdbScanOperation* rs = m_result_sets[i].m_result_set;
376 int res = rs->nextResult();
377 switch(res){
378 case 1:
379 return 626;
380 case -1:
381 const NdbError err = pTrans->getNdbError();
382 ERR(err);
383 return (err.code > 0 ? err.code : NDBT_FAILED);
384 }
385
386 // A row found
387
388 switch(rows){
389 case 0:
390 return 4000;
391 default:
392 case 1:
393 break;
394 }
395 }
396
397 m_result_sets.clear();
398
399 return NDBT_OK;
400 }
401
execute_Rollback(Ndb * pNdb)402 int HugoOperations::execute_Rollback(Ndb* pNdb){
403 int check;
404 check = pTrans->execute(Rollback);
405 if( check == -1 ) {
406 const NdbError err = pTrans->getNdbError();
407 ERR(err);
408 return NDBT_FAILED;
409 }
410 return NDBT_OK;
411 }
412
413 void
HugoOperations_async_callback(int res,NdbTransaction * pCon,void * ho)414 HugoOperations_async_callback(int res, NdbTransaction* pCon, void* ho)
415 {
416 ((HugoOperations*)ho)->callback(res, pCon);
417 }
418
419 void
callback(int res,NdbTransaction * pCon)420 HugoOperations::callback(int res, NdbTransaction* pCon)
421 {
422 assert(pCon == pTrans);
423 m_async_reply= 1;
424 if(res)
425 {
426 m_async_return = pCon->getNdbError().code;
427 }
428 else
429 {
430 m_async_return = 0;
431 }
432 }
433
434 int
execute_async(Ndb * pNdb,NdbTransaction::ExecType et,NdbOperation::AbortOption eao)435 HugoOperations::execute_async(Ndb* pNdb, NdbTransaction::ExecType et,
436 NdbOperation::AbortOption eao){
437
438 m_async_reply= 0;
439 pTrans->executeAsynchPrepare(et,
440 HugoOperations_async_callback,
441 this,
442 eao);
443
444 pNdb->sendPreparedTransactions();
445
446 return NDBT_OK;
447 }
448
449 int
execute_async_prepare(Ndb * pNdb,NdbTransaction::ExecType et,NdbOperation::AbortOption eao)450 HugoOperations::execute_async_prepare(Ndb* pNdb, NdbTransaction::ExecType et,
451 NdbOperation::AbortOption eao){
452
453 m_async_reply= 0;
454 pTrans->executeAsynchPrepare(et,
455 HugoOperations_async_callback,
456 this,
457 eao);
458
459 return NDBT_OK;
460 }
461
462 int
wait_async(Ndb * pNdb,int timeout)463 HugoOperations::wait_async(Ndb* pNdb, int timeout)
464 {
465 volatile int * wait = &m_async_reply;
466 while (!* wait)
467 {
468 pNdb->sendPollNdb(1000);
469
470 if(* wait)
471 {
472 if(m_async_return)
473 ndbout << "ERROR: " << pNdb->getNdbError(m_async_return) << endl;
474 return m_async_return;
475 }
476 }
477 ndbout_c("wait returned nothing...");
478 return -1;
479 }
480
HugoOperations(const NdbDictionary::Table & _tab,const NdbDictionary::Index * idx)481 HugoOperations::HugoOperations(const NdbDictionary::Table& _tab,
482 const NdbDictionary::Index* idx):
483 UtilTransactions(_tab, idx),
484 calc(_tab)
485 {
486 }
487
~HugoOperations()488 HugoOperations::~HugoOperations(){
489 deallocRows();
490 if (pTrans != NULL)
491 {
492 pTrans->close();
493 pTrans = NULL;
494 }
495 }
496
497 int
equalForRow(NdbOperation * pOp,int row)498 HugoOperations::equalForRow(NdbOperation* pOp, int row)
499 {
500 for(int a = 0; a<tab.getNoOfColumns(); a++)
501 {
502 if (tab.getColumn(a)->getPrimaryKey() == true)
503 {
504 if(equalForAttr(pOp, a, row) != 0)
505 {
506 ERR(pOp->getNdbError());
507 return NDBT_FAILED;
508 }
509 }
510 }
511 return NDBT_OK;
512 }
513
equalForAttr(NdbOperation * pOp,int attrId,int rowId)514 int HugoOperations::equalForAttr(NdbOperation* pOp,
515 int attrId,
516 int rowId){
517 int check = -1;
518 const NdbDictionary::Column* attr = tab.getColumn(attrId);
519 if (attr->getPrimaryKey() == false){
520 g_info << "Can't call equalForAttr on non PK attribute" << endl;
521 return NDBT_FAILED;
522 }
523
524 int len = attr->getSizeInBytes();
525 char buf[8000];
526 memset(buf, 0, sizeof(buf));
527 Uint32 real_len;
528 const char * value = calc.calcValue(rowId, attrId, 0, buf, len, &real_len);
529 return pOp->equal( attr->getName(), value, real_len);
530 }
531
setValueForAttr(NdbOperation * pOp,int attrId,int rowId,int updateId)532 int HugoOperations::setValueForAttr(NdbOperation* pOp,
533 int attrId,
534 int rowId,
535 int updateId){
536 int check = -1;
537 const NdbDictionary::Column* attr = tab.getColumn(attrId);
538
539 int len = attr->getSizeInBytes();
540 char buf[8000];
541 memset(buf, 0, sizeof(buf));
542 Uint32 real_len;
543 const char * value = calc.calcValue(rowId, attrId,
544 updateId, buf, len, &real_len);
545 return pOp->setValue( attr->getName(), value, real_len);
546 }
547
548 int
verifyUpdatesValue(int updatesValue,int _numRows)549 HugoOperations::verifyUpdatesValue(int updatesValue, int _numRows){
550 _numRows = (_numRows == 0 ? rows.size() : _numRows);
551
552 int result = NDBT_OK;
553
554 for(int i = 0; i<_numRows; i++){
555 if(calc.verifyRowValues(rows[i]) != NDBT_OK){
556 g_err << "Inconsistent row"
557 << endl << "\t" << rows[i]->c_str().c_str() << endl;
558 result = NDBT_FAILED;
559 continue;
560 }
561
562 if(calc.getUpdatesValue(rows[i]) != updatesValue){
563 result = NDBT_FAILED;
564 g_err << "Invalid updates value for row " << i << endl
565 << " updatesValue: " << updatesValue << endl
566 << " calc.getUpdatesValue: " << calc.getUpdatesValue(rows[i]) << endl
567 << rows[i]->c_str().c_str() << endl;
568 continue;
569 }
570 }
571
572 if(_numRows == 0){
573 g_err << "No rows -> Invalid updates value" << endl;
574 return NDBT_FAILED;
575 }
576
577 return result;
578 }
579
allocRows(int _numRows)580 void HugoOperations::allocRows(int _numRows){
581 if(_numRows <= 0){
582 g_info << "Illegal value for num rows : " << _numRows << endl;
583 abort();
584 }
585
586 for(int b=rows.size(); b<_numRows; b++){
587 rows.push_back(new NDBT_ResultRow(tab));
588 }
589 }
590
deallocRows()591 void HugoOperations::deallocRows(){
592 while(rows.size() > 0){
593 delete rows.back();
594 rows.erase(rows.size() - 1);
595 }
596 }
597
saveCopyOfRecord(int numRecords)598 int HugoOperations::saveCopyOfRecord(int numRecords ){
599
600 if (numRecords > (int)rows.size())
601 return NDBT_FAILED;
602
603 for (int i = 0; i < numRecords; i++){
604 savedRecords.push_back(rows[i]->c_str());
605 }
606 return NDBT_OK;
607 }
608
getRecordStr(int recordNum)609 BaseString HugoOperations::getRecordStr(int recordNum){
610 if (recordNum > (int)rows.size())
611 return NULL;
612 return rows[recordNum]->c_str();
613 }
614
getRecordGci(int recordNum)615 int HugoOperations::getRecordGci(int recordNum){
616 return pTrans->getGCI();
617 }
618
619
compareRecordToCopy(int numRecords)620 int HugoOperations::compareRecordToCopy(int numRecords ){
621 if (numRecords > (int)rows.size())
622 return NDBT_FAILED;
623 if ((unsigned)numRecords > savedRecords.size())
624 return NDBT_FAILED;
625
626 int result = NDBT_OK;
627 for (int i = 0; i < numRecords; i++){
628 BaseString str = rows[i]->c_str();
629 ndbout << "row["<<i<<"]: " << str << endl;
630 ndbout << "sav["<<i<<"]: " << savedRecords[i] << endl;
631 if (savedRecords[i] == str){
632 ;
633 } else {
634 result = NDBT_FAILED;
635 }
636 }
637 return result;
638 }
639
640 void
refresh()641 HugoOperations::refresh() {
642 NdbTransaction * t = getTransaction();
643 if(t)
644 t->refresh();
645 }
646
indexReadRecords(Ndb *,const char * idxName,int recordNo,bool exclusive,int numRecords)647 int HugoOperations::indexReadRecords(Ndb*, const char * idxName, int recordNo,
648 bool exclusive,
649 int numRecords){
650
651 int a;
652 allocRows(numRecords);
653 int check;
654 for(int r=0; r < numRecords; r++){
655 NdbOperation* pOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
656 if (pOp == NULL) {
657 ERR(pTrans->getNdbError());
658 return NDBT_FAILED;
659 }
660
661 if (exclusive == true)
662 check = pOp->readTupleExclusive();
663 else
664 check = pOp->readTuple();
665 if( check == -1 ) {
666 ERR(pTrans->getNdbError());
667 return NDBT_FAILED;
668 }
669
670 // Define primary keys
671 if (equalForRow(pOp, r+recordNo) != 0)
672 return NDBT_FAILED;
673
674 // Define attributes to read
675 for(a = 0; a<tab.getNoOfColumns(); a++){
676 if((rows[r]->attributeStore(a) =
677 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
678 ERR(pTrans->getNdbError());
679 return NDBT_FAILED;
680 }
681 }
682 }
683 return NDBT_OK;
684 }
685
686 int
indexUpdateRecord(Ndb *,const char * idxName,int recordNo,int numRecords,int updatesValue)687 HugoOperations::indexUpdateRecord(Ndb*,
688 const char * idxName,
689 int recordNo,
690 int numRecords,
691 int updatesValue){
692 int a;
693 allocRows(numRecords);
694 int check;
695 for(int r=0; r < numRecords; r++){
696 NdbOperation* pOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
697 if (pOp == NULL) {
698 ERR(pTrans->getNdbError());
699 return NDBT_FAILED;
700 }
701
702 check = pOp->updateTuple();
703 if( check == -1 ) {
704 ERR(pTrans->getNdbError());
705 return NDBT_FAILED;
706 }
707
708 // Define primary keys
709 if (equalForRow(pOp, r+recordNo) != 0)
710 return NDBT_FAILED;
711
712 // Define attributes to update
713 for(a = 0; a<tab.getNoOfColumns(); a++){
714 if (tab.getColumn(a)->getPrimaryKey() == false){
715 if(setValueForAttr(pOp, a, recordNo+r, updatesValue ) != 0){
716 ERR(pTrans->getNdbError());
717 return NDBT_FAILED;
718 }
719 }
720 }
721 }
722 return NDBT_OK;
723 }
724
725 int
scanReadRecords(Ndb * pNdb,NdbScanOperation::LockMode lm,int records)726 HugoOperations::scanReadRecords(Ndb* pNdb, NdbScanOperation::LockMode lm,
727 int records){
728
729 allocRows(records);
730 NdbScanOperation * pOp = pTrans->getNdbScanOperation(tab.getName());
731
732 if(!pOp)
733 return -1;
734
735 if(pOp->readTuples(lm, 0, 1)){
736 return -1;
737 }
738
739 for(int a = 0; a<tab.getNoOfColumns(); a++){
740 if((rows[0]->attributeStore(a) =
741 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
742 ERR(pTrans->getNdbError());
743 return NDBT_FAILED;
744 }
745 }
746
747 RsPair p = {pOp, records};
748 m_result_sets.push_back(p);
749
750 return 0;
751 }
752
// Explicit instantiation of Vector for RsPair, the element type pushed
// onto m_result_sets in scanReadRecords().
template class Vector<HugoOperations::RsPair>;
754