1 /*
2   Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4   This program is free software; you can redistribute it and/or modify
5   it under the terms of the GNU General Public License, version 2.0,
6   as published by the Free Software Foundation.
7 
8   This program is also distributed with certain software (including
9   but not limited to OpenSSL) that is licensed under separate terms,
10   as designated in a particular file or component or in included license
11   documentation.  The authors of MySQL hereby grant you an additional
12   permission to link the program and your derivative works with the
13   separately licensed software that they have included with MySQL.
14 
15   This program is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License, version 2.0, for more details.
19 
20   You should have received a copy of the GNU General Public License
21   along with this program; if not, write to the Free Software
22   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "HugoQueries.hpp"
26 #include <NDBT_Stats.hpp>
27 #include <NdbSleep.h>
28 #include <NdbTick.h>
29 #include "../../src/ndbapi/NdbQueryOperation.hpp"
30 
HugoQueries(const NdbQueryDef & query)31 HugoQueries::HugoQueries(const NdbQueryDef & query)
32 {
33   m_retryMax = 100;
34   m_query_def = &query;
35 
36   for (Uint32 i = 0; i<query.getNoOfOperations(); i++)
37   {
38     struct Op op;
39     op.m_query_op = query.getQueryOperation(i);
40     op.m_calc = 0;
41     if (op.m_query_op->getTable())
42     {
43       op.m_calc = new HugoCalculator(* op.m_query_op->getTable());
44     }
45     m_ops.push_back(op);
46   }
47 }
48 
~HugoQueries()49 HugoQueries::~HugoQueries()
50 {
51   for (unsigned o = 0; o<m_ops.size(); o++)
52   {
53     while (m_ops[o].m_rows.size())
54     {
55       delete m_ops[o].m_rows.back();
56       m_ops[o].m_rows.erase(m_ops[o].m_rows.size() - 1);
57     }
58     if (m_ops[o].m_calc)
59       delete m_ops[o].m_calc;
60   }
61 }
62 
63 void
allocRows(int batch)64 HugoQueries::allocRows(int batch)
65 {
66   for (unsigned o = 0; o<m_ops.size(); o++)
67   {
68     const NdbQueryOperationDef * pOp =m_query_def->getQueryOperation((Uint32)o);
69     const NdbDictionary::Table* tab = pOp->getTable();
70 
71     if (tab)
72     {
73       while (m_ops[o].m_rows.size() < (unsigned)batch)
74       {
75         m_ops[o].m_rows.push_back(new NDBT_ResultRow(* tab));
76       }
77     }
78   }
79 }
80 
81 int
equalForParameters(char * buf,Op & op,NdbQueryParamValue params[],int rowNo)82 HugoQueries::equalForParameters(char * buf,
83                                 Op & op,
84                                 NdbQueryParamValue params[],
85                                 int rowNo)
86 {
87   Uint32 no = 0;
88   HugoCalculator & calc = * op.m_calc;
89   const NdbDictionary::Table & tab = calc.getTable();
90   if (op.m_query_op->getType() == NdbQueryOperationDef::TableScan)
91   {
92 
93   }
94   else if (op.m_query_op->getType() == NdbQueryOperationDef::PrimaryKeyAccess)
95   {
96     for (int i = 0; i<tab.getNoOfColumns(); i++)
97     {
98       const NdbDictionary::Column* attr = tab.getColumn(i);
99       if (attr->getPrimaryKey())
100       {
101         Uint32 len = attr->getSizeInBytes();
102         Uint32 real_len;
103         bzero(buf, len);
104         calc.calcValue((Uint32)rowNo, i, 0, buf, len, &real_len);
105         params[no++]= NdbQueryParamValue((void*)buf);
106         buf += len;
107       }
108     }
109   }
110   else if (op.m_query_op->getType() == NdbQueryOperationDef::UniqueIndexAccess||
111            op.m_query_op->getType() == NdbQueryOperationDef::OrderedIndexScan)
112   {
113     const NdbDictionary::Index* idx = op.m_query_op->getIndex();
114     for (unsigned i = 0; i < idx->getNoOfColumns(); i++)
115     {
116       const NdbDictionary::Column* attr =
117         tab.getColumn(idx->getColumn(i)->getName());
118       Uint32 len = attr->getSizeInBytes();
119       Uint32 real_len;
120       bzero(buf, len);
121       calc.calcValue((Uint32)rowNo, attr->getColumnNo(),
122                      0, buf, len, &real_len);
123       params[no++]= NdbQueryParamValue((void*)buf);
124       buf += len;
125     }
126   }
127   return 0;
128 }
129 
130 int
getValueForQueryOp(NdbQueryOperation * pOp,NDBT_ResultRow * pRow)131 HugoQueries::getValueForQueryOp(NdbQueryOperation* pOp, NDBT_ResultRow * pRow)
132 {
133   const NdbDictionary::Table & tab = pRow->getTable();
134   for(int a = 0; a<tab.getNoOfColumns(); a++)
135   {
136     pRow->attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName());
137   }
138 
139   return 0;
140 }
141 
142 int
runLookupQuery(Ndb * pNdb,int queries,int batch)143 HugoQueries::runLookupQuery(Ndb* pNdb,
144                             int queries,
145                             int batch)
146 {
147   int q = 0;
148   int retryAttempt = 0;
149 
150   m_rows_found.clear();
151   Uint32 zero = 0;
152   m_rows_found.fill(m_query_def->getNoOfOperations() - 1, zero);
153 
154   if (batch == 0) {
155     g_info << "ERROR: Argument batch == 0 in runLookupQuery. Not allowed."
156            << endl;
157     return NDBT_FAILED;
158   }
159 
160   allocRows(batch);
161 
162   while (q < queries)
163   {
164     if (q + batch > queries)
165       batch = queries - q;
166 
167     if (retryAttempt >= m_retryMax)
168     {
169       g_info << "ERROR: has retried this operation " << retryAttempt
170              << " times, failing!" << endl;
171       return NDBT_FAILED;
172     }
173 
174     Vector<Uint32> batch_rows_found;
175     batch_rows_found.fill(m_query_def->getNoOfOperations() - 1, zero);
176     Vector<NdbQuery*> queries;
177 
178     NdbTransaction * pTrans = pNdb->startTransaction();
179     if (pTrans == NULL)
180     {
181       const NdbError err = pNdb->getNdbError();
182 
183       if (err.status == NdbError::TemporaryError){
184         NDB_ERR(err);
185         NdbSleep_MilliSleep(50);
186         retryAttempt++;
187         continue;
188       }
189       NDB_ERR(err);
190       return NDBT_FAILED;
191     }
192 
193     for (int b = 0; b<batch; b++)
194     {
195       char buf[NDB_MAX_TUPLE_SIZE];
196       NdbQueryParamValue params[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY];
197       equalForParameters(buf, m_ops[0], params, b + q);
198 
199       NdbQuery * query = pTrans->createQuery(m_query_def, params);
200       if (query == 0)
201       {
202         const NdbError err = pTrans->getNdbError();
203         NDB_ERR(err);
204         return NDBT_FAILED;
205       }
206 
207       for (unsigned o = 0; o<m_ops.size(); o++)
208       {
209         NdbQueryOperation * pOp = query->getQueryOperation((Uint32)o);
210         HugoQueries::getValueForQueryOp(pOp, m_ops[o].m_rows[b]);
211       }
212       queries.push_back(query);
213     }
214 
215     int check = pTrans->execute(NoCommit, AbortOnError);
216     if (check == -1)
217     {
218       const NdbError err = pTrans->getNdbError();
219       NDB_ERR(err);
220       if (err.status == NdbError::TemporaryError){
221         pTrans->close();
222         NdbSleep_MilliSleep(50);
223         retryAttempt++;
224         continue;
225       }
226       pTrans->close();
227       return NDBT_FAILED;
228     }
229 #if 0
230     // Disabled, as this is incorrectly handled in SPJ API, will fix soon
231     else
232     {
233       /**
234        * If ::execute() didn't fail, there should not be an error on
235        * its NdbError object either:
236        */
237       const NdbError err = pTrans->getNdbError();
238       if (err.code)
239       {
240         NDB_ERR(err);
241         ndbout_c("API INCONSISTENCY: NdbTransaction returned NdbError even if ::execute() succeeded");
242         pTrans->close();
243         return NDBT_FAILED;
244       }
245     }
246 #endif
247 
248     bool retry = false;
249     for (int b = 0; b<batch; b++)
250     {
251       NdbQuery * query = queries[b];
252 
253       /**
254        * As NdbQuery is always 'dirty read' (impl. limitations), 'AbortOnError'
255        * is ignored and handled as 'IgnoreError'. We will therefore not get
256        * errors returned from ::execute() or set into 'pTrans->getNdbError()':
257        * Has to check for errors on the NdbQuery object instead:
258        */
259       const NdbError& err = query->getNdbError();
260       if (err.code)
261       {
262         NDB_ERR(err);
263         if (err.status == NdbError::TemporaryError){
264           pTrans->close();
265           retry = true;
266           break;
267         }
268         pTrans->close();
269         return NDBT_FAILED;
270       }
271 
272       const NdbQuery::NextResultOutcome stat = query->nextResult();
273       if (stat == NdbQuery::NextResult_gotRow)
274       {
275         for (unsigned o = 0; o<m_ops.size(); o++)
276         {
277           NdbQueryOperation * pOp = query->getQueryOperation((Uint32)o);
278           if (!pOp->isRowNULL())
279           {
280             batch_rows_found[o]++;
281             if (m_ops[o].m_calc->verifyRowValues(m_ops[o].m_rows[b]) != 0)
282             {
283               pTrans->close();
284               return NDBT_FAILED;
285             }
286           }
287         }
288       }
289       else if (stat == NdbQuery::NextResult_error)
290       {
291         const NdbError& err = query->getNdbError();
292         NDB_ERR(err);
293         if (err.status == NdbError::TemporaryError){
294           pTrans->close();
295           retry = true;
296           break;
297         }
298         pTrans->close();
299         return NDBT_FAILED;
300       }
301     }
302     if (retry)
303     {
304       NdbSleep_MilliSleep(50);
305       retryAttempt++;
306       continue;
307     }
308 
309     pTrans->close();
310     q += batch;
311 
312     for (unsigned i = 0; i<batch_rows_found.size(); i++)
313       m_rows_found[i] += batch_rows_found[i];
314   }
315 
316   return NDBT_OK;
317 }
318 
319 int
runScanQuery(Ndb * pNdb,int abort,int parallelism,int scan_flags)320 HugoQueries::runScanQuery(Ndb * pNdb,
321                           int abort,
322                           int parallelism,
323                           int scan_flags)
324 {
325   int retryAttempt = 0;
326 
327   allocRows(1);
328 
329   while (retryAttempt < m_retryMax)
330   {
331     m_rows_found.clear();
332     Uint32 zero = 0;
333     m_rows_found.fill(m_query_def->getNoOfOperations() - 1, zero);
334 
335     NdbTransaction * pTrans = pNdb->startTransaction();
336     if (pTrans == NULL)
337     {
338       const NdbError err = pNdb->getNdbError();
339       NDB_ERR(err);
340       if (err.status == NdbError::TemporaryError){
341         NdbSleep_MilliSleep(50);
342         retryAttempt++;
343         continue;
344       }
345       return NDBT_FAILED;
346     }
347 
348     NdbQuery * query = 0;
349 
350     char buf[NDB_MAX_TUPLE_SIZE];
351     NdbQueryParamValue params[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY];
352     equalForParameters(buf, m_ops[0], params, /* rowNo */ 0);
353     query = pTrans->createQuery(m_query_def, params);
354     if (query == 0)
355     {
356       const NdbError err = pTrans->getNdbError();
357       NDB_ERR(err);
358       return NDBT_FAILED;
359     }
360 
361     for (unsigned o = 0; o<m_ops.size(); o++)
362     {
363       NdbQueryOperation * pOp = query->getQueryOperation((Uint32)o);
364       HugoQueries::getValueForQueryOp(pOp, m_ops[o].m_rows[0]);
365     }
366 
367     int check = pTrans->execute(NoCommit, AbortOnError);
368     if (check == -1)
369     {
370       const NdbError err = pTrans->getNdbError();
371       NDB_ERR(err);
372       if (err.status == NdbError::TemporaryError){
373         pTrans->close();
374         NdbSleep_MilliSleep(50);
375         retryAttempt++;
376         continue;
377       }
378       pTrans->close();
379       return NDBT_FAILED;
380     }
381     else
382     {
383       // Disabled, as this is incorrectly handled in SPJ API, will fix soon
384 #if 0
385       /**
386        * If ::execute() didn't fail, there should not be an error on
387        * its NdbError object either:
388        */
389       const NdbError err = pTrans->getNdbError();
390       if (err.code)
391       {
392         NDB_ERR(err);
393         ndbout_c("API INCONSISTENCY: NdbTransaction returned NdbError even if ::execute() succeeded");
394         pTrans->close();
395         return NDBT_FAILED;
396       }
397 #endif
398 
399       /**
400        * As NdbQuery is always 'dirty read' (impl. limitations), 'AbortOnError'
401        * is ignored and handled as 'IgnoreError'. We will therefore not get
402        * errors returned from ::execute() or set into 'pTrans->getNdbError()':
403        * Has to check for errors on the NdbQuery object instead:
404        */
405       NdbError err = query->getNdbError();
406       if (err.code)
407       {
408         NDB_ERR(err);
409         if (err.status == NdbError::TemporaryError){
410           pTrans->close();
411           NdbSleep_MilliSleep(50);
412           retryAttempt++;
413           continue;
414         }
415         pTrans->close();
416         return NDBT_FAILED;
417       }
418     }
419 
420     int r = rand() % 100;
421     if (r < abort && ((r & 1) == 0))
422     {
423       ndbout_c("Query aborted!");
424       query->close();
425       pTrans->close();
426       m_rows_found.clear();
427       return NDBT_OK;
428     }
429 
430     NdbQuery::NextResultOutcome res;
431     while ((res = query->nextResult()) == NdbQuery::NextResult_gotRow)
432     {
433       if (r < abort && ((r & 1) == 1))
434       {
435         ndbout_c("Query aborted 2!");
436         query->close();
437         pTrans->close();
438         m_rows_found.clear();
439       return NDBT_OK;
440       }
441 
442       for (unsigned o = 0; o<m_ops.size(); o++)
443       {
444         NdbQueryOperation * pOp = query->getQueryOperation((Uint32)o);
445         if (!pOp->isRowNULL())
446         {
447           m_rows_found[o]++;
448           if (m_ops[o].m_calc->verifyRowValues(m_ops[o].m_rows[0]) != 0)
449           {
450             pTrans->close();
451             return NDBT_FAILED;
452           }
453         }
454       }
455     }
456 
457     const NdbError err = query->getNdbError();
458     query->close();
459     pTrans->close();
460     if (res == NdbQuery::NextResult_error)
461     {
462       NDB_ERR(err);
463       if (err.status == NdbError::TemporaryError)
464       {
465         NdbSleep_MilliSleep(50);
466         retryAttempt++;
467         continue;
468       }
469       return NDBT_FAILED;
470     }
471     else if (res != NdbQuery::NextResult_scanComplete)
472     {
473       ndbout_c("Got %u from nextResult()", res);
474       return NDBT_FAILED;
475     }
476     break;
477   }
478 
479   return NDBT_OK;
480 }
481 
482 template class Vector<HugoQueries::Op>;
483 template class Vector<NdbQuery*>;
484