1 /*
2 Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "HugoQueries.hpp"
26 #include <NDBT_Stats.hpp>
27 #include <NdbSleep.h>
28 #include <NdbTick.h>
29 #include "../../src/ndbapi/NdbQueryOperation.hpp"
30
HugoQueries(const NdbQueryDef & query)31 HugoQueries::HugoQueries(const NdbQueryDef & query)
32 {
33 m_retryMax = 100;
34 m_query_def = &query;
35
36 for (Uint32 i = 0; i<query.getNoOfOperations(); i++)
37 {
38 struct Op op;
39 op.m_query_op = query.getQueryOperation(i);
40 op.m_calc = 0;
41 if (op.m_query_op->getTable())
42 {
43 op.m_calc = new HugoCalculator(* op.m_query_op->getTable());
44 }
45 m_ops.push_back(op);
46 }
47 }
48
~HugoQueries()49 HugoQueries::~HugoQueries()
50 {
51 for (unsigned o = 0; o<m_ops.size(); o++)
52 {
53 while (m_ops[o].m_rows.size())
54 {
55 delete m_ops[o].m_rows.back();
56 m_ops[o].m_rows.erase(m_ops[o].m_rows.size() - 1);
57 }
58 if (m_ops[o].m_calc)
59 delete m_ops[o].m_calc;
60 }
61 }
62
63 void
allocRows(int batch)64 HugoQueries::allocRows(int batch)
65 {
66 for (unsigned o = 0; o<m_ops.size(); o++)
67 {
68 const NdbQueryOperationDef * pOp =m_query_def->getQueryOperation((Uint32)o);
69 const NdbDictionary::Table* tab = pOp->getTable();
70
71 if (tab)
72 {
73 while (m_ops[o].m_rows.size() < (unsigned)batch)
74 {
75 m_ops[o].m_rows.push_back(new NDBT_ResultRow(* tab));
76 }
77 }
78 }
79 }
80
81 int
equalForParameters(char * buf,Op & op,NdbQueryParamValue params[],int rowNo)82 HugoQueries::equalForParameters(char * buf,
83 Op & op,
84 NdbQueryParamValue params[],
85 int rowNo)
86 {
87 Uint32 no = 0;
88 HugoCalculator & calc = * op.m_calc;
89 const NdbDictionary::Table & tab = calc.getTable();
90 if (op.m_query_op->getType() == NdbQueryOperationDef::TableScan)
91 {
92
93 }
94 else if (op.m_query_op->getType() == NdbQueryOperationDef::PrimaryKeyAccess)
95 {
96 for (int i = 0; i<tab.getNoOfColumns(); i++)
97 {
98 const NdbDictionary::Column* attr = tab.getColumn(i);
99 if (attr->getPrimaryKey())
100 {
101 Uint32 len = attr->getSizeInBytes();
102 Uint32 real_len;
103 bzero(buf, len);
104 calc.calcValue((Uint32)rowNo, i, 0, buf, len, &real_len);
105 params[no++]= NdbQueryParamValue((void*)buf);
106 buf += len;
107 }
108 }
109 }
110 else if (op.m_query_op->getType() == NdbQueryOperationDef::UniqueIndexAccess||
111 op.m_query_op->getType() == NdbQueryOperationDef::OrderedIndexScan)
112 {
113 const NdbDictionary::Index* idx = op.m_query_op->getIndex();
114 for (unsigned i = 0; i < idx->getNoOfColumns(); i++)
115 {
116 const NdbDictionary::Column* attr =
117 tab.getColumn(idx->getColumn(i)->getName());
118 Uint32 len = attr->getSizeInBytes();
119 Uint32 real_len;
120 bzero(buf, len);
121 calc.calcValue((Uint32)rowNo, attr->getColumnNo(),
122 0, buf, len, &real_len);
123 params[no++]= NdbQueryParamValue((void*)buf);
124 buf += len;
125 }
126 }
127 return 0;
128 }
129
130 int
getValueForQueryOp(NdbQueryOperation * pOp,NDBT_ResultRow * pRow)131 HugoQueries::getValueForQueryOp(NdbQueryOperation* pOp, NDBT_ResultRow * pRow)
132 {
133 const NdbDictionary::Table & tab = pRow->getTable();
134 for(int a = 0; a<tab.getNoOfColumns(); a++)
135 {
136 pRow->attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName());
137 }
138
139 return 0;
140 }
141
142 int
runLookupQuery(Ndb * pNdb,int queries,int batch)143 HugoQueries::runLookupQuery(Ndb* pNdb,
144 int queries,
145 int batch)
146 {
147 int q = 0;
148 int retryAttempt = 0;
149
150 m_rows_found.clear();
151 Uint32 zero = 0;
152 m_rows_found.fill(m_query_def->getNoOfOperations() - 1, zero);
153
154 if (batch == 0) {
155 g_info << "ERROR: Argument batch == 0 in runLookupQuery. Not allowed."
156 << endl;
157 return NDBT_FAILED;
158 }
159
160 allocRows(batch);
161
162 while (q < queries)
163 {
164 if (q + batch > queries)
165 batch = queries - q;
166
167 if (retryAttempt >= m_retryMax)
168 {
169 g_info << "ERROR: has retried this operation " << retryAttempt
170 << " times, failing!" << endl;
171 return NDBT_FAILED;
172 }
173
174 Vector<Uint32> batch_rows_found;
175 batch_rows_found.fill(m_query_def->getNoOfOperations() - 1, zero);
176 Vector<NdbQuery*> queries;
177
178 NdbTransaction * pTrans = pNdb->startTransaction();
179 if (pTrans == NULL)
180 {
181 const NdbError err = pNdb->getNdbError();
182
183 if (err.status == NdbError::TemporaryError){
184 NDB_ERR(err);
185 NdbSleep_MilliSleep(50);
186 retryAttempt++;
187 continue;
188 }
189 NDB_ERR(err);
190 return NDBT_FAILED;
191 }
192
193 for (int b = 0; b<batch; b++)
194 {
195 char buf[NDB_MAX_TUPLE_SIZE];
196 NdbQueryParamValue params[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY];
197 equalForParameters(buf, m_ops[0], params, b + q);
198
199 NdbQuery * query = pTrans->createQuery(m_query_def, params);
200 if (query == 0)
201 {
202 const NdbError err = pTrans->getNdbError();
203 NDB_ERR(err);
204 return NDBT_FAILED;
205 }
206
207 for (unsigned o = 0; o<m_ops.size(); o++)
208 {
209 NdbQueryOperation * pOp = query->getQueryOperation((Uint32)o);
210 HugoQueries::getValueForQueryOp(pOp, m_ops[o].m_rows[b]);
211 }
212 queries.push_back(query);
213 }
214
215 int check = pTrans->execute(NoCommit, AbortOnError);
216 if (check == -1)
217 {
218 const NdbError err = pTrans->getNdbError();
219 NDB_ERR(err);
220 if (err.status == NdbError::TemporaryError){
221 pTrans->close();
222 NdbSleep_MilliSleep(50);
223 retryAttempt++;
224 continue;
225 }
226 pTrans->close();
227 return NDBT_FAILED;
228 }
229 #if 0
230 // Disabled, as this is incorrectly handled in SPJ API, will fix soon
231 else
232 {
233 /**
234 * If ::execute() didn't fail, there should not be an error on
235 * its NdbError object either:
236 */
237 const NdbError err = pTrans->getNdbError();
238 if (err.code)
239 {
240 NDB_ERR(err);
241 ndbout_c("API INCONSISTENCY: NdbTransaction returned NdbError even if ::execute() succeeded");
242 pTrans->close();
243 return NDBT_FAILED;
244 }
245 }
246 #endif
247
248 bool retry = false;
249 for (int b = 0; b<batch; b++)
250 {
251 NdbQuery * query = queries[b];
252
253 /**
254 * As NdbQuery is always 'dirty read' (impl. limitations), 'AbortOnError'
255 * is ignored and handled as 'IgnoreError'. We will therefore not get
256 * errors returned from ::execute() or set into 'pTrans->getNdbError()':
257 * Has to check for errors on the NdbQuery object instead:
258 */
259 const NdbError& err = query->getNdbError();
260 if (err.code)
261 {
262 NDB_ERR(err);
263 if (err.status == NdbError::TemporaryError){
264 pTrans->close();
265 retry = true;
266 break;
267 }
268 pTrans->close();
269 return NDBT_FAILED;
270 }
271
272 const NdbQuery::NextResultOutcome stat = query->nextResult();
273 if (stat == NdbQuery::NextResult_gotRow)
274 {
275 for (unsigned o = 0; o<m_ops.size(); o++)
276 {
277 NdbQueryOperation * pOp = query->getQueryOperation((Uint32)o);
278 if (!pOp->isRowNULL())
279 {
280 batch_rows_found[o]++;
281 if (m_ops[o].m_calc->verifyRowValues(m_ops[o].m_rows[b]) != 0)
282 {
283 pTrans->close();
284 return NDBT_FAILED;
285 }
286 }
287 }
288 }
289 else if (stat == NdbQuery::NextResult_error)
290 {
291 const NdbError& err = query->getNdbError();
292 NDB_ERR(err);
293 if (err.status == NdbError::TemporaryError){
294 pTrans->close();
295 retry = true;
296 break;
297 }
298 pTrans->close();
299 return NDBT_FAILED;
300 }
301 }
302 if (retry)
303 {
304 NdbSleep_MilliSleep(50);
305 retryAttempt++;
306 continue;
307 }
308
309 pTrans->close();
310 q += batch;
311
312 for (unsigned i = 0; i<batch_rows_found.size(); i++)
313 m_rows_found[i] += batch_rows_found[i];
314 }
315
316 return NDBT_OK;
317 }
318
319 int
runScanQuery(Ndb * pNdb,int abort,int parallelism,int scan_flags)320 HugoQueries::runScanQuery(Ndb * pNdb,
321 int abort,
322 int parallelism,
323 int scan_flags)
324 {
325 int retryAttempt = 0;
326
327 allocRows(1);
328
329 while (retryAttempt < m_retryMax)
330 {
331 m_rows_found.clear();
332 Uint32 zero = 0;
333 m_rows_found.fill(m_query_def->getNoOfOperations() - 1, zero);
334
335 NdbTransaction * pTrans = pNdb->startTransaction();
336 if (pTrans == NULL)
337 {
338 const NdbError err = pNdb->getNdbError();
339 NDB_ERR(err);
340 if (err.status == NdbError::TemporaryError){
341 NdbSleep_MilliSleep(50);
342 retryAttempt++;
343 continue;
344 }
345 return NDBT_FAILED;
346 }
347
348 NdbQuery * query = 0;
349
350 char buf[NDB_MAX_TUPLE_SIZE];
351 NdbQueryParamValue params[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY];
352 equalForParameters(buf, m_ops[0], params, /* rowNo */ 0);
353 query = pTrans->createQuery(m_query_def, params);
354 if (query == 0)
355 {
356 const NdbError err = pTrans->getNdbError();
357 NDB_ERR(err);
358 return NDBT_FAILED;
359 }
360
361 for (unsigned o = 0; o<m_ops.size(); o++)
362 {
363 NdbQueryOperation * pOp = query->getQueryOperation((Uint32)o);
364 HugoQueries::getValueForQueryOp(pOp, m_ops[o].m_rows[0]);
365 }
366
367 int check = pTrans->execute(NoCommit, AbortOnError);
368 if (check == -1)
369 {
370 const NdbError err = pTrans->getNdbError();
371 NDB_ERR(err);
372 if (err.status == NdbError::TemporaryError){
373 pTrans->close();
374 NdbSleep_MilliSleep(50);
375 retryAttempt++;
376 continue;
377 }
378 pTrans->close();
379 return NDBT_FAILED;
380 }
381 else
382 {
383 // Disabled, as this is incorrectly handled in SPJ API, will fix soon
384 #if 0
385 /**
386 * If ::execute() didn't fail, there should not be an error on
387 * its NdbError object either:
388 */
389 const NdbError err = pTrans->getNdbError();
390 if (err.code)
391 {
392 NDB_ERR(err);
393 ndbout_c("API INCONSISTENCY: NdbTransaction returned NdbError even if ::execute() succeeded");
394 pTrans->close();
395 return NDBT_FAILED;
396 }
397 #endif
398
399 /**
400 * As NdbQuery is always 'dirty read' (impl. limitations), 'AbortOnError'
401 * is ignored and handled as 'IgnoreError'. We will therefore not get
402 * errors returned from ::execute() or set into 'pTrans->getNdbError()':
403 * Has to check for errors on the NdbQuery object instead:
404 */
405 NdbError err = query->getNdbError();
406 if (err.code)
407 {
408 NDB_ERR(err);
409 if (err.status == NdbError::TemporaryError){
410 pTrans->close();
411 NdbSleep_MilliSleep(50);
412 retryAttempt++;
413 continue;
414 }
415 pTrans->close();
416 return NDBT_FAILED;
417 }
418 }
419
420 int r = rand() % 100;
421 if (r < abort && ((r & 1) == 0))
422 {
423 ndbout_c("Query aborted!");
424 query->close();
425 pTrans->close();
426 m_rows_found.clear();
427 return NDBT_OK;
428 }
429
430 NdbQuery::NextResultOutcome res;
431 while ((res = query->nextResult()) == NdbQuery::NextResult_gotRow)
432 {
433 if (r < abort && ((r & 1) == 1))
434 {
435 ndbout_c("Query aborted 2!");
436 query->close();
437 pTrans->close();
438 m_rows_found.clear();
439 return NDBT_OK;
440 }
441
442 for (unsigned o = 0; o<m_ops.size(); o++)
443 {
444 NdbQueryOperation * pOp = query->getQueryOperation((Uint32)o);
445 if (!pOp->isRowNULL())
446 {
447 m_rows_found[o]++;
448 if (m_ops[o].m_calc->verifyRowValues(m_ops[o].m_rows[0]) != 0)
449 {
450 pTrans->close();
451 return NDBT_FAILED;
452 }
453 }
454 }
455 }
456
457 const NdbError err = query->getNdbError();
458 query->close();
459 pTrans->close();
460 if (res == NdbQuery::NextResult_error)
461 {
462 NDB_ERR(err);
463 if (err.status == NdbError::TemporaryError)
464 {
465 NdbSleep_MilliSleep(50);
466 retryAttempt++;
467 continue;
468 }
469 return NDBT_FAILED;
470 }
471 else if (res != NdbQuery::NextResult_scanComplete)
472 {
473 ndbout_c("Got %u from nextResult()", res);
474 return NDBT_FAILED;
475 }
476 break;
477 }
478
479 return NDBT_OK;
480 }
481
482 template class Vector<HugoQueries::Op>;
483 template class Vector<NdbQuery*>;
484