1 /*
2 Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <NdbApi.hpp>
26
27 #include "NdbQueryBuilder.hpp"
28 #include "NdbQueryOperation.hpp"
29
30 #include <node.h>
31
32 #include "adapter_global.h"
33 #include "unified_debug.h"
34
35 #include "QueryOperation.h"
36 #include "TransactionImpl.h"
37
/* Bit flags stored in QueryBuffer::static_flags / result_flags and copied
   into the tag of each result header.  Each value is a distinct bit;
   bit value 4 is not assigned in this file.
*/
enum {
  flag_row_is_null         = 0x01,
  flag_table_is_join_table = 0x02,
  flag_row_is_duplicate    = 0x08
};
43
QueryOperation(int sz)44 QueryOperation::QueryOperation(int sz) :
45 size(sz),
46 buffers(new QueryBuffer[sz]),
47 operationTree(0),
48 definedQuery(0),
49 ndbQuery(0),
50 transaction(0),
51 results(0),
52 latest_error(0),
53 nresults(0),
54 nheaders(0),
55 nextHeaderAllocationSize(1024)
56 {
57 ndbQueryBuilder = NdbQueryBuilder::create();
58 DEBUG_PRINT("Size: %d", size);
59 }
60
~QueryOperation()61 QueryOperation::~QueryOperation() {
62 ndbQueryBuilder->destroy();
63 delete[] buffers;
64 free(results);
65 }
66
createRowBuffer(int level,Record * record,int parent_table)67 void QueryOperation::createRowBuffer(int level, Record *record, int parent_table) {
68 buffers[level].record = record;
69 buffers[level].buffer = new char[record->getBufferSize()];
70 buffers[level].size = record->getBufferSize();
71 buffers[level].parent = (short) parent_table;
72 }
73
levelIsJoinTable(int level)74 void QueryOperation::levelIsJoinTable(int level) {
75 DEBUG_PRINT("Level %d is join table", level);
76 buffers[level].static_flags |= flag_table_is_join_table;
77 }
78
prepare(const NdbQueryOperationDef * root)79 void QueryOperation::prepare(const NdbQueryOperationDef * root) {
80 DEBUG_MARKER(UDEB_DEBUG);
81 operationTree = root;
82 definedQuery = ndbQueryBuilder->prepare();
83 }
84
prepareAndExecute()85 int QueryOperation::prepareAndExecute() {
86 return transaction->prepareAndExecuteQuery(this);
87 }
88
89
90 /* Check whether this row and its parent are duplicates,
91 assuming parent has already been tested and flagged.
92 An optimization here would be to scan only the key fields.
93 */
isDuplicate(int level)94 bool QueryOperation::isDuplicate(int level) {
95 QueryBuffer & current = buffers[level];
96 QueryBuffer & parent = buffers[current.parent];
97 char * & result = current.buffer;
98 uint32_t & result_sz = current.size;
99 int lastResult = current.result; // most recent result for this table
100
101 /* If the parent is a known duplicate, and the current value matches the
102 immediate previous value for this table, then it is a duplicate.
103 */
104 if((level == 0 || parent.result_flags & flag_row_is_duplicate) &&
105 nresults && // this is not the first result for root
106 lastResult >= level && // and not the first result at this level
107 ! (memcmp(results[lastResult].data, result, result_sz)))
108 {
109 current.result_flags |= flag_row_is_duplicate;
110 return true;
111 }
112
113 return false;
114 }
115
116 /* takes sector number and two result header indexes
117 returns true if results are identical.
118 */
compareTwoResults(int level,int r1,int r2)119 bool QueryOperation::compareTwoResults(int level, int r1, int r2) {
120 if(r1 == r2) return true;
121 // DEBUG_PRINT_DETAIL("compareTwoResults for level %d: %d <=> %d", level, r2, r1);
122 assert(level == results[r1].sector);
123 assert(level == results[r2].sector);
124 return ! memcmp(results[r1].data, results[r2].data, buffers[level].size);
125 }
126
127 /* Takes number of leaf sector number and leaf result header indexes.
128 walks to root. returns true if results are identical at all nodes.
129 */
compareFullRows(int level,int r1,int r2)130 bool QueryOperation::compareFullRows(int level, int r1, int r2) {
131 bool didCompareRoot;
132 do {
133 if(! compareTwoResults(level, r1, r2)) return false;
134 didCompareRoot = (level == 0);
135 level = buffers[level].parent;
136 r1 = results[r1].parent;
137 r2 = results[r2].parent;
138 } while(! didCompareRoot);
139
140 return true;
141 }
142
143 /* Takes a leaf result header index and returns true if it matches any
144 previous row.
145 */
compareRowToAllPrevious()146 bool QueryOperation::compareRowToAllPrevious() {
147 int r2 = nresults - 1; // r2: the latest result
148 int r1 = results[r2].previous; // r1: the earlier result
149 int level = results[r2].sector; // sector
150 // DEBUG_PRINT_DETAIL("compareRowToAllPrevious %d %d %d", level, r2, r1);
151 while(r1 >= level) {
152 assert(r1 < r2);
153 if(compareFullRows(level, r1, r2)) {
154 return true;
155 }
156 r1 = results[r1].previous;
157 }
158 return false;
159 }
160
161
pushResultForTable(short level)162 bool QueryOperation::pushResultForTable(short level) {
163 QueryBuffer & current = buffers[level];
164 QueryBuffer & parent = buffers[current.parent];
165
166 if(level == 0) // reset flags for new root result
167 for(int i = 0 ; i < this->size ; i++)
168 buffers[i].result_flags = buffers[i].static_flags;
169
170 /* Push NULL result, or skip if parent was also NULL */
171 if(ndbQuery->getQueryOperation(level)->isRowNULL())
172 {
173 current.result_flags |= flag_row_is_null;
174 if(parent.result_flags & flag_row_is_null)
175 {
176 DEBUG_PRINT("table %d SKIP -- parent is null", level);
177 return true; /* skip */
178 }
179 return pushResultNull(level);
180 }
181
182 if(isDuplicate(level))
183 {
184 DEBUG_PRINT("table %d SKIP DUPLICATE", level);
185 return true; /* skip */
186 }
187
188 bool ok = pushResultValue(level);
189
190 /* Finally compare the entire row against all previous values,
191 unless it is the very first row.
192 */
193 if(ok && (int) nresults > size) {
194 if(compareRowToAllPrevious()) {
195 int r = nresults - 1;
196 DEBUG_PRINT("table %d PRUNE LAST RESULT", results[r].sector);
197 results[r].tag |= flag_row_is_duplicate;
198 free(results[r].data);
199 }
200 }
201 return ok;
202 }
203
newResultForTable(short level)204 bool QueryOperation::newResultForTable(short level) {
205 bool ok = true;
206 int n = nresults;
207
208 if(n == nheaders) {
209 ok = growHeaderArray();
210 }
211 if(ok) {
212 nresults++;
213 results[n].sector = level;
214 results[n].previous = buffers[level].result; // index of previous result
215 buffers[level].result = n; // index of this result
216 results[n].parent = buffers[buffers[level].parent].result; // current value
217 results[n].tag = buffers[level].result_flags;
218 }
219 return ok;
220 }
221
pushResultNull(short level)222 bool QueryOperation::pushResultNull(short level) {
223 int n = nresults;
224 bool ok = newResultForTable(level);
225 if(ok) {
226 DEBUG_PRINT("table %d NULL", level);
227 results[n].data = 0;
228 }
229 return ok;
230 }
231
pushResultValue(short level)232 bool QueryOperation::pushResultValue(short level) {
233 int n = nresults;
234 uint32_t & size = buffers[level].size;
235 char * & temp_result = buffers[level].buffer;
236 bool ok = newResultForTable(level);
237 if(ok) {
238 DEBUG_PRINT("table %d USE RESULT", level);
239
240 /* Allocate space for the new result */
241 results[n].data = (char *) malloc(size);
242 if(! results[n].data) return false;
243
244 /* Copy from the holding buffer to the new result */
245 memcpy(results[n].data, temp_result, size);
246 }
247 return ok;
248 }
249
getResult(int id)250 QueryResultHeader * QueryOperation::getResult(int id) {
251 // DEBUG_PRINT_DETAIL("R %d : TABLE %d TAG %d PARENT %d", id,
252 // results[id].sector, results[id].tag, results[id].parent);
253 return (id < nresults) ? & results[id] : 0;
254 }
255
more(int status)256 inline bool more(int status) { /* 0 or 2 */
257 return ((status == NdbQuery::NextResult_gotRow) ||
258 (status == NdbQuery::NextResult_bufferEmpty));
259 }
260
isError(int status)261 inline bool isError(int status) { /* -1 */
262 return (status == NdbQuery::NextResult_error);
263 }
264
265
266 /* Returns number of results, or an error code < 0
267 */
fetchAllResults()268 int QueryOperation::fetchAllResults() {
269 int status = NdbQuery::NextResult_bufferEmpty;
270
271 while(more(status)) {
272 status = ndbQuery->nextResult();
273 switch(status) {
274 case NdbQuery::NextResult_gotRow:
275 /* New results at every level */
276 DEBUG_PRINT_DETAIL("NextResult_gotRow");
277 for(short level = 0 ; level < size ; level++) {
278 if(! pushResultForTable(level)) return -1;
279 }
280 break;
281
282 case NdbQuery::NextResult_scanComplete:
283 DEBUG_PRINT_DETAIL("NextResult_scanComplete");
284 break;
285
286 default:
287 assert(status == NdbQuery::NextResult_error);
288 latest_error = & ndbQuery->getNdbError();
289 DEBUG_PRINT("%d %s", latest_error->code, latest_error->message);
290 return -1;
291 }
292 }
293 /* All done with the query now. */
294 ndbQuery->close();
295 ndbQuery = 0;
296
297 return nresults;
298 }
299
growHeaderArray()300 bool QueryOperation::growHeaderArray() {
301 DEBUG_PRINT("growHeaderArray %d => %d", nheaders, nextHeaderAllocationSize);
302 QueryResultHeader * old_results = results;
303
304 results = (QueryResultHeader *) calloc(nextHeaderAllocationSize, sizeof(QueryResultHeader));
305 if(results) {
306 memcpy(results, old_results, nheaders * sizeof(QueryResultHeader));
307 free(old_results);
308 nheaders = nextHeaderAllocationSize;
309 nextHeaderAllocationSize *= 2;
310 return true;
311 }
312 return false; // allocation failed
313 }
314
315 const NdbQueryOperationDef *
defineOperation(const NdbDictionary::Index * index,const NdbDictionary::Table * table,const NdbQueryOperand * const keys[])316 QueryOperation::defineOperation(const NdbDictionary::Index * index,
317 const NdbDictionary::Table * table,
318 const NdbQueryOperand* const keys[]) {
319 const NdbQueryOperationDef * rval = 0;
320 NdbQueryIndexBound * bound;
321
322 if(index) {
323 switch(index->getType()) {
324 case NdbDictionary::Index::UniqueHashIndex:
325 rval = ndbQueryBuilder->readTuple(index, table, keys);
326 DEBUG_PRINT("defineOperation using UniqueHashIndex %s", index->getName());
327 break;
328
329 case NdbDictionary::Index::OrderedIndex:
330 bound = new NdbQueryIndexBound(keys);
331 rval = ndbQueryBuilder->scanIndex(index, table, bound);
332 DEBUG_PRINT("defineOperation using OrderedIndex %s", index->getName());
333 break;
334 default:
335 DEBUG_PRINT("defineOperation ERROR: default case");
336 return 0;
337 }
338 }
339 else {
340 rval = ndbQueryBuilder->readTuple(table, keys);
341 DEBUG_PRINT("defineOperation using PrimaryKey");
342 }
343
344 if(rval == 0) {
345 latest_error = & ndbQueryBuilder->getNdbError();
346 DEBUG_PRINT("defineOperation: Error %d %s", latest_error->code, latest_error->message);
347 }
348 return rval;
349 }
350
createNdbQuery(NdbTransaction * tx)351 bool QueryOperation::createNdbQuery(NdbTransaction *tx) {
352 DEBUG_MARKER(UDEB_DEBUG);
353 ndbQuery = tx->createQuery(definedQuery);
354 if(! ndbQuery) {
355 DEBUG_PRINT("createQuery returned null");
356 return false;
357 }
358
359 for(int i = 0 ; i < size ; i++) {
360 NdbQueryOperation * qop = ndbQuery->getQueryOperation(i);
361 if(! qop) {
362 DEBUG_PRINT("No Query Operation at index %d", i);
363 return false;
364 }
365 assert(buffers[i].record);
366 qop->setResultRowBuf(buffers[i].record->getNdbRecord(), buffers[i].buffer);
367 }
368 return true;
369 }
370
setTransactionImpl(TransactionImpl * tx)371 void QueryOperation::setTransactionImpl(TransactionImpl *tx) {
372 transaction = tx;
373 }
374
close()375 void QueryOperation::close() {
376 DEBUG_ENTER();
377 definedQuery->destroy();
378 }
379
getNdbError()380 const NdbError & QueryOperation::getNdbError() {
381 return ndbQueryBuilder->getNdbError();
382 }
383