1 /* Copyright (c) 2003-2005, 2007 MySQL AB
2 Use is subject to license terms
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16
17 #include <NdbSleep.h>
18 #include <HugoAsynchTransactions.hpp>
19
HugoAsynchTransactions(const NdbDictionary::Table & _t)20 HugoAsynchTransactions::HugoAsynchTransactions(const NdbDictionary::Table& _t)
21 : HugoTransactions(_t),
22 transactionsCompleted(0),
23 numTransactions(0),
24 transactions(NULL)
25 {
26 }
27
~HugoAsynchTransactions()28 HugoAsynchTransactions::~HugoAsynchTransactions(){
29 deallocTransactions();
30 }
31
asynchCallback(int result,NdbConnection * pTrans,void * anObject)32 void asynchCallback(int result, NdbConnection* pTrans,
33 void* anObject) {
34 HugoAsynchTransactions* pHugo = (HugoAsynchTransactions*) anObject;
35
36 pHugo->transactionCompleted();
37
38 if (result == -1) {
39 const NdbError err = pTrans->getNdbError();
40 switch(err.status) {
41 case NdbError::Success:
42 ERR(err);
43 g_info << "ERROR: NdbError reports success when transcaction failed"
44 << endl;
45 break;
46
47 case NdbError::TemporaryError:
48 ERR(err);
49 break;
50
51 #if 0
52 case 626: // Tuple did not exist
53 g_info << (unsigned int)pHugo->getTransactionsCompleted() << ": "
54 << err.code << " " << err.message << endl;
55 break;
56 #endif
57
58 case NdbError::UnknownResult:
59 ERR(err);
60 break;
61
62 case NdbError::PermanentError:
63 switch (err.classification) {
64 case NdbError::ConstraintViolation:
65 // Tuple already existed, OK in this application,
66 // but should be reported
67 g_info << (unsigned int)pHugo->getTransactionsCompleted()
68 << ": " << err.code << " " << err.message << endl;
69 break;
70 default:
71 ERR(err);
72 break;
73 }
74 break;
75 }
76 } else {// if (result == -1)
77 /*
78 ndbout << (unsigned int)pHugo->getTransactionsCompleted() << " completed"
79 << endl;
80 */
81 }
82 }
83
84 int
loadTableAsynch(Ndb * pNdb,int records,int batch,int trans,int operations)85 HugoAsynchTransactions::loadTableAsynch(Ndb* pNdb,
86 int records,
87 int batch,
88 int trans,
89 int operations){
90
91 int result = executeAsynchOperation(pNdb, records, batch, trans, operations,
92 NO_INSERT);
93 g_info << (unsigned int)transactionsCompleted * operations
94 << "|- inserted..." << endl;
95
96 return result;
97 }
98
99 void
transactionCompleted()100 HugoAsynchTransactions::transactionCompleted() {
101 transactionsCompleted++;
102 }
103
104 long
getTransactionsCompleted()105 HugoAsynchTransactions::getTransactionsCompleted() {
106 return transactionsCompleted;
107 }
108
109 int
pkDelRecordsAsynch(Ndb * pNdb,int records,int batch,int trans,int operations)110 HugoAsynchTransactions::pkDelRecordsAsynch(Ndb* pNdb,
111 int records,
112 int batch,
113 int trans,
114 int operations) {
115
116 g_info << "|- Deleting records asynchronous..." << endl;
117
118 int result = executeAsynchOperation(pNdb, records, batch, trans,
119 operations,
120 NO_DELETE);
121 g_info << "|- " << (unsigned int)transactionsCompleted * operations
122 << " deleted..." << endl;
123
124 return result;
125 }
126
127 int
pkReadRecordsAsynch(Ndb * pNdb,int records,int batch,int trans,int operations)128 HugoAsynchTransactions::pkReadRecordsAsynch(Ndb* pNdb,
129 int records,
130 int batch,
131 int trans,
132 int operations) {
133
134 g_info << "|- Reading records asynchronous..." << endl;
135
136 allocRows(trans*operations);
137 int result = executeAsynchOperation(pNdb, records, batch, trans, operations,
138 NO_READ);
139
140 g_info << "|- " << (unsigned int)transactionsCompleted * operations
141 << " read..."
142 << endl;
143
144 deallocRows();
145
146 return result;
147 }
148
149 int
pkUpdateRecordsAsynch(Ndb * pNdb,int records,int batch,int trans,int operations)150 HugoAsynchTransactions::pkUpdateRecordsAsynch(Ndb* pNdb,
151 int records,
152 int batch,
153 int trans,
154 int operations) {
155
156 g_info << "|- Updating records asynchronous..." << endl;
157
158 int check = 0;
159 int cTrans = 0;
160 int cReadRecords = 0;
161 int cReadIndex = 0;
162 int cRecords = 0;
163 int cIndex = 0;
164
165 transactionsCompleted = 0;
166
167 allocRows(trans*operations);
168 allocTransactions(trans);
169 int a, t, r;
170
171 for (int i = 0; i < batch; i++) { // For each batch
172 while (cRecords < records*batch) {
173 cTrans = 0;
174 cReadIndex = 0;
175 for (t = 0; t < trans; t++) { // For each transaction
176 transactions[t] = pNdb->startTransaction();
177 if (transactions[t] == NULL) {
178 ERR(pNdb->getNdbError());
179 return NDBT_FAILED;
180 }
181 for (int k = 0; k < operations; k++) { // For each operation
182 NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName());
183 if (pOp == NULL) {
184 ERR(transactions[t]->getNdbError());
185 pNdb->closeTransaction(transactions[t]);
186 return NDBT_FAILED;
187 }
188
189 // Read
190 // Define primary keys
191 check = pOp->readTupleExclusive();
192 if (equalForRow(pOp, cReadRecords) != 0)
193 {
194 ERR(transactions[t]->getNdbError());
195 pNdb->closeTransaction(transactions[t]);
196 return NDBT_FAILED;
197 }
198 // Define attributes to read
199 for (a = 0; a < tab.getNoOfColumns(); a++) {
200 if ((rows[cReadIndex]->attributeStore(a) =
201 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
202 ERR(transactions[t]->getNdbError());
203 pNdb->closeTransaction(transactions[t]);
204 return NDBT_FAILED;
205 }
206 }
207 cReadIndex++;
208 cReadRecords++;
209
210 } // For each operation
211
212 // Let's prepare...
213 transactions[t]->executeAsynchPrepare(NoCommit, &asynchCallback,
214 this);
215 cTrans++;
216
217 if (cReadRecords >= records) {
218 // No more transactions needed
219 break;
220 }
221 } // For each transaction
222
223 // Wait for all outstanding transactions
224 pNdb->sendPollNdb(3000, 0, 0);
225
226 // Verify the data!
227 for (r = 0; r < trans*operations; r++) {
228 if (calc.verifyRowValues(rows[r]) != 0) {
229 g_info << "|- Verify failed..." << endl;
230 // Close all transactions
231 for (int t = 0; t < cTrans; t++) {
232 pNdb->closeTransaction(transactions[t]);
233 }
234 return NDBT_FAILED;
235 }
236 }
237
238 // Update
239 cTrans = 0;
240 cIndex = 0;
241 for (t = 0; t < trans; t++) { // For each transaction
242 for (int k = 0; k < operations; k++) { // For each operation
243 NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName());
244 if (pOp == NULL) {
245 ERR(transactions[t]->getNdbError());
246 pNdb->closeTransaction(transactions[t]);
247 return NDBT_FAILED;
248 }
249
250 int updates = calc.getUpdatesValue(rows[cIndex]) + 1;
251
252 check = pOp->updateTuple();
253 if (check == -1) {
254 ERR(transactions[t]->getNdbError());
255 pNdb->closeTransaction(transactions[t]);
256 return NDBT_FAILED;
257 }
258
259 // Set search condition for the record
260 if (equalForRow(pOp, cReadRecords) != 0)
261 {
262 ERR(transactions[t]->getNdbError());
263 pNdb->closeTransaction(transactions[t]);
264 return NDBT_FAILED;
265 }
266
267 // Update the record
268 for (a = 0; a < tab.getNoOfColumns(); a++) {
269 if (tab.getColumn(a)->getPrimaryKey() == false) {
270 if (setValueForAttr(pOp, a, cRecords, updates) != 0) {
271 ERR(transactions[t]->getNdbError());
272 pNdb->closeTransaction(transactions[t]);
273 return NDBT_FAILED;
274 }
275 }
276 }
277 cIndex++;
278 cRecords++;
279
280 } // For each operation
281
282 // Let's prepare...
283 transactions[t]->executeAsynchPrepare(Commit, &asynchCallback,
284 this);
285 cTrans++;
286
287 if (cRecords >= records) {
288 // No more transactions needed
289 break;
290 }
291 } // For each transaction
292
293 // Wait for all outstanding transactions
294 pNdb->sendPollNdb(3000, 0, 0);
295
296 // Close all transactions
297 for (t = 0; t < cTrans; t++) {
298 pNdb->closeTransaction(transactions[t]);
299 }
300
301 } // while (cRecords < records*batch)
302
303 } // For each batch
304
305 deallocTransactions();
306 deallocRows();
307
308 g_info << "|- " << ((unsigned int)transactionsCompleted * operations)/2
309 << " updated..." << endl;
310 return NDBT_OK;
311 }
312
313 void
allocTransactions(int trans)314 HugoAsynchTransactions::allocTransactions(int trans) {
315 if (transactions != NULL) {
316 deallocTransactions();
317 }
318 numTransactions = trans;
319 transactions = new NdbConnection*[numTransactions];
320 }
321
322 void
deallocTransactions()323 HugoAsynchTransactions::deallocTransactions() {
324 if (transactions != NULL){
325 delete[] transactions;
326 }
327 transactions = NULL;
328 }
329
330 int
executeAsynchOperation(Ndb * pNdb,int records,int batch,int trans,int operations,NDB_OPERATION theOperation,ExecType theType)331 HugoAsynchTransactions::executeAsynchOperation(Ndb* pNdb,
332 int records,
333 int batch,
334 int trans,
335 int operations,
336 NDB_OPERATION theOperation,
337 ExecType theType) {
338
339 int check = 0;
340 // int retryAttempt = 0; // Not used at the moment
341 // int retryMax = 5; // Not used at the moment
342 int cTrans = 0;
343 int cRecords = 0;
344 int cIndex = 0;
345 int a,t,r;
346
347 transactionsCompleted = 0;
348 allocTransactions(trans);
349
350 for (int i = 0; i < batch; i++) { // For each batch
351 while (cRecords < records*batch) {
352 cTrans = 0;
353 cIndex = 0;
354 for (t = 0; t < trans; t++) { // For each transaction
355 transactions[t] = pNdb->startTransaction();
356 if (transactions[t] == NULL) {
357 ERR(pNdb->getNdbError());
358 return NDBT_FAILED;
359 }
360 for (int k = 0; k < operations; k++) { // For each operation
361 NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName());
362 if (pOp == NULL) {
363 ERR(transactions[t]->getNdbError());
364 pNdb->closeTransaction(transactions[t]);
365 return NDBT_FAILED;
366 }
367
368 switch (theOperation) {
369 case NO_INSERT:
370 // Insert
371 check = pOp->insertTuple();
372 if (check == -1) {
373 ERR(transactions[t]->getNdbError());
374 pNdb->closeTransaction(transactions[t]);
375 return NDBT_FAILED;
376 }
377
378 // Set a calculated value for each attribute in this table
379 for (a = 0; a < tab.getNoOfColumns(); a++) {
380 if (setValueForAttr(pOp, a, cRecords, 0 ) != 0) {
381 ERR(transactions[t]->getNdbError());
382 pNdb->closeTransaction(transactions[t]);
383 return NDBT_FAILED;
384 }
385 } // For each attribute
386 break;
387 case NO_UPDATE:
388 // This is a special case and is handled in the calling client...
389 break;
390 break;
391 case NO_READ:
392 // Define primary keys
393 check = pOp->readTuple();
394 if (equalForRow(pOp, cRecords) != 0)
395 {
396 ERR(transactions[t]->getNdbError());
397 pNdb->closeTransaction(transactions[t]);
398 return NDBT_FAILED;
399 }
400 // Define attributes to read
401 for (a = 0; a < tab.getNoOfColumns(); a++) {
402 if ((rows[cIndex]->attributeStore(a) =
403 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
404 ERR(transactions[t]->getNdbError());
405 pNdb->closeTransaction(transactions[t]);
406 return NDBT_FAILED;
407 }
408 }
409 break;
410 case NO_DELETE:
411 // Delete
412 check = pOp->deleteTuple();
413 if (check == -1) {
414 ERR(transactions[t]->getNdbError());
415 pNdb->closeTransaction(transactions[t]);
416 return NDBT_FAILED;
417 }
418
419 // Define primary keys
420 if (equalForRow(pOp, cRecords) != 0)
421 {
422 ERR(transactions[t]->getNdbError());
423 pNdb->closeTransaction(transactions[t]);
424 return NDBT_FAILED;
425 }
426 break;
427 default:
428 // Should not happen...
429 pNdb->closeTransaction(transactions[t]);
430 return NDBT_FAILED;
431 }
432
433 cIndex++;
434 cRecords++;
435
436 } // For each operation
437
438 // Let's prepare...
439 transactions[t]->executeAsynchPrepare(theType, &asynchCallback,
440 this);
441 cTrans++;
442
443 if (cRecords >= records) {
444 // No more transactions needed
445 break;
446 }
447 } // For each transaction
448
449 // Wait for all outstanding transactions
450 pNdb->sendPollNdb(3000, 0, 0);
451
452 // ugly... it's starts to resemble flexXXX ...:(
453 switch (theOperation) {
454 case NO_READ:
455 // Verify the data!
456 for (r = 0; r < trans*operations; r++) {
457 if (calc.verifyRowValues(rows[r]) != 0) {
458 g_info << "|- Verify failed..." << endl;
459 // Close all transactions
460 for (int t = 0; t < cTrans; t++) {
461 pNdb->closeTransaction(transactions[t]);
462 }
463 return NDBT_FAILED;
464 }
465 }
466 break;
467 case NO_INSERT:
468 case NO_UPDATE:
469 case NO_DELETE:
470 break;
471 }
472
473 // Close all transactions
474 for (t = 0; t < cTrans; t++) {
475 pNdb->closeTransaction(transactions[t]);
476 }
477
478 } // while (cRecords < records*batch)
479
480 } // For each batch
481
482 deallocTransactions();
483
484 return NDBT_OK;
485
486 }
487