1 /*
2 Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <HugoOperations.hpp>
26
/*
 * ERR(error): report an NdbError on g_err through ERR_OUT unless the
 * enclosing HugoOperations object has m_quiet set.
 *
 * The argument is bound to a local reference so it is evaluated exactly once.
 * The body is wrapped in do { } while (0) so that "ERR(e);" behaves as a
 * single statement, including inside an unbraced if/else.
 */
#undef ERR
#define ERR(error) \
do { \
  const NdbError &_error= (error); \
  if (!m_quiet) ERR_OUT(g_err, _error); \
} while (0)
33
startTransaction(Ndb * pNdb,const NdbDictionary::Table * table,const char * keyData,Uint32 keyLen)34 int HugoOperations::startTransaction(Ndb* pNdb,
35 const NdbDictionary::Table *table,
36 const char *keyData, Uint32 keyLen){
37
38 if (pTrans != NULL){
39 ndbout << "HugoOperations::startTransaction, pTrans != NULL" << endl;
40 return NDBT_FAILED;
41 }
42 pTrans = pNdb->startTransaction(table, keyData, keyLen);
43 if (pTrans == NULL) {
44 const NdbError err = pNdb->getNdbError();
45 ERR(err);
46 setNdbError(err);
47 return NDBT_FAILED;
48 }
49 return NDBT_OK;
50 }
51
setTransaction(NdbTransaction * new_trans,bool not_null_ok)52 int HugoOperations::setTransaction(NdbTransaction* new_trans, bool not_null_ok){
53
54 if (pTrans != NULL && !not_null_ok){
55 ndbout << "HugoOperations::startTransaction, pTrans != NULL" << endl;
56 return NDBT_FAILED;
57 }
58 pTrans = new_trans;
59 if (pTrans == NULL) {
60 return NDBT_FAILED;
61 }
62 return NDBT_OK;
63 }
64
65 void
setTransactionId(Uint64 id)66 HugoOperations::setTransactionId(Uint64 id){
67 if (pTrans != NULL){
68 pTrans->setTransactionId(id);
69 }
70 }
71
closeTransaction(Ndb * pNdb)72 int HugoOperations::closeTransaction(Ndb* pNdb){
73
74 UtilTransactions::closeTransaction(pNdb);
75
76 m_result_sets.clear();
77 m_executed_result_sets.clear();
78
79 return NDBT_OK;
80 }
81
getTransaction()82 NdbConnection* HugoOperations::getTransaction(){
83 return pTrans;
84 }
85
pkReadRecord(Ndb * pNdb,int recordNo,int numRecords,NdbOperation::LockMode lm,NdbOperation::LockMode * lmused)86 int HugoOperations::pkReadRecord(Ndb* pNdb,
87 int recordNo,
88 int numRecords,
89 NdbOperation::LockMode lm,
90 NdbOperation::LockMode *lmused){
91 int a;
92 allocRows(numRecords);
93 indexScans.clear();
94 int check;
95
96 NdbOperation* pOp = 0;
97 pIndexScanOp = 0;
98
99 for(int r=0; r < numRecords; r++){
100
101 if(pOp == 0)
102 {
103 pOp = getOperation(pTrans, NdbOperation::ReadRequest);
104 }
105 if (pOp == NULL) {
106 ERR(pTrans->getNdbError());
107 setNdbError(pTrans->getNdbError());
108 return NDBT_FAILED;
109 }
110
111 rand_lock_mode:
112 switch(lm){
113 case NdbOperation::LM_Read:
114 case NdbOperation::LM_Exclusive:
115 case NdbOperation::LM_CommittedRead:
116 case NdbOperation::LM_SimpleRead:
117 if (lmused)
118 * lmused = lm;
119 if(idx && idx->getType() == NdbDictionary::Index::OrderedIndex)
120 {
121 if (pIndexScanOp == 0)
122 {
123 pIndexScanOp = ((NdbIndexScanOperation*)pOp);
124 bool mrrScan= (numRecords > 1);
125 Uint32 flags= mrrScan? NdbScanOperation::SF_MultiRange : 0;
126 check = pIndexScanOp->readTuples(lm, flags);
127 /* Record NdbIndexScanOperation ptr for later... */
128 indexScans.push_back(pIndexScanOp);
129 }
130 }
131 else
132 check = pOp->readTuple(lm);
133 break;
134 default:
135 lm = (NdbOperation::LockMode)((rand() >> 16) & 3);
136 goto rand_lock_mode;
137 }
138
139 if( check == -1 ) {
140 ERR(pTrans->getNdbError());
141 setNdbError(pTrans->getNdbError());
142 return NDBT_FAILED;
143 }
144
145 // Define primary keys
146 if (equalForRow(pOp, r+recordNo) != 0)
147 return NDBT_FAILED;
148
149 Uint32 partId;
150 /* Do we need to set the partitionId for this operation? */
151 if (getPartIdForRow(pOp, r+recordNo, partId))
152 {
153 g_info << "Setting operation partition Id" << endl;
154 pOp->setPartitionId(partId);
155 }
156
157 if(pIndexScanOp)
158 pIndexScanOp->end_of_bound(r);
159
160 if(r == 0 || pIndexScanOp == 0)
161 {
162 // Define attributes to read
163 for(a = 0; a<tab.getNoOfColumns(); a++){
164 if((rows[r]->attributeStore(a) =
165 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
166 ERR(pTrans->getNdbError());
167 setNdbError(pTrans->getNdbError());
168 return NDBT_FAILED;
169 }
170 }
171 }
172 /* Note pIndexScanOp will point to the 'last' index scan op
173 * we used. The full list is in the indexScans vector
174 */
175 pOp = pIndexScanOp;
176 }
177 return NDBT_OK;
178 }
179
pkReadRandRecord(Ndb * pNdb,int records,int numRecords,NdbOperation::LockMode lm,NdbOperation::LockMode * lmused)180 int HugoOperations::pkReadRandRecord(Ndb* pNdb,
181 int records,
182 int numRecords,
183 NdbOperation::LockMode lm,
184 NdbOperation::LockMode *lmused){
185 int a;
186 allocRows(numRecords);
187 indexScans.clear();
188 int check;
189
190 NdbOperation* pOp = 0;
191 pIndexScanOp = 0;
192
193 for(int r=0; r < numRecords; r++){
194
195 if(pOp == 0)
196 {
197 pOp = getOperation(pTrans, NdbOperation::ReadRequest);
198 }
199 if (pOp == NULL) {
200 ERR(pTrans->getNdbError());
201 setNdbError(pTrans->getNdbError());
202 return NDBT_FAILED;
203 }
204
205 rand_lock_mode:
206 switch(lm){
207 case NdbOperation::LM_Read:
208 case NdbOperation::LM_Exclusive:
209 case NdbOperation::LM_CommittedRead:
210 case NdbOperation::LM_SimpleRead:
211 if (lmused)
212 * lmused = lm;
213 if(idx && idx->getType() == NdbDictionary::Index::OrderedIndex &&
214 pIndexScanOp == 0)
215 {
216 pIndexScanOp = ((NdbIndexScanOperation*)pOp);
217 check = pIndexScanOp->readTuples(lm);
218 /* Record NdbIndexScanOperation ptr for later... */
219 indexScans.push_back(pIndexScanOp);
220 }
221 else
222 check = pOp->readTuple(lm);
223 break;
224 default:
225 lm = (NdbOperation::LockMode)((rand() >> 16) & 3);
226 goto rand_lock_mode;
227 }
228
229 if( check == -1 ) {
230 ERR(pTrans->getNdbError());
231 setNdbError(pTrans->getNdbError());
232 return NDBT_FAILED;
233 }
234
235 int rowid= rand() % records;
236
237 // Define primary keys
238 if (equalForRow(pOp, rowid) != 0)
239 return NDBT_FAILED;
240
241 Uint32 partId;
242 /* Do we need to set the partitionId for this operation? */
243 if (getPartIdForRow(pOp, rowid, partId))
244 {
245 g_info << "Setting operation partition Id" << endl;
246 pOp->setPartitionId(partId);
247 }
248
249 if(pIndexScanOp)
250 pIndexScanOp->end_of_bound(r);
251
252 if(r == 0 || pIndexScanOp == 0)
253 {
254 // Define attributes to read
255 for(a = 0; a<tab.getNoOfColumns(); a++){
256 if((rows[r]->attributeStore(a) =
257 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
258 ERR(pTrans->getNdbError());
259 setNdbError(pTrans->getNdbError());
260 return NDBT_FAILED;
261 }
262 }
263 }
264 /* Note pIndexScanOp will point to the 'last' index scan op
265 * we used. The full list is in the indexScans vector
266 */
267 pOp = pIndexScanOp;
268 }
269 return NDBT_OK;
270 }
271
pkReadRecordLockHandle(Ndb * pNdb,Vector<const NdbLockHandle * > & lockHandles,int recordNo,int numRecords,NdbOperation::LockMode lm,NdbOperation::LockMode * lmused)272 int HugoOperations::pkReadRecordLockHandle(Ndb* pNdb,
273 Vector<const NdbLockHandle*>& lockHandles,
274 int recordNo,
275 int numRecords,
276 NdbOperation::LockMode lm,
277 NdbOperation::LockMode *lmused){
278 if (idx)
279 {
280 g_err << "ERROR : Cannot call pkReadRecordLockHandle on an index"
281 << endl;
282 return NDBT_FAILED;
283 }
284
285 /* If something other than LM_Read or LM_Exclusive is
286 * passed in then we'll choose, and PkReadRecord
287 * will update lm_used
288 */
289 while (lm != NdbOperation::LM_Read &&
290 lm != NdbOperation::LM_Exclusive)
291 {
292 lm = (NdbOperation::LockMode)((rand() >> 16) & 1);
293 }
294
295 const NdbOperation* prevOp = pTrans->getLastDefinedOperation();
296
297 int readRc = pkReadRecord(pNdb,
298 recordNo,
299 numRecords,
300 lm,
301 lmused);
302
303 if (readRc == NDBT_OK)
304 {
305 /* Now traverse operations added, requesting
306 * LockHandles
307 */
308 const NdbOperation* definedOp = (prevOp)? prevOp->next() :
309 pTrans->getFirstDefinedOperation();
310
311 while (definedOp)
312 {
313 /* Look away now */
314 const NdbLockHandle* lh =
315 (const_cast<NdbOperation*>(definedOp))->getLockHandle();
316
317 if (lh == NULL)
318 {
319 ERR(definedOp->getNdbError());
320 setNdbError(definedOp->getNdbError());
321 return NDBT_FAILED;
322 }
323
324 lockHandles.push_back(lh);
325 definedOp = definedOp->next();
326 }
327 }
328
329 return readRc;
330 }
331
pkUnlockRecord(Ndb * pNdb,Vector<const NdbLockHandle * > & lockHandles,int offset,int numRecords,NdbOperation::AbortOption ao)332 int HugoOperations::pkUnlockRecord(Ndb* pNdb,
333 Vector<const NdbLockHandle*>& lockHandles,
334 int offset,
335 int numRecords,
336 NdbOperation::AbortOption ao)
337 {
338 if (numRecords == ~(0))
339 {
340 numRecords = lockHandles.size() - offset;
341 }
342
343 if (lockHandles.size() < (unsigned)(offset + numRecords))
344 {
345 g_err << "ERROR : LockHandles size is "
346 << lockHandles.size()
347 << " offset ("
348 << offset
349 << ") and/or numRecords ("
350 << numRecords
351 << ") too large." << endl;
352 return NDBT_FAILED;
353 }
354
355 for (int i = 0; i < numRecords; i++)
356 {
357 const NdbLockHandle* lh = lockHandles[offset + i];
358 if (lh != NULL)
359 {
360 const NdbOperation* unlockOp = pTrans->unlock(lh, ao);
361
362 if (unlockOp == NULL)
363 {
364 ERR(pTrans->getNdbError());
365 setNdbError(pTrans->getNdbError());
366 return NDBT_FAILED;
367 }
368 }
369 else
370 {
371 g_err << "ERROR : LockHandle number "
372 << i+offset << " is NULL. "
373 << " offset is " << offset << endl;
374 return NDBT_FAILED;
375 }
376 }
377
378 return NDBT_OK;
379 }
380
pkUpdateRecord(Ndb * pNdb,int recordNo,int numRecords,int updatesValue)381 int HugoOperations::pkUpdateRecord(Ndb* pNdb,
382 int recordNo,
383 int numRecords,
384 int updatesValue){
385 allocRows(numRecords);
386 int check;
387 for(int r=0; r < numRecords; r++){
388 NdbOperation* pOp = getOperation(pTrans, NdbOperation::UpdateRequest);
389 if (pOp == NULL) {
390 ERR(pTrans->getNdbError());
391 setNdbError(pTrans->getNdbError());
392 return NDBT_FAILED;
393 }
394
395 check = pOp->updateTuple();
396 if( check == -1 ) {
397 ERR(pTrans->getNdbError());
398 setNdbError(pTrans->getNdbError());
399 return NDBT_FAILED;
400 }
401
402 if(setValues(pOp, r+recordNo, updatesValue) != NDBT_OK)
403 {
404 return NDBT_FAILED;
405 }
406
407 Uint32 partId;
408 if(getPartIdForRow(pOp, r+recordNo, partId))
409 pOp->setPartitionId(partId);
410
411 pOp->setAnyValue(getAnyValueForRowUpd(r+recordNo, updatesValue));
412
413 }
414 return NDBT_OK;
415 }
416
417 int
setValues(NdbOperation * pOp,int rowId,int updateId)418 HugoOperations::setValues(NdbOperation* pOp, int rowId, int updateId)
419 {
420 // Define primary keys
421 int a;
422 if (equalForRow(pOp, rowId) != 0)
423 return NDBT_FAILED;
424
425 for(a = 0; a<tab.getNoOfColumns(); a++){
426 if (tab.getColumn(a)->getPrimaryKey() == false){
427 if(setValueForAttr(pOp, a, rowId, updateId ) != 0){
428 ERR(pTrans->getNdbError());
429 setNdbError(pTrans->getNdbError());
430 return NDBT_FAILED;
431 }
432 }
433 }
434
435 return NDBT_OK;
436 }
437
pkInsertRecord(Ndb * pNdb,int recordNo,int numRecords,int updatesValue)438 int HugoOperations::pkInsertRecord(Ndb* pNdb,
439 int recordNo,
440 int numRecords,
441 int updatesValue){
442
443 int check;
444 for(int r=0; r < numRecords; r++){
445 NdbOperation* pOp = getOperation(pTrans, NdbOperation::InsertRequest);
446 if (pOp == NULL) {
447 ERR(pTrans->getNdbError());
448 setNdbError(pTrans->getNdbError());
449 return NDBT_FAILED;
450 }
451
452 check = pOp->insertTuple();
453 if( check == -1 ) {
454 ERR(pTrans->getNdbError());
455 setNdbError(pTrans->getNdbError());
456 return NDBT_FAILED;
457 }
458
459 if(setValues(pOp, r+recordNo, updatesValue) != NDBT_OK)
460 {
461 m_error.code = pTrans->getNdbError().code;
462 return NDBT_FAILED;
463 }
464
465 Uint32 partId;
466 if(getPartIdForRow(pOp, r+recordNo, partId))
467 pOp->setPartitionId(partId);
468
469 }
470 return NDBT_OK;
471 }
472
pkWriteRecord(Ndb * pNdb,int recordNo,int numRecords,int updatesValue)473 int HugoOperations::pkWriteRecord(Ndb* pNdb,
474 int recordNo,
475 int numRecords,
476 int updatesValue){
477 int a, check;
478 for(int r=0; r < numRecords; r++){
479 NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
480 if (pOp == NULL) {
481 ERR(pTrans->getNdbError());
482 setNdbError(pTrans->getNdbError());
483 return NDBT_FAILED;
484 }
485
486 check = pOp->writeTuple();
487 if( check == -1 ) {
488 ERR(pTrans->getNdbError());
489 setNdbError(pTrans->getNdbError());
490 return NDBT_FAILED;
491 }
492
493 // Define primary keys
494 if (equalForRow(pOp, r+recordNo) != 0)
495 return NDBT_FAILED;
496
497 Uint32 partId;
498 if(getPartIdForRow(pOp, r+recordNo, partId))
499 pOp->setPartitionId(partId);
500
501
502 // Define attributes to update
503 for(a = 0; a<tab.getNoOfColumns(); a++){
504 if (tab.getColumn(a)->getPrimaryKey() == false){
505 if(setValueForAttr(pOp, a, recordNo+r, updatesValue ) != 0){
506 ERR(pTrans->getNdbError());
507 setNdbError(pTrans->getNdbError());
508 return NDBT_FAILED;
509 }
510 }
511 }
512 }
513 return NDBT_OK;
514 }
515
pkWritePartialRecord(Ndb * pNdb,int recordNo,int numRecords)516 int HugoOperations::pkWritePartialRecord(Ndb* pNdb,
517 int recordNo,
518 int numRecords){
519
520 int check;
521 for(int r=0; r < numRecords; r++){
522 NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
523 if (pOp == NULL) {
524 ERR(pTrans->getNdbError());
525 setNdbError(pTrans->getNdbError());
526 return NDBT_FAILED;
527 }
528
529 check = pOp->writeTuple();
530 if( check == -1 ) {
531 ERR(pTrans->getNdbError());
532 setNdbError(pTrans->getNdbError());
533 return NDBT_FAILED;
534 }
535
536 // Define primary keys
537 if (equalForRow(pOp, r+recordNo) != 0)
538 return NDBT_FAILED;
539
540 Uint32 partId;
541 if(getPartIdForRow(pOp, r+recordNo, partId))
542 pOp->setPartitionId(partId);
543
544 }
545 return NDBT_OK;
546 }
547
pkDeleteRecord(Ndb * pNdb,int recordNo,int numRecords)548 int HugoOperations::pkDeleteRecord(Ndb* pNdb,
549 int recordNo,
550 int numRecords){
551
552 int check;
553 for(int r=0; r < numRecords; r++){
554 NdbOperation* pOp = getOperation(pTrans, NdbOperation::DeleteRequest);
555 if (pOp == NULL) {
556 ERR(pTrans->getNdbError());
557 setNdbError(pTrans->getNdbError());
558 return NDBT_FAILED;
559 }
560
561 check = pOp->deleteTuple();
562 if( check == -1 ) {
563 ERR(pTrans->getNdbError());
564 setNdbError(pTrans->getNdbError());
565 return NDBT_FAILED;
566 }
567
568 // Define primary keys
569 if (equalForRow(pOp, r+recordNo) != 0)
570 return NDBT_FAILED;
571
572 Uint32 partId;
573 if(getPartIdForRow(pOp, r+recordNo, partId))
574 pOp->setPartitionId(partId);
575 }
576 return NDBT_OK;
577 }
578
pkRefreshRecord(Ndb * pNdb,int recordNo,int numRecords,int anyValueInfo)579 int HugoOperations::pkRefreshRecord(Ndb* pNdb,
580 int recordNo,
581 int numRecords,
582 int anyValueInfo){
583
584 char buffer[NDB_MAX_TUPLE_SIZE];
585 const NdbDictionary::Table * pTab =
586 pNdb->getDictionary()->getTable(tab.getName());
587
588 if (pTab == 0)
589 {
590 return NDBT_FAILED;
591 }
592
593 const NdbRecord * record = pTab->getDefaultRecord();
594 NdbOperation::OperationOptions opts;
595 opts.optionsPresent = NdbOperation::OperationOptions::OO_ANYVALUE;
596 for(int r=0; r < numRecords; r++)
597 {
598 bzero(buffer, sizeof(buffer));
599 if (calc.equalForRow((Uint8*)buffer, record, r + recordNo))
600 {
601 return NDBT_FAILED;
602 }
603
604 opts.anyValue = anyValueInfo?
605 (anyValueInfo << 16) | (r+recordNo) :
606 0;
607
608 const NdbOperation* pOp = pTrans->refreshTuple(record, buffer,
609 &opts, sizeof(opts));
610 if (pOp == NULL)
611 {
612 ERR(pTrans->getNdbError());
613 setNdbError(pTrans->getNdbError());
614 return NDBT_FAILED;
615 }
616 }
617 return NDBT_OK;
618 }
619
execute_Commit(Ndb * pNdb,AbortOption eao)620 int HugoOperations::execute_Commit(Ndb* pNdb,
621 AbortOption eao){
622
623 int check = 0;
624 check = pTrans->execute(Commit, eao);
625
626 const NdbError err = pTrans->getNdbError();
627 if( check == -1 || err.code) {
628 ERR(err);
629 setNdbError(err);
630 NdbOperation* pOp = pTrans->getNdbErrorOperation();
631 if (pOp != NULL){
632 const NdbError err2 = pOp->getNdbError();
633 ERR(err2);
634 setNdbError(err2);
635 }
636 if (err.code == 0)
637 return NDBT_FAILED;
638 return err.code;
639 }
640
641 for(unsigned int i = 0; i<m_result_sets.size(); i++){
642 m_executed_result_sets.push_back(m_result_sets[i]);
643
644 int rows = m_result_sets[i].records;
645 NdbScanOperation* rs = m_result_sets[i].m_result_set;
646 int res = rs->nextResult();
647 switch(res){
648 case 1:
649 return 626;
650 case -1:
651 const NdbError err = pTrans->getNdbError();
652 ERR(err);
653 setNdbError(err);
654 return (err.code > 0 ? err.code : NDBT_FAILED);
655 }
656
657 // A row found
658
659 switch(rows){
660 case 0:
661 return 4000;
662 default:
663 m_result_sets[i].records--;
664 break;
665 }
666 }
667
668 m_result_sets.clear();
669
670 return NDBT_OK;
671 }
672
execute_NoCommit(Ndb * pNdb,AbortOption eao)673 int HugoOperations::execute_NoCommit(Ndb* pNdb, AbortOption eao){
674
675 int check;
676 check = pTrans->execute(NoCommit, eao);
677
678 const NdbError err = pTrans->getNdbError();
679 if( check == -1 || err.code) {
680 ERR(err);
681 setNdbError(err);
682 const NdbOperation* pOp = pTrans->getNdbErrorOperation();
683 while (pOp != NULL)
684 {
685 const NdbError err2 = pOp->getNdbError();
686 if (err2.code)
687 {
688 ERR(err2);
689 setNdbError(err2);
690 }
691 pOp = pTrans->getNextCompletedOperation(pOp);
692 }
693 if (err.code == 0)
694 return NDBT_FAILED;
695 return err.code;
696 }
697
698 for(unsigned int i = 0; i<m_result_sets.size(); i++){
699 m_executed_result_sets.push_back(m_result_sets[i]);
700
701 int rows = m_result_sets[i].records;
702 NdbScanOperation* rs = m_result_sets[i].m_result_set;
703 int res = rs->nextResult();
704 switch(res){
705 case 1:
706 return 626;
707 case -1:
708 const NdbError err = pTrans->getNdbError();
709 ERR(err);
710 setNdbError(err);
711 return (err.code > 0 ? err.code : NDBT_FAILED);
712 }
713
714 // A row found
715
716 switch(rows){
717 case 0:
718 return 4000;
719 default:
720 case 1:
721 break;
722 }
723 }
724
725 m_result_sets.clear();
726
727 return NDBT_OK;
728 }
729
execute_Rollback(Ndb * pNdb)730 int HugoOperations::execute_Rollback(Ndb* pNdb){
731 int check;
732 check = pTrans->execute(Rollback);
733 if( check == -1 ) {
734 const NdbError err = pTrans->getNdbError();
735 ERR(err);
736 setNdbError(err);
737 return NDBT_FAILED;
738 }
739 return NDBT_OK;
740 }
741
742 void
HugoOperations_async_callback(int res,NdbTransaction * pCon,void * ho)743 HugoOperations_async_callback(int res, NdbTransaction* pCon, void* ho)
744 {
745 ((HugoOperations*)ho)->callback(res, pCon);
746 }
747
748 void
callback(int res,NdbTransaction * pCon)749 HugoOperations::callback(int res, NdbTransaction* pCon)
750 {
751 assert(pCon == pTrans);
752 m_async_reply= 1;
753 if(res)
754 {
755 m_async_return = pCon->getNdbError().code;
756 }
757 else
758 {
759 m_async_return = 0;
760 }
761 }
762
763 int
execute_async(Ndb * pNdb,NdbTransaction::ExecType et,NdbOperation::AbortOption eao)764 HugoOperations::execute_async(Ndb* pNdb, NdbTransaction::ExecType et,
765 NdbOperation::AbortOption eao){
766
767 m_async_reply= 0;
768 pTrans->executeAsynchPrepare(et,
769 HugoOperations_async_callback,
770 this,
771 eao);
772
773 pNdb->sendPreparedTransactions();
774
775 return NDBT_OK;
776 }
777
778 int
execute_async_prepare(Ndb * pNdb,NdbTransaction::ExecType et,NdbOperation::AbortOption eao)779 HugoOperations::execute_async_prepare(Ndb* pNdb, NdbTransaction::ExecType et,
780 NdbOperation::AbortOption eao){
781
782 m_async_reply= 0;
783 pTrans->executeAsynchPrepare(et,
784 HugoOperations_async_callback,
785 this,
786 eao);
787
788 return NDBT_OK;
789 }
790
791 int
wait_async(Ndb * pNdb,int timeout)792 HugoOperations::wait_async(Ndb* pNdb, int timeout)
793 {
794 volatile int * wait = &m_async_reply;
795 while (!* wait)
796 {
797 pNdb->sendPollNdb(1000);
798
799 if(* wait)
800 {
801 if(m_async_return)
802 ndbout << "ERROR: " << pNdb->getNdbError(m_async_return) << endl;
803 return m_async_return;
804 }
805 }
806 ndbout_c("wait returned nothing...");
807 return -1;
808 }
809
HugoOperations(const NdbDictionary::Table & _tab,const NdbDictionary::Index * idx)810 HugoOperations::HugoOperations(const NdbDictionary::Table& _tab,
811 const NdbDictionary::Index* idx):
812 UtilTransactions(_tab, idx),
813 pIndexScanOp(NULL),
814 calc(_tab),
815 m_quiet(false),
816 avCallback(NULL)
817 {
818 }
819
~HugoOperations()820 HugoOperations::~HugoOperations(){
821 deallocRows();
822 if (pTrans != NULL)
823 {
824 pTrans->close();
825 pTrans = NULL;
826 }
827 }
828
829 int
equalForRow(NdbOperation * pOp,int row)830 HugoOperations::equalForRow(NdbOperation* pOp, int row)
831 {
832 for(int a = 0; a<tab.getNoOfColumns(); a++)
833 {
834 if (tab.getColumn(a)->getPrimaryKey() == true)
835 {
836 if(equalForAttr(pOp, a, row) != 0)
837 {
838 ERR(pOp->getNdbError());
839 setNdbError(pOp->getNdbError());
840 return NDBT_FAILED;
841 }
842 }
843 }
844 return NDBT_OK;
845 }
846
getPartIdForRow(const NdbOperation * pOp,int rowid,Uint32 & partId)847 bool HugoOperations::getPartIdForRow(const NdbOperation* pOp,
848 int rowid,
849 Uint32& partId)
850 {
851 if (tab.getFragmentType() == NdbDictionary::Object::UserDefined)
852 {
853 /* Primary keys and Ordered indexes are partitioned according
854 * to the row number
855 * PartitionId must be set for PK access. Ordered indexes
856 * can scan all partitions.
857 */
858 if (pOp->getType() == NdbOperation::PrimaryKeyAccess)
859 {
860 /* Need to set the partitionId for this op
861 * For Hugo, we use 'HASH' partitioning, which is probably
862 * better called 'MODULO' partitioning with
863 * FragId == rowNum % NumPartitions
864 * This gives a good balance with the normal Hugo data, but different
865 * row to partition assignments than normal key partitioning.
866 */
867 const Uint32 numFrags= tab.getFragmentCount();
868 partId= rowid % numFrags;
869 g_info << "Returning partition Id of " << partId << endl;
870 return true;
871 }
872 }
873 partId= ~0;
874 return false;
875 }
876
equalForAttr(NdbOperation * pOp,int attrId,int rowId)877 int HugoOperations::equalForAttr(NdbOperation* pOp,
878 int attrId,
879 int rowId){
880 const NdbDictionary::Column* attr = tab.getColumn(attrId);
881 if (attr->getPrimaryKey() == false){
882 g_info << "Can't call equalForAttr on non PK attribute" << endl;
883 return NDBT_FAILED;
884 }
885
886 int len = attr->getSizeInBytes();
887 char buf[NDB_MAX_TUPLE_SIZE];
888 memset(buf, 0, sizeof(buf));
889 Uint32 real_len;
890 const char * value = calc.calcValue(rowId, attrId, 0, buf, len, &real_len);
891 return pOp->equal( attr->getName(), value, real_len);
892 }
893
setValueForAttr(NdbOperation * pOp,int attrId,int rowId,int updateId)894 int HugoOperations::setValueForAttr(NdbOperation* pOp,
895 int attrId,
896 int rowId,
897 int updateId){
898 const NdbDictionary::Column* attr = tab.getColumn(attrId);
899
900 if (! (attr->getType() == NdbDictionary::Column::Blob))
901 {
902 int len = attr->getSizeInBytes();
903 char buf[NDB_MAX_TUPLE_SIZE];
904 memset(buf, 0, sizeof(buf));
905 Uint32 real_len;
906 const char * value = calc.calcValue(rowId, attrId,
907 updateId, buf, len, &real_len);
908 return pOp->setValue( attr->getName(), value, real_len);
909 }
910 else
911 {
912 char buf[32000];
913 int len = (int)sizeof(buf);
914 Uint32 real_len;
915 const char * value = calc.calcValue(rowId, attrId,
916 updateId, buf, len, &real_len);
917 NdbBlob * b = pOp->getBlobHandle(attrId);
918 if (b == 0)
919 return -1;
920
921 if (real_len == 0)
922 return b->setNull();
923 else
924 return b->setValue(value, real_len);
925 }
926 }
927
928 int
verifyUpdatesValue(int updatesValue,int _numRows)929 HugoOperations::verifyUpdatesValue(int updatesValue, int _numRows){
930 _numRows = (_numRows == 0 ? rows.size() : _numRows);
931
932 int result = NDBT_OK;
933
934 for(int i = 0; i<_numRows; i++){
935 if(calc.verifyRowValues(rows[i]) != NDBT_OK){
936 g_err << "Inconsistent row"
937 << endl << "\t" << rows[i]->c_str().c_str() << endl;
938 result = NDBT_FAILED;
939 continue;
940 }
941
942 if(calc.getUpdatesValue(rows[i]) != updatesValue){
943 result = NDBT_FAILED;
944 g_err << "Invalid updates value for row " << i << endl
945 << " updatesValue: " << updatesValue << endl
946 << " calc.getUpdatesValue: " << calc.getUpdatesValue(rows[i]) << endl
947 << rows[i]->c_str().c_str() << endl;
948 continue;
949 }
950 }
951
952 if(_numRows == 0){
953 g_err << "No rows -> Invalid updates value" << endl;
954 return NDBT_FAILED;
955 }
956
957 return result;
958 }
959
allocRows(int _numRows)960 void HugoOperations::allocRows(int _numRows){
961 if(_numRows <= 0){
962 g_info << "Illegal value for num rows : " << _numRows << endl;
963 abort();
964 }
965
966 for(int b=rows.size(); b<_numRows; b++){
967 rows.push_back(new NDBT_ResultRow(tab));
968 }
969 }
970
deallocRows()971 void HugoOperations::deallocRows(){
972 while(rows.size() > 0){
973 delete rows.back();
974 rows.erase(rows.size() - 1);
975 }
976 }
977
saveCopyOfRecord(int numRecords)978 int HugoOperations::saveCopyOfRecord(int numRecords ){
979
980 if (numRecords > (int)rows.size())
981 return NDBT_FAILED;
982
983 for (int i = 0; i < numRecords; i++){
984 savedRecords.push_back(rows[i]->c_str());
985 }
986 return NDBT_OK;
987 }
988
getRecordStr(int recordNum)989 BaseString HugoOperations::getRecordStr(int recordNum){
990 if (recordNum > (int)rows.size())
991 return NULL;
992 return rows[recordNum]->c_str();
993 }
994
getRecordGci(int recordNum)995 int HugoOperations::getRecordGci(int recordNum){
996 return pTrans->getGCI();
997 }
998
999
compareRecordToCopy(int numRecords)1000 int HugoOperations::compareRecordToCopy(int numRecords ){
1001 if (numRecords > (int)rows.size())
1002 return NDBT_FAILED;
1003 if ((unsigned)numRecords > savedRecords.size())
1004 return NDBT_FAILED;
1005
1006 int result = NDBT_OK;
1007 for (int i = 0; i < numRecords; i++){
1008 BaseString str = rows[i]->c_str();
1009 ndbout << "row["<<i<<"]: " << str << endl;
1010 ndbout << "sav["<<i<<"]: " << savedRecords[i] << endl;
1011 if (savedRecords[i] == str){
1012 ;
1013 } else {
1014 result = NDBT_FAILED;
1015 }
1016 }
1017 return result;
1018 }
1019
1020 void
refresh()1021 HugoOperations::refresh() {
1022 NdbTransaction * t = getTransaction();
1023 if(t)
1024 t->refresh();
1025 }
1026
indexReadRecords(Ndb *,const char * idxName,int recordNo,bool exclusive,int numRecords)1027 int HugoOperations::indexReadRecords(Ndb*, const char * idxName, int recordNo,
1028 bool exclusive,
1029 int numRecords){
1030
1031 int a;
1032 allocRows(numRecords);
1033 int check;
1034 for(int r=0; r < numRecords; r++){
1035 NdbOperation* pOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
1036 if (pOp == NULL) {
1037 ERR(pTrans->getNdbError());
1038 setNdbError(pTrans->getNdbError());
1039 return NDBT_FAILED;
1040 }
1041
1042 if (exclusive == true)
1043 check = pOp->readTupleExclusive();
1044 else
1045 check = pOp->readTuple();
1046 if( check == -1 ) {
1047 ERR(pTrans->getNdbError());
1048 setNdbError(pTrans->getNdbError());
1049 return NDBT_FAILED;
1050 }
1051
1052 // Define primary keys
1053 if (equalForRow(pOp, r+recordNo) != 0)
1054 return NDBT_FAILED;
1055
1056 // Define attributes to read
1057 for(a = 0; a<tab.getNoOfColumns(); a++){
1058 if((rows[r]->attributeStore(a) =
1059 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
1060 ERR(pTrans->getNdbError());
1061 setNdbError(pTrans->getNdbError());
1062 return NDBT_FAILED;
1063 }
1064 }
1065 }
1066 return NDBT_OK;
1067 }
1068
1069 int
indexUpdateRecord(Ndb *,const char * idxName,int recordNo,int numRecords,int updatesValue)1070 HugoOperations::indexUpdateRecord(Ndb*,
1071 const char * idxName,
1072 int recordNo,
1073 int numRecords,
1074 int updatesValue){
1075 int a;
1076 allocRows(numRecords);
1077 int check;
1078 for(int r=0; r < numRecords; r++){
1079 NdbOperation* pOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
1080 if (pOp == NULL) {
1081 ERR(pTrans->getNdbError());
1082 setNdbError(pTrans->getNdbError());
1083 return NDBT_FAILED;
1084 }
1085
1086 check = pOp->updateTuple();
1087 if( check == -1 ) {
1088 ERR(pTrans->getNdbError());
1089 setNdbError(pTrans->getNdbError());
1090 return NDBT_FAILED;
1091 }
1092
1093 // Define primary keys
1094 if (equalForRow(pOp, r+recordNo) != 0)
1095 return NDBT_FAILED;
1096
1097 // Define attributes to update
1098 for(a = 0; a<tab.getNoOfColumns(); a++){
1099 if (tab.getColumn(a)->getPrimaryKey() == false){
1100 if(setValueForAttr(pOp, a, recordNo+r, updatesValue ) != 0){
1101 ERR(pTrans->getNdbError());
1102 setNdbError(pTrans->getNdbError());
1103 return NDBT_FAILED;
1104 }
1105 }
1106 }
1107 }
1108 return NDBT_OK;
1109 }
1110
1111 int
scanReadRecords(Ndb * pNdb,NdbScanOperation::LockMode lm,int records)1112 HugoOperations::scanReadRecords(Ndb* pNdb, NdbScanOperation::LockMode lm,
1113 int records){
1114
1115 allocRows(records);
1116 NdbScanOperation * pOp = pTrans->getNdbScanOperation(tab.getName());
1117
1118 if(!pOp)
1119 return -1;
1120
1121 if(pOp->readTuples(lm, 0, 1)){
1122 return -1;
1123 }
1124
1125 for(int a = 0; a<tab.getNoOfColumns(); a++){
1126 if((rows[0]->attributeStore(a) =
1127 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
1128 ERR(pTrans->getNdbError());
1129 setNdbError(pTrans->getNdbError());
1130 return NDBT_FAILED;
1131 }
1132 }
1133
1134 RsPair p = {pOp, records};
1135 m_result_sets.push_back(p);
1136
1137 return 0;
1138 }
1139
1140 int
releaseLockHandles(Ndb * pNdb,Vector<const NdbLockHandle * > & lockHandles,int offset,int numRecords)1141 HugoOperations::releaseLockHandles(Ndb* pNdb,
1142 Vector<const NdbLockHandle*>& lockHandles,
1143 int offset,
1144 int numRecords)
1145 {
1146 int totalSize = lockHandles.size();
1147 if (numRecords == ~(0))
1148 {
1149 numRecords = totalSize - offset;
1150 }
1151
1152 if (totalSize < (offset + numRecords))
1153 {
1154 g_err << "ERROR : LockHandles size is "
1155 << lockHandles.size()
1156 << " offset ("
1157 << offset
1158 << ") and/or numRecords ("
1159 << numRecords
1160 << ") too large." << endl;
1161 return NDBT_FAILED;
1162 }
1163
1164 for (int i = 0; i < numRecords; i++)
1165 {
1166 const NdbLockHandle* lh = lockHandles[offset + i];
1167 if (lh != NULL)
1168 {
1169 if (pTrans->releaseLockHandle(lh) != 0)
1170 {
1171 ERR(pTrans->getNdbError());
1172 setNdbError(pTrans->getNdbError());
1173 return NDBT_FAILED;
1174 }
1175 const NdbLockHandle* nullPtr = NULL;
1176 (void)nullPtr; // ??
1177 //lockHandles.set(nullPtr, offset + i, nullPtr);
1178 }
1179 else
1180 {
1181 g_err << "ERROR : LockHandle number "
1182 << i+offset << " is NULL. "
1183 << " offset is " << offset << endl;
1184 return NDBT_FAILED;
1185 }
1186 }
1187
1188 return NDBT_OK;
1189
1190 }
1191
1192 static void
update(const NdbError & _err)1193 update(const NdbError & _err)
1194 {
1195 NdbError & error = (NdbError &) _err;
1196 ndberror_struct ndberror = (ndberror_struct)error;
1197 ndberror_update(&ndberror);
1198 error = NdbError(ndberror);
1199 }
1200
1201 const NdbError &
getNdbError() const1202 HugoOperations::getNdbError() const
1203 {
1204 update(m_error);
1205 return m_error;
1206 }
1207
1208 void
setNdbError(const NdbError & error)1209 HugoOperations::setNdbError(const NdbError& error)
1210 {
1211 m_error.code = error.code ? error.code : 1;
1212 }
1213
1214 void
setAnyValueCallback(AnyValueCallback avc)1215 HugoOperations::setAnyValueCallback(AnyValueCallback avc)
1216 {
1217 avCallback = avc;
1218 }
1219
1220 Uint32
getAnyValueForRowUpd(int row,int update)1221 HugoOperations::getAnyValueForRowUpd(int row, int update)
1222 {
1223 if (avCallback == NULL)
1224 return 0;
1225
1226 return (avCallback)(pTrans->getNdb(), pTrans,
1227 row, update);
1228 }
1229
/* Explicit instantiations of the Vector template for the element types
 * used in this file.
 */
template class Vector<HugoOperations::RsPair>;
template class Vector<const NdbLockHandle*>;
1232