1 /* Copyright (c) 2003-2005, 2007 MySQL AB
2    Use is subject to license terms
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */
16 
17 #include <NdbSleep.h>
18 #include <HugoAsynchTransactions.hpp>
19 
HugoAsynchTransactions(const NdbDictionary::Table & _t)20 HugoAsynchTransactions::HugoAsynchTransactions(const NdbDictionary::Table& _t)
21   : HugoTransactions(_t),
22     transactionsCompleted(0),
23     numTransactions(0),
24     transactions(NULL)
25 {
26 }
27 
~HugoAsynchTransactions()28 HugoAsynchTransactions::~HugoAsynchTransactions(){
29   deallocTransactions();
30 }
31 
asynchCallback(int result,NdbConnection * pTrans,void * anObject)32 void asynchCallback(int result, NdbConnection* pTrans,
33 		    void* anObject) {
34   HugoAsynchTransactions* pHugo = (HugoAsynchTransactions*) anObject;
35 
36   pHugo->transactionCompleted();
37 
38   if (result == -1) {
39     const NdbError err = pTrans->getNdbError();
40     switch(err.status) {
41     case NdbError::Success:
42       ERR(err);
43       g_info << "ERROR: NdbError reports success when transcaction failed"
44 	     << endl;
45       break;
46 
47     case NdbError::TemporaryError:
48       ERR(err);
49       break;
50 
51 #if 0
52     case 626: // Tuple did not exist
53       g_info << (unsigned int)pHugo->getTransactionsCompleted() << ": "
54 	     << err.code << " " << err.message << endl;
55       break;
56 #endif
57 
58     case NdbError::UnknownResult:
59       ERR(err);
60       break;
61 
62     case NdbError::PermanentError:
63       switch (err.classification) {
64       case NdbError::ConstraintViolation:
65 	// Tuple already existed, OK in this application,
66 	// but should be reported
67 	g_info << (unsigned int)pHugo->getTransactionsCompleted()
68 	       << ": " << err.code << " " << err.message << endl;
69 	break;
70       default:
71 	ERR(err);
72 	break;
73       }
74       break;
75     }
76   } else {// if (result == -1)
77     /*
78     ndbout << (unsigned int)pHugo->getTransactionsCompleted() << " completed"
79 	   << endl;
80     */
81   }
82 }
83 
84 int
loadTableAsynch(Ndb * pNdb,int records,int batch,int trans,int operations)85 HugoAsynchTransactions::loadTableAsynch(Ndb* pNdb,
86 				  int records,
87 				  int batch,
88 				  int trans,
89 				  int operations){
90 
91   int result = executeAsynchOperation(pNdb, records, batch, trans, operations,
92 				      NO_INSERT);
93   g_info << (unsigned int)transactionsCompleted * operations
94 	 << "|- inserted..." << endl;
95 
96   return result;
97 }
98 
99 void
transactionCompleted()100 HugoAsynchTransactions::transactionCompleted() {
101   transactionsCompleted++;
102 }
103 
104 long
getTransactionsCompleted()105 HugoAsynchTransactions::getTransactionsCompleted() {
106   return transactionsCompleted;
107 }
108 
109 int
pkDelRecordsAsynch(Ndb * pNdb,int records,int batch,int trans,int operations)110 HugoAsynchTransactions::pkDelRecordsAsynch(Ndb* pNdb,
111 				     int records,
112 				     int batch,
113 				     int trans,
114 				     int operations) {
115 
116   g_info << "|- Deleting records asynchronous..." << endl;
117 
118   int result =  executeAsynchOperation(pNdb, records, batch, trans,
119 				       operations,
120 				       NO_DELETE);
121   g_info << "|- " << (unsigned int)transactionsCompleted * operations
122 	 << " deleted..." << endl;
123 
124   return result;
125 }
126 
127 int
pkReadRecordsAsynch(Ndb * pNdb,int records,int batch,int trans,int operations)128 HugoAsynchTransactions::pkReadRecordsAsynch(Ndb* pNdb,
129 				      int records,
130 				      int batch,
131 				      int trans,
132 				      int operations) {
133 
134   g_info << "|- Reading records asynchronous..." << endl;
135 
136   allocRows(trans*operations);
137   int result = executeAsynchOperation(pNdb, records, batch, trans, operations,
138 				      NO_READ);
139 
140   g_info << "|- " << (unsigned int)transactionsCompleted * operations
141 	 << " read..."
142 	 << endl;
143 
144   deallocRows();
145 
146   return result;
147 }
148 
149 int
pkUpdateRecordsAsynch(Ndb * pNdb,int records,int batch,int trans,int operations)150 HugoAsynchTransactions::pkUpdateRecordsAsynch(Ndb* pNdb,
151 					int records,
152 					int batch,
153 					int trans,
154 					int operations) {
155 
156   g_info << "|- Updating records asynchronous..." << endl;
157 
158   int             check = 0;
159   int             cTrans = 0;
160   int             cReadRecords = 0;
161   int             cReadIndex = 0;
162   int             cRecords = 0;
163   int             cIndex = 0;
164 
165   transactionsCompleted = 0;
166 
167   allocRows(trans*operations);
168   allocTransactions(trans);
169   int a, t, r;
170 
171   for (int i = 0; i < batch; i++) { // For each batch
172     while (cRecords < records*batch) {
173       cTrans = 0;
174       cReadIndex = 0;
175       for (t = 0; t < trans; t++) { // For each transaction
176 	transactions[t] = pNdb->startTransaction();
177 	if (transactions[t] == NULL) {
178 	  ERR(pNdb->getNdbError());
179 	  return NDBT_FAILED;
180 	}
181 	for (int k = 0; k < operations; k++) { // For each operation
182 	  NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName());
183 	  if (pOp == NULL) {
184 	    ERR(transactions[t]->getNdbError());
185 	    pNdb->closeTransaction(transactions[t]);
186 	    return NDBT_FAILED;
187 	  }
188 
189 	  // Read
190 	  // Define primary keys
191 	  check = pOp->readTupleExclusive();
192           if (equalForRow(pOp, cReadRecords) != 0)
193           {
194             ERR(transactions[t]->getNdbError());
195             pNdb->closeTransaction(transactions[t]);
196             return NDBT_FAILED;
197 	  }
198 	  // Define attributes to read
199 	  for (a = 0; a < tab.getNoOfColumns(); a++) {
200 	    if ((rows[cReadIndex]->attributeStore(a) =
201 		 pOp->getValue(tab.getColumn(a)->getName())) == 0) {
202 	      ERR(transactions[t]->getNdbError());
203 	      pNdb->closeTransaction(transactions[t]);
204 	      return NDBT_FAILED;
205 	    }
206 	  }
207 	  cReadIndex++;
208 	  cReadRecords++;
209 
210 	} // For each operation
211 
212 	// Let's prepare...
213 	transactions[t]->executeAsynchPrepare(NoCommit, &asynchCallback,
214 					this);
215 	cTrans++;
216 
217 	if (cReadRecords >= records) {
218 	  // No more transactions needed
219 	  break;
220 	}
221       } // For each transaction
222 
223       // Wait for all outstanding transactions
224       pNdb->sendPollNdb(3000, 0, 0);
225 
226       // Verify the data!
227       for (r = 0; r < trans*operations; r++) {
228 	if (calc.verifyRowValues(rows[r]) != 0) {
229 	  g_info << "|- Verify failed..." << endl;
230 	  // Close all transactions
231 	  for (int t = 0; t < cTrans; t++) {
232 	    pNdb->closeTransaction(transactions[t]);
233 	  }
234 	  return NDBT_FAILED;
235 	}
236       }
237 
238       // Update
239       cTrans = 0;
240       cIndex = 0;
241       for (t = 0; t < trans; t++) { // For each transaction
242 	for (int k = 0; k < operations; k++) { // For each operation
243 	  NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName());
244 	  if (pOp == NULL) {
245 	    ERR(transactions[t]->getNdbError());
246 	    pNdb->closeTransaction(transactions[t]);
247 	    return NDBT_FAILED;
248 	  }
249 
250 	  int updates = calc.getUpdatesValue(rows[cIndex]) + 1;
251 
252 	  check = pOp->updateTuple();
253 	  if (check == -1) {
254 	    ERR(transactions[t]->getNdbError());
255 	    pNdb->closeTransaction(transactions[t]);
256 	      return NDBT_FAILED;
257 	  }
258 
259 	  // Set search condition for the record
260           if (equalForRow(pOp, cReadRecords) != 0)
261           {
262             ERR(transactions[t]->getNdbError());
263             pNdb->closeTransaction(transactions[t]);
264             return NDBT_FAILED;
265 	  }
266 
267 	  // Update the record
268 	  for (a = 0; a < tab.getNoOfColumns(); a++) {
269 	    if (tab.getColumn(a)->getPrimaryKey() == false) {
270 	      if (setValueForAttr(pOp, a, cRecords, updates) != 0) {
271 		ERR(transactions[t]->getNdbError());
272 		pNdb->closeTransaction(transactions[t]);
273 		return NDBT_FAILED;
274 	      }
275 	    }
276 	  }
277 	  cIndex++;
278 	  cRecords++;
279 
280 	} // For each operation
281 
282 	// Let's prepare...
283 	transactions[t]->executeAsynchPrepare(Commit, &asynchCallback,
284 					this);
285 	cTrans++;
286 
287 	if (cRecords >= records) {
288 	  // No more transactions needed
289 	  break;
290 	}
291       } // For each transaction
292 
293       // Wait for all outstanding transactions
294       pNdb->sendPollNdb(3000, 0, 0);
295 
296       // Close all transactions
297       for (t = 0; t < cTrans; t++) {
298 	pNdb->closeTransaction(transactions[t]);
299       }
300 
301     } // while (cRecords < records*batch)
302 
303   } // For each batch
304 
305   deallocTransactions();
306   deallocRows();
307 
308   g_info << "|- " << ((unsigned int)transactionsCompleted * operations)/2
309 	 << " updated..." << endl;
310   return NDBT_OK;
311 }
312 
313 void
allocTransactions(int trans)314 HugoAsynchTransactions::allocTransactions(int trans) {
315   if (transactions != NULL) {
316     deallocTransactions();
317   }
318   numTransactions = trans;
319   transactions = new NdbConnection*[numTransactions];
320 }
321 
322 void
deallocTransactions()323 HugoAsynchTransactions::deallocTransactions() {
324   if (transactions != NULL){
325     delete[] transactions;
326   }
327   transactions = NULL;
328 }
329 
330 int
executeAsynchOperation(Ndb * pNdb,int records,int batch,int trans,int operations,NDB_OPERATION theOperation,ExecType theType)331 HugoAsynchTransactions::executeAsynchOperation(Ndb* pNdb,
332 					 int records,
333 					 int batch,
334 					 int trans,
335 					 int operations,
336 					 NDB_OPERATION theOperation,
337 					 ExecType theType) {
338 
339   int             check = 0;
340   //  int             retryAttempt = 0;  // Not used at the moment
341   //  int             retryMax = 5;      // Not used at the moment
342   int             cTrans = 0;
343   int             cRecords = 0;
344   int             cIndex = 0;
345   int a,t,r;
346 
347   transactionsCompleted = 0;
348   allocTransactions(trans);
349 
350   for (int i = 0; i < batch; i++) { // For each batch
351     while (cRecords < records*batch) {
352       cTrans = 0;
353       cIndex = 0;
354       for (t = 0; t < trans; t++) { // For each transaction
355 	transactions[t] = pNdb->startTransaction();
356 	if (transactions[t] == NULL) {
357 	  ERR(pNdb->getNdbError());
358 	  return NDBT_FAILED;
359 	}
360 	for (int k = 0; k < operations; k++) { // For each operation
361 	  NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName());
362 	  if (pOp == NULL) {
363 	    ERR(transactions[t]->getNdbError());
364 	    pNdb->closeTransaction(transactions[t]);
365 	    return NDBT_FAILED;
366 	  }
367 
368 	  switch (theOperation) {
369 	  case NO_INSERT:
370 	    // Insert
371 	    check = pOp->insertTuple();
372 	    if (check == -1) {
373 	      ERR(transactions[t]->getNdbError());
374 	      pNdb->closeTransaction(transactions[t]);
375 	      return NDBT_FAILED;
376 	    }
377 
378 	    // Set a calculated value for each attribute in this table
379 	    for (a = 0; a < tab.getNoOfColumns(); a++) {
380 	      if (setValueForAttr(pOp, a, cRecords, 0 ) != 0) {
381 		ERR(transactions[t]->getNdbError());
382 		pNdb->closeTransaction(transactions[t]);
383 		return NDBT_FAILED;
384 	      }
385 	    } // For each attribute
386 	    break;
387 	  case NO_UPDATE:
388 	    // This is a special case and is handled in the calling client...
389 	    break;
390 	  break;
391 	  case NO_READ:
392 	    // Define primary keys
393 	    check = pOp->readTuple();
394             if (equalForRow(pOp, cRecords) != 0)
395             {
396               ERR(transactions[t]->getNdbError());
397               pNdb->closeTransaction(transactions[t]);
398               return NDBT_FAILED;
399             }
400 	    // Define attributes to read
401 	    for (a = 0; a < tab.getNoOfColumns(); a++) {
402 	      if ((rows[cIndex]->attributeStore(a) =
403 		   pOp->getValue(tab.getColumn(a)->getName())) == 0) {
404 		ERR(transactions[t]->getNdbError());
405 		pNdb->closeTransaction(transactions[t]);
406 		return NDBT_FAILED;
407 	      }
408 	    }
409 	    break;
410 	  case NO_DELETE:
411 	    // Delete
412 	    check = pOp->deleteTuple();
413 	    if (check == -1) {
414 	      ERR(transactions[t]->getNdbError());
415 	      pNdb->closeTransaction(transactions[t]);
416 	      return NDBT_FAILED;
417 	    }
418 
419 	    // Define primary keys
420             if (equalForRow(pOp, cRecords) != 0)
421             {
422               ERR(transactions[t]->getNdbError());
423               pNdb->closeTransaction(transactions[t]);
424               return NDBT_FAILED;
425             }
426 	    break;
427 	  default:
428 	    // Should not happen...
429 	    pNdb->closeTransaction(transactions[t]);
430 	    return NDBT_FAILED;
431 	  }
432 
433 	  cIndex++;
434 	  cRecords++;
435 
436 	} // For each operation
437 
438 	// Let's prepare...
439 	transactions[t]->executeAsynchPrepare(theType, &asynchCallback,
440 					this);
441 	cTrans++;
442 
443 	if (cRecords >= records) {
444 	  // No more transactions needed
445 	  break;
446 	}
447       } // For each transaction
448 
449       // Wait for all outstanding transactions
450       pNdb->sendPollNdb(3000, 0, 0);
451 
452       // ugly... it's starts to resemble flexXXX ...:(
453       switch (theOperation) {
454       case NO_READ:
455 	// Verify the data!
456 	for (r = 0; r < trans*operations; r++) {
457 	  if (calc.verifyRowValues(rows[r]) != 0) {
458 	    g_info << "|- Verify failed..." << endl;
459 	    // Close all transactions
460 	    for (int t = 0; t < cTrans; t++) {
461 	      pNdb->closeTransaction(transactions[t]);
462 	    }
463 	    return NDBT_FAILED;
464 	  }
465 	}
466 	break;
467       case NO_INSERT:
468       case NO_UPDATE:
469       case NO_DELETE:
470 	break;
471       }
472 
473       // Close all transactions
474       for (t = 0; t < cTrans; t++) {
475 	pNdb->closeTransaction(transactions[t]);
476       }
477 
478     } // while (cRecords < records*batch)
479 
480   } // For each batch
481 
482   deallocTransactions();
483 
484   return NDBT_OK;
485 
486 }
487