1 /*
2  Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License, version 2.0,
6  as published by the Free Software Foundation.
7 
8  This program is also distributed with certain software (including
9  but not limited to OpenSSL) that is licensed under separate terms,
10  as designated in a particular file or component or in included license
11  documentation.  The authors of MySQL hereby grant you an additional
12  permission to link the program and your derivative works with the
13  separately licensed software that they have included with MySQL.
14 
15  This program is distributed in the hope that it will be useful,
16  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  GNU General Public License, version 2.0, for more details.
19 
20  You should have received a copy of the GNU General Public License
21  along with this program; if not, write to the Free Software
22  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <NdbApi.hpp>
26 
27 #include "NdbQueryBuilder.hpp"
28 #include "NdbQueryOperation.hpp"
29 
30 #include <node.h>
31 
32 #include "adapter_global.h"
33 #include "unified_debug.h"
34 
35 #include "QueryOperation.h"
36 #include "TransactionImpl.h"
37 
/* Bit flags stored in QueryBuffer::static_flags / result_flags and copied
   into QueryResultHeader::tag.  The value 4 is not assigned in this file.
*/
enum {
  flag_row_is_null          = 1 << 0,   // row at this level is NULL
  flag_table_is_join_table  = 1 << 1,   // set once per table by levelIsJoinTable()
  flag_row_is_duplicate     = 1 << 3,   // row repeats an earlier result
};
43 
QueryOperation(int sz)44 QueryOperation::QueryOperation(int sz) :
45   size(sz),
46   buffers(new QueryBuffer[sz]),
47   operationTree(0),
48   definedQuery(0),
49   ndbQuery(0),
50   transaction(0),
51   results(0),
52   latest_error(0),
53   nresults(0),
54   nheaders(0),
55   nextHeaderAllocationSize(1024)
56 {
57   ndbQueryBuilder = NdbQueryBuilder::create();
58   DEBUG_PRINT("Size: %d", size);
59 }
60 
~QueryOperation()61 QueryOperation::~QueryOperation() {
62   ndbQueryBuilder->destroy();
63   delete[] buffers;
64   free(results);
65 }
66 
createRowBuffer(int level,Record * record,int parent_table)67 void QueryOperation::createRowBuffer(int level, Record *record, int parent_table) {
68   buffers[level].record = record;
69   buffers[level].buffer = new char[record->getBufferSize()];
70   buffers[level].size   = record->getBufferSize();
71   buffers[level].parent = (short) parent_table;
72 }
73 
levelIsJoinTable(int level)74 void QueryOperation::levelIsJoinTable(int level) {
75   DEBUG_PRINT("Level %d is join table", level);
76   buffers[level].static_flags |= flag_table_is_join_table;
77 }
78 
prepare(const NdbQueryOperationDef * root)79 void QueryOperation::prepare(const NdbQueryOperationDef * root) {
80   DEBUG_MARKER(UDEB_DEBUG);
81   operationTree = root;
82   definedQuery = ndbQueryBuilder->prepare();
83 }
84 
prepareAndExecute()85 int QueryOperation::prepareAndExecute() {
86   return transaction->prepareAndExecuteQuery(this);
87 }
88 
89 
90 /* Check whether this row and its parent are duplicates,
91    assuming parent has already been tested and flagged.
92    An optimization here would be to scan only the key fields.
93 */
isDuplicate(int level)94 bool QueryOperation::isDuplicate(int level) {
95   QueryBuffer & current = buffers[level];
96   QueryBuffer & parent  = buffers[current.parent];
97   char * & result       = current.buffer;
98   uint32_t & result_sz  = current.size;
99   int lastResult        = current.result;  // most recent result for this table
100 
101   /* If the parent is a known duplicate, and the current value matches the
102      immediate previous value for this table, then it is a duplicate.
103   */
104   if((level == 0 || parent.result_flags & flag_row_is_duplicate) &&
105       nresults &&                   // this is not the first result for root
106       lastResult >= level &&       // and not the first result at this level
107      ! (memcmp(results[lastResult].data, result, result_sz)))
108   {
109     current.result_flags |= flag_row_is_duplicate;
110     return true;
111   }
112 
113   return false;
114 }
115 
116 /* takes sector number and two result header indexes
117    returns true if results are identical.
118 */
compareTwoResults(int level,int r1,int r2)119 bool QueryOperation::compareTwoResults(int level, int r1, int r2) {
120   if(r1 == r2) return true;
121 //  DEBUG_PRINT_DETAIL("compareTwoResults for level %d: %d <=> %d", level, r2, r1);
122   assert(level == results[r1].sector);
123   assert(level == results[r2].sector);
124   return ! memcmp(results[r1].data, results[r2].data, buffers[level].size);
125 }
126 
127 /* Takes number of leaf sector number and leaf result header indexes.
128    walks to root.  returns true if results are identical at all nodes.
129 */
compareFullRows(int level,int r1,int r2)130 bool QueryOperation::compareFullRows(int level, int r1, int r2) {
131   bool didCompareRoot;
132   do {
133     if(! compareTwoResults(level, r1, r2)) return false;
134     didCompareRoot = (level == 0);
135     level = buffers[level].parent;
136     r1 = results[r1].parent;
137     r2 = results[r2].parent;
138   } while(! didCompareRoot);
139 
140   return true;
141 }
142 
143 /* Takes a leaf result header index and returns true if it matches any
144    previous row.
145 */
compareRowToAllPrevious()146 bool QueryOperation::compareRowToAllPrevious() {
147   int r2 = nresults - 1;           // r2: the latest result
148   int r1 = results[r2].previous;   // r1: the earlier result
149   int level = results[r2].sector;  // sector
150 //  DEBUG_PRINT_DETAIL("compareRowToAllPrevious %d %d %d", level, r2, r1);
151   while(r1 >= level) {
152     assert(r1 < r2);
153     if(compareFullRows(level, r1, r2)) {
154       return true;
155     }
156     r1 = results[r1].previous;
157   }
158   return false;
159 }
160 
161 
pushResultForTable(short level)162 bool QueryOperation::pushResultForTable(short level) {
163   QueryBuffer & current = buffers[level];
164   QueryBuffer & parent = buffers[current.parent];
165 
166   if(level == 0)                            // reset flags for new root result
167     for(int i = 0 ; i < this->size ; i++)
168       buffers[i].result_flags = buffers[i].static_flags;
169 
170   /* Push NULL result, or skip if parent was also NULL */
171   if(ndbQuery->getQueryOperation(level)->isRowNULL())
172   {
173     current.result_flags |= flag_row_is_null;
174     if(parent.result_flags & flag_row_is_null)
175     {
176       DEBUG_PRINT("table %d SKIP -- parent is null", level);
177       return true;   /* skip */
178     }
179     return pushResultNull(level);
180   }
181 
182   if(isDuplicate(level))
183   {
184     DEBUG_PRINT("table %d SKIP DUPLICATE", level);
185     return true;  /* skip */
186   }
187 
188   bool ok = pushResultValue(level);
189 
190   /* Finally compare the entire row against all previous values,
191      unless it is the very first row.
192   */
193   if(ok && (int) nresults > size) {
194     if(compareRowToAllPrevious()) {
195       int r = nresults - 1;
196       DEBUG_PRINT("table %d PRUNE LAST RESULT", results[r].sector);
197       results[r].tag |= flag_row_is_duplicate;
198       free(results[r].data);
199     }
200   }
201   return ok;
202 }
203 
newResultForTable(short level)204 bool QueryOperation::newResultForTable(short level) {
205   bool ok = true;
206   int n = nresults;
207 
208   if(n == nheaders) {
209     ok = growHeaderArray();
210   }
211   if(ok) {
212     nresults++;
213     results[n].sector = level;
214     results[n].previous = buffers[level].result;  // index of previous result
215     buffers[level].result = n;                    // index of this result
216     results[n].parent = buffers[buffers[level].parent].result;  // current value
217     results[n].tag = buffers[level].result_flags;
218   }
219   return ok;
220 }
221 
pushResultNull(short level)222 bool QueryOperation::pushResultNull(short level) {
223   int n = nresults;
224   bool ok = newResultForTable(level);
225   if(ok) {
226     DEBUG_PRINT("table %d NULL", level);
227     results[n].data = 0;
228   }
229   return ok;
230 }
231 
pushResultValue(short level)232 bool QueryOperation::pushResultValue(short level) {
233   int n = nresults;
234   uint32_t & size = buffers[level].size;
235   char * & temp_result = buffers[level].buffer;
236   bool ok = newResultForTable(level);
237   if(ok) {
238     DEBUG_PRINT("table %d USE RESULT", level);
239 
240     /* Allocate space for the new result */
241     results[n].data = (char *) malloc(size);
242     if(! results[n].data) return false;
243 
244     /* Copy from the holding buffer to the new result */
245     memcpy(results[n].data, temp_result, size);
246   }
247   return ok;
248 }
249 
getResult(int id)250 QueryResultHeader * QueryOperation::getResult(int id) {
251 //  DEBUG_PRINT_DETAIL("R %d : TABLE %d TAG %d PARENT %d", id,
252 //                     results[id].sector, results[id].tag, results[id].parent);
253   return (id < nresults) ?  & results[id] : 0;
254 }
255 
more(int status)256 inline bool more(int status) {  /* 0 or 2 */
257   return ((status == NdbQuery::NextResult_gotRow) ||
258           (status == NdbQuery::NextResult_bufferEmpty));
259 }
260 
isError(int status)261 inline bool isError(int status) { /* -1 */
262   return (status == NdbQuery::NextResult_error);
263 }
264 
265 
266 /* Returns number of results, or an error code < 0
267 */
fetchAllResults()268 int QueryOperation::fetchAllResults() {
269   int status = NdbQuery::NextResult_bufferEmpty;
270 
271   while(more(status)) {
272     status = ndbQuery->nextResult();
273     switch(status) {
274       case NdbQuery::NextResult_gotRow:
275         /* New results at every level */
276         DEBUG_PRINT_DETAIL("NextResult_gotRow");
277         for(short level = 0 ; level < size ; level++) {
278           if(! pushResultForTable(level)) return -1;
279         }
280         break;
281 
282       case NdbQuery::NextResult_scanComplete:
283         DEBUG_PRINT_DETAIL("NextResult_scanComplete");
284         break;
285 
286       default:
287         assert(status == NdbQuery::NextResult_error);
288         latest_error = & ndbQuery->getNdbError();
289         DEBUG_PRINT("%d %s", latest_error->code, latest_error->message);
290         return -1;
291     }
292   }
293   /* All done with the query now. */
294   ndbQuery->close();
295   ndbQuery = 0;
296 
297   return nresults;
298 }
299 
growHeaderArray()300 bool QueryOperation::growHeaderArray() {
301   DEBUG_PRINT("growHeaderArray %d => %d", nheaders, nextHeaderAllocationSize);
302   QueryResultHeader * old_results = results;
303 
304   results = (QueryResultHeader *) calloc(nextHeaderAllocationSize, sizeof(QueryResultHeader));
305   if(results) {
306     memcpy(results, old_results, nheaders * sizeof(QueryResultHeader));
307     free(old_results);
308     nheaders = nextHeaderAllocationSize;
309     nextHeaderAllocationSize *= 2;
310     return true;
311   }
312   return false; // allocation failed
313 }
314 
315 const NdbQueryOperationDef *
defineOperation(const NdbDictionary::Index * index,const NdbDictionary::Table * table,const NdbQueryOperand * const keys[])316   QueryOperation::defineOperation(const NdbDictionary::Index * index,
317                                   const NdbDictionary::Table * table,
318                                   const NdbQueryOperand* const keys[]) {
319   const NdbQueryOperationDef * rval = 0;
320   NdbQueryIndexBound * bound;
321 
322   if(index) {
323     switch(index->getType()) {
324       case NdbDictionary::Index::UniqueHashIndex:
325         rval = ndbQueryBuilder->readTuple(index, table, keys);
326         DEBUG_PRINT("defineOperation using UniqueHashIndex %s", index->getName());
327         break;
328 
329       case NdbDictionary::Index::OrderedIndex:
330         bound = new NdbQueryIndexBound(keys);
331         rval = ndbQueryBuilder->scanIndex(index, table, bound);
332         DEBUG_PRINT("defineOperation using OrderedIndex %s", index->getName());
333         break;
334       default:
335         DEBUG_PRINT("defineOperation ERROR: default case");
336         return 0;
337     }
338   }
339   else {
340     rval = ndbQueryBuilder->readTuple(table, keys);
341     DEBUG_PRINT("defineOperation using PrimaryKey");
342   }
343 
344   if(rval == 0) {
345     latest_error = & ndbQueryBuilder->getNdbError();
346     DEBUG_PRINT("defineOperation: Error %d %s", latest_error->code, latest_error->message);
347   }
348   return rval;
349 }
350 
createNdbQuery(NdbTransaction * tx)351 bool QueryOperation::createNdbQuery(NdbTransaction *tx) {
352   DEBUG_MARKER(UDEB_DEBUG);
353   ndbQuery = tx->createQuery(definedQuery);
354   if(! ndbQuery) {
355     DEBUG_PRINT("createQuery returned null");
356     return false;
357   }
358 
359   for(int i = 0 ; i < size ; i++) {
360     NdbQueryOperation * qop = ndbQuery->getQueryOperation(i);
361     if(! qop) {
362       DEBUG_PRINT("No Query Operation at index %d", i);
363       return false;
364     }
365     assert(buffers[i].record);
366     qop->setResultRowBuf(buffers[i].record->getNdbRecord(), buffers[i].buffer);
367   }
368   return true;
369 }
370 
setTransactionImpl(TransactionImpl * tx)371 void QueryOperation::setTransactionImpl(TransactionImpl *tx) {
372   transaction = tx;
373 }
374 
close()375 void QueryOperation::close() {
376   DEBUG_ENTER();
377   definedQuery->destroy();
378 }
379 
getNdbError()380 const NdbError & QueryOperation::getNdbError() {
381   return ndbQueryBuilder->getNdbError();
382 }
383