1 /*
2 Copyright (c) 2011, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25
#include <ndb_global.h>

#include <new>

#include <NdbDictionary.hpp>
#include <NdbIndexScanOperation.hpp>
#include "NdbQueryBuilder.hpp"
#include "NdbQueryOperation.hpp"
#include "API.hpp"
#include "NdbQueryBuilderImpl.hpp"
#include "NdbQueryOperationImpl.hpp"
#include "NdbInterpretedCode.hpp"

#include <signaldata/TcKeyReq.hpp>
#include <signaldata/TcKeyRef.hpp>
#include <signaldata/ScanTab.hpp>
#include <signaldata/QueryTree.hpp>
#include <signaldata/DbspjErr.hpp>

#include "AttributeHeader.hpp"

#include <Bitmask.hpp>
45
// Debug aid: flip '#if 0' to '#if 1' to make DEBUG_CRASH() assert at the
// point of invocation. In normal builds it expands to nothing.
#if 0
#define DEBUG_CRASH() assert(false)
#else
#define DEBUG_CRASH()
#endif

/** To prevent compiler warnings about variables that are only used in asserts
 * (when building optimized version).
 */
#define UNUSED(x) ((void)(x))

// To force usage of SCAN_NEXTREQ even for small scan resultsets:
// - '0' is default (production) value
// - '4' is normally a good value for testing batch related problems
static const int enforcedBatchSize = 0;

// Use double buffered ResultSets, may later change
// to be more adaptive based on query type
static const bool useDoubleBuffers = true;

/* Various error codes that are not specific to NdbQuery.
 * (Numeric values mirror the general NDB API error codes.)
 */
static const int Err_TupleNotFound = 626;
static const int Err_FalsePredicate = 899;
static const int Err_MemoryAlloc = 4000;
static const int Err_SendFailed = 4002;
static const int Err_FunctionNotImplemented = 4003;
static const int Err_UnknownColumn = 4004;
static const int Err_ReceiveTimedOut = 4008;
static const int Err_NodeFailCausedAbort = 4028;
static const int Err_ParameterError = 4118;
static const int Err_SimpleDirtyReadFailed = 4119;
static const int Err_WrongFieldLength = 4209;
static const int Err_ReadTooMuch = 4257;
static const int Err_InvalidRangeNo = 4286;
static const int Err_DifferentTabForKeyRecAndAttrRec = 4287;
static const int Err_KeyIsNULL = 4316;
static const int Err_FinaliseNotCalled = 4519;
static const int Err_InterpretedCodeWrongTab = 4524;

/* A 'void' index for a tuple in internal parent / child correlation structs. */
static const Uint16 tupleNotFound = 0xffff;

/** Set to true to trace incoming signals. */
static const bool traceSignals = false;

enum
{
  /**
   * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
   * scan parallelism should be adaptive.
   */
  Parallelism_adaptive = 0xffff0000,

  /**
   * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
   * all fragments should be scanned in parallel.
   */
  Parallelism_max = 0xffff0001
};
105
/**
 * A class for accessing the correlation data at the end of a tuple (for
 * scan queries). These data have the following layout:
 *
 * Word 0: AttributeHeader
 * Word 1, upper halfword: tuple id of parent tuple.
 * Word 1, lower halfword: tuple id of this tuple.
 * Word 2: Id of receiver for root operation (where the ancestor tuple of this
 *         tuple will go).
 *
 * Both tuple identifiers are unique within this batch and root fragment.
 * With these identifiers, it is possible to relate a tuple to its parent and
 * children. That way, results for child operations can be updated correctly
 * when the application iterates over the results of the root scan operation.
 */
class TupleCorrelation
{
public:
  // Number of Uint32 words occupied when stored in a result buffer.
  static const Uint32 wordCount = 1;

  // Default: correlated to nothing (both halfwords set to 'tupleNotFound').
  explicit TupleCorrelation()
    : m_correlation((tupleNotFound<<16) | tupleNotFound)
  {}

  /** Conversion to/from Uint32 to store/fetch from buffers */
  explicit TupleCorrelation(Uint32 val)
    : m_correlation(val)
  {}

  Uint32 toUint32() const
  { return m_correlation; }

  // Tuple id of this tuple (lower halfword).
  Uint16 getTupleId() const
  { return m_correlation & 0xffff; }

  // Tuple id of the parent tuple (upper halfword).
  Uint16 getParentTupleId() const
  { return m_correlation >> 16; }

private:
  // Packed (parentId << 16) | tupleId, as received from the datanodes.
  Uint32 m_correlation;
}; // class TupleCorrelation
147
/**
 * Read-only view of the 'wordCount'-word correlation trailer
 * (AttributeHeader, TupleCorrelation, root receiver id) found at the
 * end of each received scan-query tuple.
 */
class CorrelationData
{
public:
  static const Uint32 wordCount = 3;

  // 'tupleData' is a received tuple of 'tupleLength' words; the trailer
  // occupies its last 'wordCount' words.
  explicit CorrelationData(const Uint32* tupleData, Uint32 tupleLength):
    m_corrPart(tupleData + tupleLength - wordCount)
  {
    assert(tupleLength >= wordCount);
    assert(AttributeHeader(m_corrPart[0]).getAttributeId()
           == AttributeHeader::CORR_FACTOR64);
    assert(AttributeHeader(m_corrPart[0]).getByteSize() == 2*sizeof(Uint32));
    assert(getTupleCorrelation().getTupleId() < tupleNotFound);
    assert(getTupleCorrelation().getParentTupleId() < tupleNotFound);
  }

  // Receiver id of the root operation (word 2 of the trailer).
  Uint32 getRootReceiverId() const
  { return m_corrPart[2]; }

  // Parent/self tuple ids (word 1 of the trailer).
  const TupleCorrelation getTupleCorrelation() const
  { return TupleCorrelation(m_corrPart[1]); }

private:
  // Points into the received tuple; the tuple buffer must outlive this view.
  const Uint32* const m_corrPart;
}; // class CorrelationData
173
/**
 * If a query has a scan operation as its root, then that scan will normally
 * read from several fragments of its target table. Each such root fragment
 * scan, along with any child lookup operations that are spawned from it,
 * runs independently, in the sense that:
 * - The API will know when it has received all data from a fragment for a
 *   given batch and all child operations spawned from it.
 * - When one fragment is complete (for a batch) the API will make these data
 *   available to the application, even if other fragments are not yet
 *   complete.
 * - The tuple identifiers that are used for matching children with parents are
 *   only guaranteed to be unique within one batch, operation, and root
 *   operation fragment. Tuples derived from different root fragments must
 *   thus be kept apart.
 *
 * This class manages the state of one such read operation, from one particular
 * fragment of the target table of the root operation. If the root operation
 * is a lookup, then there will be only one instance of this class.
 */
class NdbRootFragment {
public:
  /** Build hash map for mapping from root receiver id to NdbRootFragment
   * instance.*/
  static void buildReciverIdMap(NdbRootFragment* frags,
                                Uint32 noOfFrags);

  /** Find NdbRootFragment instance corresponding to a given root receiver id.*/
  static NdbRootFragment* receiverIdLookup(NdbRootFragment* frags,
                                           Uint32 noOfFrags,
                                           Uint32 receiverId);

  explicit NdbRootFragment();

  ~NdbRootFragment();

  /**
   * Initialize object.
   * @param query Enclosing query.
   * @param fragNo This object manages state for reading from the fragNo'th
   *               fragment that the root operation accesses.
   */
  void init(NdbQueryImpl& query, Uint32 fragNo);

  /** Reset 'noOfFrags' fragment objects for (re)use. */
  static void clear(NdbRootFragment* frags, Uint32 noOfFrags);

  Uint32 getFragNo() const
  { return m_fragNo; }

  /**
   * Prepare for receiving another batch of results.
   */
  void prepareNextReceiveSet();

  bool hasRequestedMore() const;

  /**
   * Prepare for reading another batch of results.
   */
  void grabNextResultSet();       // Need mutex lock

  bool hasReceivedMore() const;   // Need mutex lock

  void setReceivedMore();         // Need mutex lock

  // Adjust the count of result messages still expected for this fragment;
  // 'delta' may be negative (see m_outstandingResults).
  void incrOutstandingResults(Int32 delta)
  {
    if (traceSignals) {
      ndbout << "incrOutstandingResults: " << m_outstandingResults
             << ", with: " << delta
             << endl;
    }
    m_outstandingResults += delta;
    assert(!(m_confReceived && m_outstandingResults<0));
  }

  // Discard any still-outstanding results, mark this batch complete and
  // release fetch related resources.
  void throwRemainingResults()
  {
    if (traceSignals) {
      ndbout << "throwRemainingResults: " << m_outstandingResults
             << endl;
    }
    m_outstandingResults = 0;
    m_confReceived = true;
    postFetchRelease();
  }

  void setConfReceived(Uint32 tcPtrI);

  /**
   * The root operation will read from a number of fragments of a table.
   * This method checks if all results for the current batch has been
   * received for a given fragment. This includes both results for the root
   * operation and any child operations. Note that child operations may access
   * other fragments; the fragment number only refers to what
   * the root operation does.
   *
   * @return True if current batch is complete for this fragment.
   */
  bool isFragBatchComplete() const
  {
    assert(m_fragNo!=voidFragNo);
    return m_confReceived && m_outstandingResults==0;
  }

  /**
   * Get the result stream that handles results derived from this root
   * fragment for a particular operation.
   * @param operationNo The id of the operation.
   * @return The result stream for this root fragment.
   */
  NdbResultStream& getResultStream(Uint32 operationNo) const;

  NdbResultStream& getResultStream(const NdbQueryOperationImpl& op) const
  { return getResultStream(op.getQueryOperationDef().getOpNo()); }

  Uint32 getReceiverId() const;
  Uint32 getReceiverTcPtrI() const;

  /**
   * @return True if there are no more batches to be received for this fragment.
   */
  bool finalBatchReceived() const;

  /**
   * @return True if there are no more results from this root fragment (for
   * the current batch).
   */
  bool isEmpty() const;

  /**
   * This method is used for marking which streams belonging to this
   * NdbRootFragment have remaining batches for a sub scan
   * instantiated from the current batch of its parent operation.
   */
  void setRemainingSubScans(Uint32 nodeMask)
  {
    m_remainingScans = nodeMask;
  }

  /** Release resources after last row has been returned */
  void postFetchRelease();

private:
  /** No copying.*/
  NdbRootFragment(const NdbRootFragment&);
  NdbRootFragment& operator=(const NdbRootFragment&);

  /** Marker for 'no fragment number assigned yet' (pre-init state). */
  STATIC_CONST( voidFragNo = 0xffffffff);

  /** Enclosing query.*/
  NdbQueryImpl* m_query;

  /** Number of the root operation fragment.*/
  Uint32 m_fragNo;

  /** For processing results originating from this root fragment (Array of).*/
  NdbResultStream* m_resultStreams;

  /**
   * Number of requested (pre-)fetches which have either not completed
   * from datanodes yet, or which are completed, but not consumed.
   * (Which implies they are also counted in m_availResultSets)
   */
  Uint32 m_pendingRequests;

  /**
   * Number of available 'm_pendingRequests' ( <= m_pendingRequests)
   * which have been completely received. Will be made available
   * for reading by calling ::grabNextResultSet()
   */
  Uint32 m_availResultSets;   // Need mutex

  /**
   * The number of outstanding TCKEYREF or TRANSID_AI messages to receive
   * for the fragment. This includes both messages related to the
   * root operation and any descendant operation that was instantiated as
   * a consequence of tuples found by the root operation.
   * This number may temporarily be negative if e.g. TRANSID_AI arrives
   * before SCAN_TABCONF.
   */
  Int32 m_outstandingResults;

  /**
   * True iff a SCAN_TABCONF (for this fragment) or TCKEYCONF message
   * has been received.
   */
  bool m_confReceived;

  /**
   * A bitmask of operation id's for which we will receive more
   * ResultSets in a NEXTREQ.
   */
  Uint32 m_remainingScans;

  /**
   * Used for implementing a hash map from root receiver ids to a
   * NdbRootFragment instance. m_idMapHead is the index of the first
   * NdbRootFragment in the m_fragNo'th hash bucket.
   */
  int m_idMapHead;

  /**
   * Used for implementing a hash map from root receiver ids to a
   * NdbRootFragment instance. m_idMapNext is the index of the next
   * NdbRootFragment in the same hash bucket as this one.
   */
  int m_idMapNext;
}; //NdbRootFragment
384
/**
 * 'class NdbResultSet' is a helper for 'class NdbResultStream'.
 * It manages the buffers which rows are received into and
 * read from.
 */
class NdbResultSet
{
  friend class NdbResultStream;

public:
  explicit NdbResultSet();

  // Allocate receive buffers from the query-wide bulk allocator.
  void init(NdbQueryImpl& query,
            Uint32 maxRows, Uint32 bufferSize);

  // Make this (emptied) ResultSet the receive target of 'receiver'.
  void prepareReceive(NdbReceiver& receiver)
  {
    m_rowCount = 0;
    receiver.prepareReceive(m_buffer);
  }

  Uint32 getRowCount() const
  { return m_rowCount; }

private:
  /** No copying.*/
  NdbResultSet(const NdbResultSet&);
  NdbResultSet& operator=(const NdbResultSet&);

  /** The buffers which we receive the results into */
  NdbReceiverBuffer* m_buffer;

  /** Array of TupleCorrelations for all rows in m_buffer */
  TupleCorrelation* m_correlations;

  /** The current #rows in 'm_buffer'.*/
  Uint32 m_rowCount;

}; // class NdbResultSet
424
/**
 * This class manages the subset of result data for one operation that is
 * derived from one fragment of the root operation. Note that the result tuples
 * may come from any fragment, but they all have initial ancestors from the
 * same fragment of the root operation.
 * For each operation there will thus be one NdbResultStream for each fragment
 * that the root operation reads from (one in the case of lookups.)
 * This class has an NdbReceiver object for processing tuples as well as
 * structures for correlating child and parent tuples.
 */
class NdbResultStream {
public:

  /**
   * @param operation The operation for which we will receive results.
   * @param rootFrag  The root fragment which the handled results descend from.
   */
  explicit NdbResultStream(NdbQueryOperationImpl& operation,
                           NdbRootFragment& rootFrag);

  ~NdbResultStream();

  /**
   * Prepare for receiving first results.
   */
  void prepare();

  /** Prepare for receiving next batch of scan results. */
  void prepareNextReceiveSet();

  NdbReceiver& getReceiver()
  { return m_receiver; }

  const NdbReceiver& getReceiver() const
  { return m_receiver; }

  const char* getCurrentRow()
  { return m_receiver.getCurrentRow(); }

  /**
   * Process an incoming tuple for this stream. Extract parent and own tuple
   * ids and pass it on to m_receiver.
   *
   * @param ptr buffer holding tuple.
   * @param len buffer length.
   */
  void execTRANSID_AI(const Uint32 *ptr, Uint32 len,
                      TupleCorrelation correlation);

  /**
   * A complete batch has been received for a fragment on this NdbResultStream.
   * Update whatever is required before the application is allowed to navigate
   * the result.
   * @return true if node and all its siblings have returned all rows.
   */
  bool prepareResultSet(Uint32 remainingScans);

  /**
   * Navigate within the current ResultSet to resp. first and next row.
   * For non-parent operations in the pushed query, navigation is with respect
   * to any preceding parents which results in this ResultSet depends on.
   * Returns either the tupleNo within TupleSet[] which we navigated to, or
   * 'tupleNotFound'.
   */
  Uint16 firstResult();
  Uint16 nextResult();

  /**
   * Returns true if last row matching the current parent tuple has been
   * consumed.
   */
  bool isEmpty() const
  { return m_iterState == Iter_finished; }

  /**
   * This method
   * returns true if this result stream holds the last batch of a sub scan.
   * This means that it is the last batch of the scan that was instantiated
   * from the current batch of its parent operation.
   */
  bool isSubScanComplete(Uint32 remainingScans) const
  {
    /**
     * Find the node number seen by the SPJ block. Since a unique index
     * operation will have two distinct nodes in the tree used by the
     * SPJ block, this number may be different from 'opNo'.
     */
    const Uint32 internalOpNo = m_operation.getInternalOpNo();

    const bool complete = !((remainingScans >> internalOpNo) & 1);
    return complete;
  }

  bool isScanQuery() const
  { return (m_properties & Is_Scan_Query); }

  bool isScanResult() const
  { return (m_properties & Is_Scan_Result); }

  bool isInnerJoin() const
  { return (m_properties & Is_Inner_Join); }

  /** For debugging.*/
  friend NdbOut& operator<<(NdbOut& out, const NdbResultStream&);

  /**
   * TupleSet contains two logically distinct sets of information:
   *
   *  - Child/Parent correlation set required to correlate
   *    child tuples with its parents. Child/Tuple pairs are indexed
   *    by tuple number which is the same as the order in which tuples
   *    appear in the NdbReceiver buffers.
   *
   *  - A HashMap on 'm_parentId' used to locate tuples correlated
   *    to a parent tuple. Indexes by hashing the parentId such that:
   *     - [hash(parentId)].m_hash_head will then index the first
   *       TupleSet entry potentially containing the parentId to locate.
   *     - .m_hash_next in the indexed TupleSet may index the next TupleSet
   *       to consider.
   *
   * Both the child/parent correlation set and the parentId HashMap have been
   * folded into the same structure in order to reduce the number of objects
   * being dynamically allocated.
   * As an advantage this results in an autoscaling of the hash bucket size.
   *
   * Structure is only present if 'isScanQuery'
   */
  class TupleSet {
  public:
    // Tuple ids are unique within this batch and stream
    Uint16 m_parentId;  // Id of parent tuple which this tuple is correlated with
    Uint16 m_tupleId;   // Id of this tuple

    Uint16 m_hash_head; // Index of first item in TupleSet[] matching a hashed parentId.
    Uint16 m_hash_next; // 'next' index matching

    bool m_skip;        // Skip this tuple in result processing for now

    /** If the n'th bit is set, then a matching tuple for the n'th child has been seen.
     * This information is needed when generating left join tuples for those tuples
     * that had no matching children.*/
    Bitmask<(NDB_SPJ_MAX_TREE_NODES+31)/32> m_hasMatchingChild;

    explicit TupleSet() : m_hash_head(tupleNotFound)
    {}

  private:
    /** No copying.*/
    TupleSet(const TupleSet&);
    TupleSet& operator=(const TupleSet&);
  };

private:
  /**
   * This stream handles results derived from specified
   * m_rootFrag of the root operation.
   */
  const NdbRootFragment& m_rootFrag;

  /** Operation to which this resultStream belong.*/
  NdbQueryOperationImpl& m_operation;

  /** ResultStream for my parent operation, or NULL if I am root */
  NdbResultStream* const m_parent;

  // Immutable properties of this stream, derived from the query definition
  // once, in the constructor.
  const enum properties
  {
    Is_Scan_Query = 0x01,
    Is_Scan_Result = 0x02,
    Is_Inner_Join = 0x10
  } m_properties;

  /** The receiver object that unpacks transid_AI messages.*/
  NdbReceiver m_receiver;

  /**
   * ResultSets are received into and read from this stream,
   * possibly doublebuffered,
   */
  NdbResultSet m_resultSets[2];
  Uint32 m_read;  // We read from m_resultSets[m_read]
  Uint32 m_recv;  // We receive into m_resultSets[m_recv]

  /** This is the state of the iterator used by firstResult(), nextResult().*/
  enum
  {
    /** The first row has not been fetched yet.*/
    Iter_notStarted,
    /** Is iterating the ResultSet, (implies 'm_currentRow!=tupleNotFound').*/
    Iter_started,
    /** Last row for current ResultSet has been returned.*/
    Iter_finished
  } m_iterState;

  /**
   * Tuple id of the current tuple, or 'tupleNotFound'
   * if Iter_notStarted or Iter_finished.
   */
  Uint16 m_currentRow;

  /** Max #rows which this stream may receive in its TupleSet structures */
  Uint32 m_maxRows;

  /** TupleSet contains the correlation between parent/childs */
  TupleSet* m_tupleSet;

  // Fill in m_tupleSet[] (correlations + parentId hash map) for the
  // batch just received.
  void buildResultCorrelations();

  Uint16 getTupleId(Uint16 tupleNo) const
  { return (m_tupleSet) ? m_tupleSet[tupleNo].m_tupleId : 0; }

  Uint16 getCurrentTupleId() const
  { return (m_currentRow==tupleNotFound) ? tupleNotFound : getTupleId(m_currentRow); }

  Uint16 findTupleWithParentId(Uint16 parentId) const;

  Uint16 findNextTuple(Uint16 tupleNo) const;

  /** No copying.*/
  NdbResultStream(const NdbResultStream&);
  NdbResultStream& operator=(const NdbResultStream&);
}; //class NdbResultStream
646
647 //////////////////////////////////////////////
648 ///////// NdbBulkAllocator methods ///////////
649 //////////////////////////////////////////////
650
NdbBulkAllocator(size_t objSize)651 NdbBulkAllocator::NdbBulkAllocator(size_t objSize)
652 :m_objSize(objSize),
653 m_maxObjs(0),
654 m_buffer(NULL),
655 m_nextObjNo(0)
656 {}
657
init(Uint32 maxObjs)658 int NdbBulkAllocator::init(Uint32 maxObjs)
659 {
660 assert(m_buffer == NULL);
661 m_maxObjs = maxObjs;
662 // Add check for buffer overrun.
663 m_buffer = new char[m_objSize*m_maxObjs+1];
664 if (unlikely(m_buffer == NULL))
665 {
666 return Err_MemoryAlloc;
667 }
668 m_buffer[m_maxObjs * m_objSize] = endMarker;
669 return 0;
670 }
671
reset()672 void NdbBulkAllocator::reset(){
673 // Overrun check.
674 assert(m_buffer == NULL || m_buffer[m_maxObjs * m_objSize] == endMarker);
675 delete [] m_buffer;
676 m_buffer = NULL;
677 m_nextObjNo = 0;
678 m_maxObjs = 0;
679 }
680
allocObjMem(Uint32 noOfObjs)681 void* NdbBulkAllocator::allocObjMem(Uint32 noOfObjs)
682 {
683 assert(m_nextObjNo + noOfObjs <= m_maxObjs);
684 void * const result = m_buffer+m_objSize*m_nextObjNo;
685 m_nextObjNo += noOfObjs;
686 return m_nextObjNo > m_maxObjs ? NULL : result;
687 }
688
689 ///////////////////////////////////////////
690 ///////// NdbResultSet methods ///////////
691 ///////////////////////////////////////////
NdbResultSet()692 NdbResultSet::NdbResultSet() :
693 m_buffer(NULL),
694 m_correlations(NULL),
695 m_rowCount(0)
696 {}
697
698 void
init(NdbQueryImpl & query,Uint32 maxRows,Uint32 bufferSize)699 NdbResultSet::init(NdbQueryImpl& query,
700 Uint32 maxRows,
701 Uint32 bufferSize)
702 {
703 {
704 NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
705 Uint32 *buffer = reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(bufferSize));
706 m_buffer = NdbReceiver::initReceiveBuffer(buffer, bufferSize, maxRows);
707
708 if (query.getQueryDef().isScanQuery())
709 {
710 m_correlations = reinterpret_cast<TupleCorrelation*>
711 (bufferAlloc.allocObjMem(maxRows*sizeof(TupleCorrelation)));
712 }
713 }
714 }
715
716 //////////////////////////////////////////////
717 ///////// NdbResultStream methods ///////////
718 //////////////////////////////////////////////
719
NdbResultStream(NdbQueryOperationImpl & operation,NdbRootFragment & rootFrag)720 NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation,
721 NdbRootFragment& rootFrag)
722 :
723 m_rootFrag(rootFrag),
724 m_operation(operation),
725 m_parent(operation.getParentOperation()
726 ? &rootFrag.getResultStream(*operation.getParentOperation())
727 : NULL),
728 m_properties(
729 (enum properties)
730 ((operation.getQueryDef().isScanQuery()
731 ? Is_Scan_Query : 0)
732 | (operation.getQueryOperationDef().isScanOperation()
733 ? Is_Scan_Result : 0)
734 | (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll
735 ? Is_Inner_Join : 0))),
736 m_receiver(operation.getQuery().getNdbTransaction().getNdb()),
737 m_resultSets(), m_read(0xffffffff), m_recv(0),
738 m_iterState(Iter_finished),
739 m_currentRow(tupleNotFound),
740 m_maxRows(0),
741 m_tupleSet(NULL)
742 {};
743
~NdbResultStream()744 NdbResultStream::~NdbResultStream()
745 {
746 for (int i = static_cast<int>(m_maxRows)-1; i >= 0; i--)
747 {
748 m_tupleSet[i].~TupleSet();
749 }
750 }
751
752 void
prepare()753 NdbResultStream::prepare()
754 {
755 NdbQueryImpl &query = m_operation.getQuery();
756
757 const Uint32 batchBufferSize = m_operation.getBatchBufferSize();
758 if (isScanQuery())
759 {
760 /* Parent / child correlation is only relevant for scan type queries
761 * Don't create a m_tupleSet with these correlation id's for lookups!
762 */
763 m_maxRows = m_operation.getMaxBatchRows();
764 m_tupleSet =
765 new (query.getTupleSetAlloc().allocObjMem(m_maxRows))
766 TupleSet[m_maxRows];
767
768 // Scan results may be double buffered
769 m_resultSets[0].init(query, m_maxRows, batchBufferSize);
770 m_resultSets[1].init(query, m_maxRows, batchBufferSize);
771 }
772 else
773 {
774 m_maxRows = 1;
775 m_resultSets[0].init(query, m_maxRows, batchBufferSize);
776 }
777
778 /* Alloc buffer for unpacked NdbRecord row */
779 const Uint32 rowSize = m_operation.getRowSize();
780 assert((rowSize % sizeof(Uint32)) == 0);
781 char *rowBuffer = reinterpret_cast<char*>(query.getRowBufferAlloc().allocObjMem(rowSize));
782 assert(rowBuffer != NULL);
783
784 m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, &m_operation);
785 m_receiver.do_setup_ndbrecord(
786 m_operation.getNdbRecord(),
787 rowBuffer,
788 false, /*read_range_no*/
789 false /*read_key_info*/);
790 } //NdbResultStream::prepare
791
792 /** Locate, and return 'tupleNo', of first tuple with specified parentId.
793 * parentId == tupleNotFound is use as a special value for iterating results
794 * from the root operation in the order which they was inserted by
795 * ::buildResultCorrelations()
796 *
797 * Position of 'currentRow' is *not* updated and should be modified by callee
798 * if it want to keep the new position.
799 */
800 Uint16
findTupleWithParentId(Uint16 parentId) const801 NdbResultStream::findTupleWithParentId(Uint16 parentId) const
802 {
803 assert ((parentId==tupleNotFound) == (m_parent==NULL));
804
805 if (likely(m_resultSets[m_read].m_rowCount>0))
806 {
807 if (m_tupleSet==NULL)
808 {
809 assert (m_resultSets[m_read].m_rowCount <= 1);
810 return 0;
811 }
812
813 const Uint16 hash = (parentId % m_maxRows);
814 Uint16 currentRow = m_tupleSet[hash].m_hash_head;
815 while (currentRow != tupleNotFound)
816 {
817 assert(currentRow < m_maxRows);
818 if (m_tupleSet[currentRow].m_skip == false &&
819 m_tupleSet[currentRow].m_parentId == parentId)
820 {
821 return currentRow;
822 }
823 currentRow = m_tupleSet[currentRow].m_hash_next;
824 }
825 }
826 return tupleNotFound;
827 } //NdbResultStream::findTupleWithParentId()
828
829
830 /** Locate, and return 'tupleNo', of next tuple with same parentId as currentRow
831 * Position of 'currentRow' is *not* updated and should be modified by callee
832 * if it want to keep the new position.
833 */
834 Uint16
findNextTuple(Uint16 tupleNo) const835 NdbResultStream::findNextTuple(Uint16 tupleNo) const
836 {
837 if (tupleNo!=tupleNotFound && m_tupleSet!=NULL)
838 {
839 assert(tupleNo < m_maxRows);
840 Uint16 parentId = m_tupleSet[tupleNo].m_parentId;
841 Uint16 nextRow = m_tupleSet[tupleNo].m_hash_next;
842
843 while (nextRow != tupleNotFound)
844 {
845 assert(nextRow < m_maxRows);
846 if (m_tupleSet[nextRow].m_skip == false &&
847 m_tupleSet[nextRow].m_parentId == parentId)
848 {
849 return nextRow;
850 }
851 nextRow = m_tupleSet[nextRow].m_hash_next;
852 }
853 }
854 return tupleNotFound;
855 } //NdbResultStream::findNextTuple()
856
857
858 Uint16
firstResult()859 NdbResultStream::firstResult()
860 {
861 Uint16 parentId = tupleNotFound;
862 if (m_parent!=NULL)
863 {
864 parentId = m_parent->getCurrentTupleId();
865 if (parentId == tupleNotFound)
866 {
867 m_currentRow = tupleNotFound;
868 m_iterState = Iter_finished;
869 return tupleNotFound;
870 }
871 }
872
873 if ((m_currentRow=findTupleWithParentId(parentId)) != tupleNotFound)
874 {
875 m_iterState = Iter_started;
876 const char *p = m_receiver.getRow(m_resultSets[m_read].m_buffer, m_currentRow);
877 assert(p != NULL); ((void)p);
878 return m_currentRow;
879 }
880
881 m_iterState = Iter_finished;
882 return tupleNotFound;
883 } //NdbResultStream::firstResult()
884
885 Uint16
nextResult()886 NdbResultStream::nextResult()
887 {
888 // Fetch next row for this stream
889 if (m_currentRow != tupleNotFound &&
890 (m_currentRow=findNextTuple(m_currentRow)) != tupleNotFound)
891 {
892 m_iterState = Iter_started;
893 const char *p = m_receiver.getRow(m_resultSets[m_read].m_buffer, m_currentRow);
894 assert(p != NULL); ((void)p);
895 return m_currentRow;
896 }
897 m_iterState = Iter_finished;
898 return tupleNotFound;
899 } //NdbResultStream::nextResult()
900
901 /**
902 * Callback when a TRANSID_AI signal (receive row) is processed.
903 */
904 void
execTRANSID_AI(const Uint32 * ptr,Uint32 len,TupleCorrelation correlation)905 NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len,
906 TupleCorrelation correlation)
907 {
908 NdbResultSet& receiveSet = m_resultSets[m_recv];
909 if (isScanQuery())
910 {
911 /**
912 * Store TupleCorrelation.
913 */
914 receiveSet.m_correlations[receiveSet.m_rowCount] = correlation;
915 }
916
917 m_receiver.execTRANSID_AI(ptr, len);
918 receiveSet.m_rowCount++;
919 } // NdbResultStream::execTRANSID_AI()
920
921 /**
922 * Make preparation for another batch of results to be received.
923 * This NdbResultStream, and all its sibling will receive a batch
924 * of results from the datanodes.
925 */
926 void
prepareNextReceiveSet()927 NdbResultStream::prepareNextReceiveSet()
928 {
929 if (isScanQuery()) // Doublebuffered ResultSet[] if isScanQuery()
930 {
931 m_recv = (m_recv+1) % 2; // Receive into next ResultSet
932 assert(m_recv != m_read);
933 }
934
935 m_resultSets[m_recv].prepareReceive(m_receiver);
936
937 /**
938 * If this stream will get new rows in the next batch, then so will
939 * all of its descendants.
940 */
941 for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations();
942 childNo++)
943 {
944 NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
945 m_rootFrag.getResultStream(child).prepareNextReceiveSet();
946 }
947 } //NdbResultStream::prepareNextReceiveSet
948
/**
 * Make preparations for another batch of result to be read:
 *  - Advance to next NdbResultSet (or reuse last).
 *  - Fill in parent/child result correlations in m_tupleSet[]
 *    ... or reset m_tupleSet[] if we reuse the previous.
 *  - Apply inner/outer join filtering to remove non-qualifying rows.
 *
 * @param remainingScans  Set of scan operations that still have more
 *                        batches pending; forwarded to
 *                        ::isSubScanComplete() (exact encoding defined
 *                        where that method is declared).
 * @return true iff this stream and all its descendant (sub-)scans have
 *         delivered their final batch.
 */
bool
NdbResultStream::prepareResultSet(Uint32 remainingScans)
{
  bool isComplete = isSubScanComplete(remainingScans); //Children may have more rows

  /**
   * Prepare NdbResultSet for reading - either the next
   * 'new' received from datanodes or reuse the last as has been
   * determined by ::prepareNextReceiveSet()
   */
  const bool newResults = (m_read != m_recv);
  m_read = m_recv;
  const NdbResultSet& readResult = m_resultSets[m_read];

  if (m_tupleSet!=NULL)
  {
    if (newResults)
    {
      // Fresh batch: (re)build the parentId correlation hashmap
      buildResultCorrelations();
    }
    else
    {
      // Reused batch: make all rows in 'TupleSet' available again
      // (clear the 'm_skip' flag set by earlier join filtering)
      for (Uint32 tupleNo=0; tupleNo<readResult.getRowCount(); tupleNo++)
      {
        m_tupleSet[tupleNo].m_skip = false;
      }
    }
  }

  /**
   * Recursively iterate all child results depth first.
   * Filter away any result rows which should not be visible (yet) -
   * Either due to incomplete child batches, or the join being an 'inner join'.
   * Set result iterator state to 'before first' resultrow.
   */
  for (Uint32 childNo=0; childNo < m_operation.getNoOfChildOperations(); childNo++)
  {
    const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
    NdbResultStream& childStream = m_rootFrag.getResultStream(child);
    const bool allSubScansComplete = childStream.prepareResultSet(remainingScans);

    Uint32 childId = child.getQueryOperationDef().getOpNo();

    /* Condition 1) & 2) calc'ed outside loop, see comments further below: */
    const bool skipNonMatches = !allSubScansComplete ||      // 1)
                                childStream.isInnerJoin();   // 2)

    if (m_tupleSet!=NULL)
    {
      for (Uint32 tupleNo=0; tupleNo<readResult.getRowCount(); tupleNo++)
      {
        if (!m_tupleSet[tupleNo].m_skip)
        {
          Uint16 tupleId = getTupleId(tupleNo);
          if (childStream.findTupleWithParentId(tupleId)!=tupleNotFound)
            m_tupleSet[tupleNo].m_hasMatchingChild.set(childId);

          /////////////////////////////////
          // No child matched for this row. Making parent row visible
          // will cause a NULL (outer join) row to be produced.
          // Skip NULL row production when:
          // 1) Some child batches are not complete; they may contain later matches.
          // 2) Join type is 'inner join'; skip as no children are matching.
          // 3) A match was found in a previous batch.
          // Condition 1) & 2) above are precalculated in 'bool skipNonMatches'
          //
          else if (skipNonMatches                                       // 1 & 2)
                || m_tupleSet[tupleNo].m_hasMatchingChild.get(childId)) // 3)
            m_tupleSet[tupleNo].m_skip = true;
        }
      }
    }
    isComplete &= allSubScansComplete;
  }

  // Set current position 'before first'
  m_iterState = Iter_notStarted;
  m_currentRow = tupleNotFound;

  return isComplete;
} // NdbResultStream::prepareResultSet()
1039
1040
1041 /**
1042 * Fill m_tupleSet[] with correlation data between parent
1043 * and child tuples. The 'TupleCorrelation' is stored
1044 * in an array of TupleCorrelations in each ResultSet
1045 * by execTRANSID_AI().
1046 *
1047 * NOTE: In order to reduce work done when holding the
1048 * transporter mutex, the 'TupleCorrelation' is only stored
1049 * in the buffer when it arrives. Later (here) we build the
1050 * correlation hashMap immediately before we prepare to
1051 * read the NdbResultSet.
1052 */
1053 void
buildResultCorrelations()1054 NdbResultStream::buildResultCorrelations()
1055 {
1056 const NdbResultSet& readResult = m_resultSets[m_read];
1057
1058 //if (m_tupleSet!=NULL)
1059 {
1060 /* Clear the hashmap structures */
1061 for (Uint32 i=0; i<m_maxRows; i++)
1062 {
1063 m_tupleSet[i].m_hash_head = tupleNotFound;
1064 }
1065
1066 /* Rebuild correlation & hashmap from 'readResult' */
1067 for (Uint32 tupleNo=0; tupleNo<readResult.m_rowCount; tupleNo++)
1068 {
1069 const Uint16 tupleId = readResult.m_correlations[tupleNo].getTupleId();
1070 const Uint16 parentId = (m_parent!=NULL)
1071 ? readResult.m_correlations[tupleNo].getParentTupleId()
1072 : tupleNotFound;
1073
1074 m_tupleSet[tupleNo].m_skip = false;
1075 m_tupleSet[tupleNo].m_parentId = parentId;
1076 m_tupleSet[tupleNo].m_tupleId = tupleId;
1077 m_tupleSet[tupleNo].m_hasMatchingChild.clear();
1078
1079 /* Insert into parentId-hashmap */
1080 const Uint16 hash = (parentId % m_maxRows);
1081 if (m_parent==NULL)
1082 {
1083 /* Root stream: Insert sequentially in hash_next to make it
1084 * possible to use ::findTupleWithParentId() and ::findNextTuple()
1085 * to navigate even the root operation.
1086 */
1087 /* Link into m_hash_next in order to let ::findNextTuple() navigate correctly */
1088 if (tupleNo==0)
1089 m_tupleSet[hash].m_hash_head = tupleNo;
1090 else
1091 m_tupleSet[tupleNo-1].m_hash_next = tupleNo;
1092 m_tupleSet[tupleNo].m_hash_next = tupleNotFound;
1093 }
1094 else
1095 {
1096 /* Insert parentId in HashMap */
1097 m_tupleSet[tupleNo].m_hash_next = m_tupleSet[hash].m_hash_head;
1098 m_tupleSet[hash].m_hash_head = tupleNo;
1099 }
1100 }
1101 }
1102 } // NdbResultStream::buildResultCorrelations
1103
1104
///////////////////////////////////////////
////////  NdbRootFragment methods  ////////
///////////////////////////////////////////

/**
 * Build a hash map from receiver-id to root fragment, chained through
 * the m_idMapHead/m_idMapNext members of the frags[] array itself.
 * Used by ::receiverIdLookup() to find the fragment a signal belongs to.
 * (NOTE: the 'Reciver' misspelling in the name is part of the public
 * interface and is kept.)
 */
void NdbRootFragment::buildReciverIdMap(NdbRootFragment* frags,
                                        Uint32 noOfFrags)
{
  for(Uint32 fragNo = 0; fragNo < noOfFrags; fragNo++)
  {
    const Uint32 receiverId = frags[fragNo].getReceiverId();
    /**
     * For reasons unknown, NdbObjectIdMap shifts ids two bits to the left,
     * so we must do the opposite to get a good hash distribution.
     */
    assert((receiverId & 0x3) == 0);
    const int hash =
      (receiverId >> 2) % noOfFrags;
    // Chain this fragment at the head of its hash bucket.
    frags[fragNo].m_idMapNext = frags[hash].m_idMapHead;
    frags[hash].m_idMapHead = fragNo;
  }
}
1125
//static
/**
 * Look up the NdbRootFragment owning 'receiverId' in the hash map built
 * by ::buildReciverIdMap().
 *
 * @return Pointer to the matching fragment, or NULL if not found.
 */
NdbRootFragment*
NdbRootFragment::receiverIdLookup(NdbRootFragment* frags,
                                  Uint32 noOfFrags,
                                  Uint32 receiverId)
{
  /**
   * For reasons unknown, NdbObjectIdMap shifts ids two bits to the left,
   * so we must do the opposite to get a good hash distribution.
   */
  assert((receiverId & 0x3) == 0);
  const int hash = (receiverId >> 2) % noOfFrags;
  // Walk the bucket's chain until the receiver id matches (or chain ends,
  // indicated by a negative index).
  int current = frags[hash].m_idMapHead;
  assert(current < static_cast<int>(noOfFrags));
  while (current >= 0 && frags[current].getReceiverId() != receiverId)
  {
    current = frags[current].m_idMapNext;
    assert(current < static_cast<int>(noOfFrags));
  }
  if (unlikely (current < 0))
  {
    return NULL;
  }
  else
  {
    return frags+current;
  }
}
1154
1155
/**
 * Construct a not-yet-usable fragment; ::init() must be called before use
 * (m_fragNo stays 'voidFragNo' until then). m_remainingScans starts as
 * all-bits-set, i.e. every scan is initially assumed active.
 */
NdbRootFragment::NdbRootFragment():
  m_query(NULL),
  m_fragNo(voidFragNo),
  m_resultStreams(NULL),
  m_pendingRequests(0),
  m_availResultSets(0),
  m_outstandingResults(0),
  m_confReceived(false),
  m_remainingScans(0xffffffff),
  m_idMapHead(-1),    // -1 == empty receiver-id hash bucket / end of chain
  m_idMapNext(-1)
{
}

// ::postFetchRelease() must have destructed the streams before destruction.
NdbRootFragment::~NdbRootFragment()
{
  assert(m_resultStreams==NULL);
}
1174
/**
 * Attach this fragment to 'query' as fragment number 'fragNo', and
 * placement-construct one NdbResultStream per query operation from the
 * query's ResultStreamAlloc pool (released via ::postFetchRelease()).
 * May only be called once (asserted via the 'voidFragNo' sentinel).
 */
void NdbRootFragment::init(NdbQueryImpl& query, Uint32 fragNo)
{
  assert(m_fragNo==voidFragNo);
  m_query = &query;
  m_fragNo = fragNo;

  m_resultStreams = reinterpret_cast<NdbResultStream*>
    (query.getResultStreamAlloc().allocObjMem(query.getNoOfOperations()));
  assert(m_resultStreams!=NULL);

  for (unsigned opNo=0; opNo<query.getNoOfOperations(); opNo++)
  {
    NdbQueryOperationImpl& op = query.getQueryOperation(opNo);
    // Placement new: memory comes from the pool allocated above.
    new (&m_resultStreams[opNo]) NdbResultStream(op,*this);
    m_resultStreams[opNo].prepare();
  }
}
1192
/**
 * Release what we won't need anymore after the last available row has been
 * returned from the datanodes.
 */
void
NdbRootFragment::postFetchRelease()
{
  if (m_resultStreams != NULL)
  {
    // Explicit destructor calls: the streams were placement-new'ed
    // into pool memory by ::init().
    for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++)
    {
      m_resultStreams[opNo].~NdbResultStream();
    }
  }
  /**
   * Don't 'delete' the object as it was in-place constructed from
   * ResultStreamAlloc'ed memory. Memory is released by
   * ResultStreamAlloc::reset().
   */
  m_resultStreams = NULL;
}
1214
1215 NdbResultStream&
getResultStream(Uint32 operationNo) const1216 NdbRootFragment::getResultStream(Uint32 operationNo) const
1217 {
1218 assert(m_resultStreams);
1219 return m_resultStreams[operationNo];
1220 }
1221
1222 /**
1223 * Throw any pending ResultSets from specified rootFrags[]
1224 */
1225 //static
clear(NdbRootFragment * rootFrags,Uint32 noOfFrags)1226 void NdbRootFragment::clear(NdbRootFragment* rootFrags, Uint32 noOfFrags)
1227 {
1228 if (rootFrags != NULL)
1229 {
1230 for (Uint32 fragNo = 0; fragNo < noOfFrags; fragNo++)
1231 {
1232 rootFrags[fragNo].m_pendingRequests = 0;
1233 rootFrags[fragNo].m_availResultSets = 0;
1234 }
1235 }
1236 }
1237
/**
 * Check if there have been requested more ResultSets from
 * this fragment which have not been consumed yet.
 * (This is also a candidate check for ::hasReceivedMore())
 */
bool NdbRootFragment::hasRequestedMore() const
{
  return (m_pendingRequests > 0);
}

/**
 * Signal that another complete ResultSet is available for
 * this NdbRootFragment.
 * Need mutex lock as 'm_availResultSets' is accessed both from
 * receiver and application thread.
 */
void NdbRootFragment::setReceivedMore()                    // Need mutex
{
  assert(m_availResultSets==0);  // At most one unconsumed set at a time
  m_availResultSets++;
}

/**
 * Check if another ResultSet has been received and is available
 * for reading. It will be given to the application thread when it
 * calls ::grabNextResultSet().
 * Need mutex lock as 'm_availResultSets' is accessed both from
 * receiver and application thread.
 */
bool NdbRootFragment::hasReceivedMore() const              // Need mutex
{
  return (m_availResultSets > 0);
}
1271
/**
 * Prepare all result streams of this fragment that still have sub-scans
 * pending to receive another batch, and count the new outstanding request.
 * Must only be called when no results are outstanding (asserted).
 */
void NdbRootFragment::prepareNextReceiveSet()
{
  assert(m_fragNo!=voidFragNo);        // ::init() must have been called
  assert(m_outstandingResults == 0);

  for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++)
  {
    NdbResultStream& resultStream = getResultStream(opNo);
    if (!resultStream.isSubScanComplete(m_remainingScans))
    {
      /**
       * Reset resultStream and all its descendants, since all these
       * streams will get a new set of rows in the next batch.
       */
      resultStream.prepareNextReceiveSet();
    }
  }
  m_confReceived = false;
  m_pendingRequests++;
}
1292
/**
 * Let the application thread take ownership of an available
 * ResultSet, and prepare it for reading the first row.
 * Need mutex lock as 'm_availResultSets' is accessed both from
 * receiver and application thread.
 */
void NdbRootFragment::grabNextResultSet()                  // Need mutex
{
  assert(m_availResultSets>0);
  m_availResultSets--;

  assert(m_pendingRequests>0);
  m_pendingRequests--;

  // Prepare the whole stream tree (root recurses into children).
  NdbResultStream& rootStream = getResultStream(0);
  rootStream.prepareResultSet(m_remainingScans);

  /* Position at the first (sorted?) row available from this fragment.
  */
  rootStream.firstResult();
}
1314
/**
 * Record that the CONF for this fragment has arrived, and save the
 * 'tcPtrI' cursor used by a later SCAN_NEXTREQ to restart the scan.
 */
void NdbRootFragment::setConfReceived(Uint32 tcPtrI)
{
  /* For a query with a lookup root, there may be more than one TCKEYCONF
     message. For a scan, there should only be one SCAN_TABCONF per root
     fragment.
  */
  assert(!getResultStream(0).isScanQuery() || !m_confReceived);
  getResultStream(0).getReceiver().m_tcPtrI = tcPtrI;
  m_confReceived = true;
}

/**
 * True when a CONF has arrived and its saved cursor is RNIL, i.e. the
 * datanode has no further batches to deliver for this fragment.
 */
bool NdbRootFragment::finalBatchReceived() const
{
  return m_confReceived && getReceiverTcPtrI()==RNIL;
}

// True if the root result stream of this fragment holds no rows.
bool NdbRootFragment::isEmpty() const
{
  return getResultStream(0).isEmpty();
}
1335
/**
 * SPJ requests are identified by the receiver-id of the
 * *root* ResultStream for each RootFragment. Furthermore
 * a NEXTREQ uses the tcPtrI saved in this ResultStream to
 * identify the 'cursor' to restart.
 *
 * We provide some convenient accessors for fetching this info.
 */
Uint32 NdbRootFragment::getReceiverId() const
{
  return getResultStream(0).getReceiver().getId();
}

Uint32 NdbRootFragment::getReceiverTcPtrI() const
{
  return getResultStream(0).getReceiver().m_tcPtrI;
}
1353
1354 ///////////////////////////////////////////
1355 ///////// NdbQuery API methods ///////////
1356 ///////////////////////////////////////////
1357
NdbQuery(NdbQueryImpl & impl)1358 NdbQuery::NdbQuery(NdbQueryImpl& impl):
1359 m_impl(impl)
1360 {}
1361
~NdbQuery()1362 NdbQuery::~NdbQuery()
1363 {}
1364
1365 Uint32
getNoOfOperations() const1366 NdbQuery::getNoOfOperations() const
1367 {
1368 return m_impl.getNoOfOperations();
1369 }
1370
1371 NdbQueryOperation*
getQueryOperation(Uint32 index) const1372 NdbQuery::getQueryOperation(Uint32 index) const
1373 {
1374 return &m_impl.getQueryOperation(index).getInterface();
1375 }
1376
1377 NdbQueryOperation*
getQueryOperation(const char * ident) const1378 NdbQuery::getQueryOperation(const char* ident) const
1379 {
1380 NdbQueryOperationImpl* op = m_impl.getQueryOperation(ident);
1381 return (op) ? &op->getInterface() : NULL;
1382 }
1383
1384 Uint32
getNoOfParameters() const1385 NdbQuery::getNoOfParameters() const
1386 {
1387 return m_impl.getNoOfParameters();
1388 }
1389
1390 const NdbParamOperand*
getParameter(const char * name) const1391 NdbQuery::getParameter(const char* name) const
1392 {
1393 return m_impl.getParameter(name);
1394 }
1395
1396 const NdbParamOperand*
getParameter(Uint32 num) const1397 NdbQuery::getParameter(Uint32 num) const
1398 {
1399 return m_impl.getParameter(num);
1400 }
1401
1402 int
setBound(const NdbRecord * keyRecord,const NdbIndexScanOperation::IndexBound * bound)1403 NdbQuery::setBound(const NdbRecord *keyRecord,
1404 const NdbIndexScanOperation::IndexBound *bound)
1405 {
1406 const int error = m_impl.setBound(keyRecord,bound);
1407 if (unlikely(error)) {
1408 m_impl.setErrorCode(error);
1409 return -1;
1410 } else {
1411 return 0;
1412 }
1413 }
1414
1415 NdbQuery::NextResultOutcome
nextResult(bool fetchAllowed,bool forceSend)1416 NdbQuery::nextResult(bool fetchAllowed, bool forceSend)
1417 {
1418 return m_impl.nextResult(fetchAllowed, forceSend);
1419 }
1420
1421 void
close(bool forceSend)1422 NdbQuery::close(bool forceSend)
1423 {
1424 m_impl.close(forceSend);
1425 }
1426
1427 NdbTransaction*
getNdbTransaction() const1428 NdbQuery::getNdbTransaction() const
1429 {
1430 return &m_impl.getNdbTransaction();
1431 }
1432
1433 const NdbError&
getNdbError() const1434 NdbQuery::getNdbError() const {
1435 return m_impl.getNdbError();
1436 };
1437
isPrunable(bool & prunable) const1438 int NdbQuery::isPrunable(bool& prunable) const
1439 {
1440 return m_impl.isPrunable(prunable);
1441 }
1442
NdbQueryOperation(NdbQueryOperationImpl & impl)1443 NdbQueryOperation::NdbQueryOperation(NdbQueryOperationImpl& impl)
1444 :m_impl(impl)
1445 {}
~NdbQueryOperation()1446 NdbQueryOperation::~NdbQueryOperation()
1447 {}
1448
1449 Uint32
getNoOfParentOperations() const1450 NdbQueryOperation::getNoOfParentOperations() const
1451 {
1452 return m_impl.getNoOfParentOperations();
1453 }
1454
1455 NdbQueryOperation*
getParentOperation(Uint32 i) const1456 NdbQueryOperation::getParentOperation(Uint32 i) const
1457 {
1458 return &m_impl.getParentOperation(i).getInterface();
1459 }
1460
1461 Uint32
getNoOfChildOperations() const1462 NdbQueryOperation::getNoOfChildOperations() const
1463 {
1464 return m_impl.getNoOfChildOperations();
1465 }
1466
1467 NdbQueryOperation*
getChildOperation(Uint32 i) const1468 NdbQueryOperation::getChildOperation(Uint32 i) const
1469 {
1470 return &m_impl.getChildOperation(i).getInterface();
1471 }
1472
1473 const NdbQueryOperationDef&
getQueryOperationDef() const1474 NdbQueryOperation::getQueryOperationDef() const
1475 {
1476 return m_impl.getQueryOperationDef().getInterface();
1477 }
1478
1479 NdbQuery&
getQuery() const1480 NdbQueryOperation::getQuery() const {
1481 return m_impl.getQuery().getInterface();
1482 };
1483
1484 NdbRecAttr*
getValue(const char * anAttrName,char * resultBuffer)1485 NdbQueryOperation::getValue(const char* anAttrName,
1486 char* resultBuffer)
1487 {
1488 return m_impl.getValue(anAttrName, resultBuffer);
1489 }
1490
1491 NdbRecAttr*
getValue(Uint32 anAttrId,char * resultBuffer)1492 NdbQueryOperation::getValue(Uint32 anAttrId,
1493 char* resultBuffer)
1494 {
1495 return m_impl.getValue(anAttrId, resultBuffer);
1496 }
1497
1498 NdbRecAttr*
getValue(const NdbDictionary::Column * column,char * resultBuffer)1499 NdbQueryOperation::getValue(const NdbDictionary::Column* column,
1500 char* resultBuffer)
1501 {
1502 if (unlikely(column==NULL)) {
1503 m_impl.getQuery().setErrorCode(QRY_REQ_ARG_IS_NULL);
1504 return NULL;
1505 }
1506 return m_impl.getValue(NdbColumnImpl::getImpl(*column), resultBuffer);
1507 }
1508
1509 int
setResultRowBuf(const NdbRecord * rec,char * resBuffer,const unsigned char * result_mask)1510 NdbQueryOperation::setResultRowBuf (
1511 const NdbRecord *rec,
1512 char* resBuffer,
1513 const unsigned char* result_mask)
1514 {
1515 if (unlikely(rec==0 || resBuffer==0)) {
1516 m_impl.getQuery().setErrorCode(QRY_REQ_ARG_IS_NULL);
1517 return -1;
1518 }
1519 return m_impl.setResultRowBuf(rec, resBuffer, result_mask);
1520 }
1521
1522 int
setResultRowRef(const NdbRecord * rec,const char * & bufRef,const unsigned char * result_mask)1523 NdbQueryOperation::setResultRowRef (
1524 const NdbRecord* rec,
1525 const char* & bufRef,
1526 const unsigned char* result_mask)
1527 {
1528 return m_impl.setResultRowRef(rec, bufRef, result_mask);
1529 }
1530
1531 int
setOrdering(NdbQueryOptions::ScanOrdering ordering)1532 NdbQueryOperation::setOrdering(NdbQueryOptions::ScanOrdering ordering)
1533 {
1534 return m_impl.setOrdering(ordering);
1535 }
1536
1537 NdbQueryOptions::ScanOrdering
getOrdering() const1538 NdbQueryOperation::getOrdering() const
1539 {
1540 return m_impl.getOrdering();
1541 }
1542
setParallelism(Uint32 parallelism)1543 int NdbQueryOperation::setParallelism(Uint32 parallelism){
1544 return m_impl.setParallelism(parallelism);
1545 }
1546
setMaxParallelism()1547 int NdbQueryOperation::setMaxParallelism(){
1548 return m_impl.setMaxParallelism();
1549 }
1550
setAdaptiveParallelism()1551 int NdbQueryOperation::setAdaptiveParallelism(){
1552 return m_impl.setAdaptiveParallelism();
1553 }
1554
setBatchSize(Uint32 batchSize)1555 int NdbQueryOperation::setBatchSize(Uint32 batchSize){
1556 return m_impl.setBatchSize(batchSize);
1557 }
1558
setInterpretedCode(const NdbInterpretedCode & code) const1559 int NdbQueryOperation::setInterpretedCode(const NdbInterpretedCode& code) const
1560 {
1561 return m_impl.setInterpretedCode(code);
1562 }
1563
1564 NdbQuery::NextResultOutcome
firstResult()1565 NdbQueryOperation::firstResult()
1566 {
1567 return m_impl.firstResult();
1568 }
1569
1570 NdbQuery::NextResultOutcome
nextResult(bool fetchAllowed,bool forceSend)1571 NdbQueryOperation::nextResult(bool fetchAllowed, bool forceSend)
1572 {
1573 return m_impl.nextResult(fetchAllowed, forceSend);
1574 }
1575
1576 bool
isRowNULL() const1577 NdbQueryOperation::isRowNULL() const
1578 {
1579 return m_impl.isRowNULL();
1580 }
1581
1582 bool
isRowChanged() const1583 NdbQueryOperation::isRowChanged() const
1584 {
1585 return m_impl.isRowChanged();
1586 }
1587
/////////////////////////////////////////////////
/////////  NdbQueryParamValue methods  //////////
/////////////////////////////////////////////////

/**
 * Tag describing which union member of NdbQueryParamValue::m_value
 * holds the parameter, and how ::serializeValue() must format it.
 */
enum Type
{
  Type_NULL,
  Type_raw,        // Raw data formatted according to bound Column format.
  Type_raw_shrink, // As Type_raw, except short VarChar has to be shrunk.
  Type_string,     // '\0' terminated C-type string, char/varchar data only
  Type_Uint16,
  Type_Uint32,
  Type_Uint64,
  Type_Double
};

// One constructor per supported parameter type; each records the value
// and tags it so serializeValue() can type-check against the bound column.

NdbQueryParamValue::NdbQueryParamValue(Uint16 val) : m_type(Type_Uint16)
{ m_value.uint16 = val; }

NdbQueryParamValue::NdbQueryParamValue(Uint32 val) : m_type(Type_Uint32)
{ m_value.uint32 = val; }

NdbQueryParamValue::NdbQueryParamValue(Uint64 val) : m_type(Type_Uint64)
{ m_value.uint64 = val; }

NdbQueryParamValue::NdbQueryParamValue(double val) : m_type(Type_Double)
{ m_value.dbl = val; }

// C-type string, terminated by '\0' (pointer stored, not copied).
NdbQueryParamValue::NdbQueryParamValue(const char* val) : m_type(Type_string)
{ m_value.string = val; }

// Raw data, already in the bound column's format (pointer stored, not copied).
NdbQueryParamValue::NdbQueryParamValue(const void* val, bool shrinkVarChar)
 : m_type(shrinkVarChar ? Type_raw_shrink : Type_raw)
{ m_value.raw = val; }

// NULL-value, also used as optional end marker.
NdbQueryParamValue::NdbQueryParamValue() : m_type(Type_NULL)
{}
1628
1629
/**
 * Serialize this parameter value into 'dst', formatted according to
 * the type of 'column'. The value is rudimentarily type-checked against
 * the column before serialization.
 *
 * @param column  Column the parameter is bound to.
 * @param dst     Destination buffer; appending starts word-aligned.
 * @param len     [out] Number of bytes appended.
 * @param isNull  [out] True iff this is a NULL-value parameter.
 * @return 0 on success; QRY_PARAMETER_HAS_WRONG_TYPE,
 *         QRY_CHAR_PARAMETER_TRUNCATED or Err_MemoryAlloc on failure.
 */
int
NdbQueryParamValue::serializeValue(const class NdbColumnImpl& column,
                                   Uint32Buffer& dst,
                                   Uint32& len,
                                   bool& isNull) const
{
  const Uint32 maxSize = column.getSizeInBytes();
  isNull = false;
  // Start at (32-bit) word boundary.
  dst.skipRestOfWord();

  // Fetch parameter value and length.
  // Rudimentary typecheck of paramvalue: At least length should be as expected:
  //  - Extend with more types if required
  //  - Consider adding simple type conversion, ex: Int64 -> Int32
  //  - Or
  //     -- Represent all exact numeric as Int64 and convert to 'smaller' int
  //     -- Represent all floats as Double and convert to smaller floats
  //
  switch(m_type)
  {
    case Type_NULL:
      isNull = true;
      len = 0;
      break;

    case Type_Uint16:
      if (unlikely(column.getType() != NdbDictionary::Column::Smallint &&
                   column.getType() != NdbDictionary::Column::Smallunsigned))
        return QRY_PARAMETER_HAS_WRONG_TYPE;

      len = static_cast<Uint32>(sizeof(m_value.uint16));
      assert(len == maxSize);
      dst.appendBytes(&m_value.uint16, len);
      break;

    case Type_Uint32:
      if (unlikely(column.getType() != NdbDictionary::Column::Int &&
                   column.getType() != NdbDictionary::Column::Unsigned))
        return QRY_PARAMETER_HAS_WRONG_TYPE;

      len = static_cast<Uint32>(sizeof(m_value.uint32));
      assert(len == maxSize);
      dst.appendBytes(&m_value.uint32, len);
      break;

    case Type_Uint64:
      if (unlikely(column.getType() != NdbDictionary::Column::Bigint &&
                   column.getType() != NdbDictionary::Column::Bigunsigned))
        return QRY_PARAMETER_HAS_WRONG_TYPE;

      len = static_cast<Uint32>(sizeof(m_value.uint64));
      assert(len == maxSize);
      dst.appendBytes(&m_value.uint64, len);
      break;

    case Type_Double:
      if (unlikely(column.getType() != NdbDictionary::Column::Double))
        return QRY_PARAMETER_HAS_WRONG_TYPE;

      len = static_cast<Uint32>(sizeof(m_value.dbl));
      assert(len == maxSize);
      dst.appendBytes(&m_value.dbl, len);
      break;

    case Type_string:
      if (unlikely(column.getType() != NdbDictionary::Column::Char &&
                   column.getType() != NdbDictionary::Column::Varchar &&
                   column.getType() != NdbDictionary::Column::Longvarchar))
        return QRY_PARAMETER_HAS_WRONG_TYPE;
      {
        len  = static_cast<Uint32>(strlen(m_value.string));
        if (unlikely(len > maxSize))
          return QRY_CHAR_PARAMETER_TRUNCATED;

        dst.appendBytes(m_value.string, len);
      }
      break;

    case Type_raw:
      // 'Raw' data is readily formatted according to the bound column
      if (likely(column.m_arrayType == NDB_ARRAYTYPE_FIXED))
      {
        len = maxSize;
        dst.appendBytes(m_value.raw, maxSize);
      }
      else if (column.m_arrayType == NDB_ARRAYTYPE_SHORT_VAR)
      {
        // One-byte length prefix followed by the data.
        len = 1+*((Uint8*)(m_value.raw));

        assert(column.getType() == NdbDictionary::Column::Varchar ||
               column.getType() == NdbDictionary::Column::Varbinary);
        if (unlikely(len > 1+static_cast<Uint32>(column.getLength())))
          return QRY_CHAR_PARAMETER_TRUNCATED;

        dst.appendBytes(m_value.raw, len);
      }
      else if (column.m_arrayType == NDB_ARRAYTYPE_MEDIUM_VAR)
      {
        // Two-byte (little-endian) length prefix followed by the data.
        len = 2+uint2korr((Uint8*)m_value.raw);

        assert(column.getType() == NdbDictionary::Column::Longvarchar ||
               column.getType() == NdbDictionary::Column::Longvarbinary);
        if (unlikely(len > 2+static_cast<Uint32>(column.getLength())))
          return QRY_CHAR_PARAMETER_TRUNCATED;
        dst.appendBytes(m_value.raw, len);
      }
      else
      {
        assert(0);
      }
      break;

    case Type_raw_shrink:
      // Only short VarChar can be shrunk
      if (unlikely(column.m_arrayType != NDB_ARRAYTYPE_SHORT_VAR))
        return QRY_PARAMETER_HAS_WRONG_TYPE;

      assert(column.getType() == NdbDictionary::Column::Varchar ||
             column.getType() == NdbDictionary::Column::Varbinary);

      {
        // Convert from two-byte to one-byte length field.
        len = 1+uint2korr((Uint8*)m_value.raw);
        assert(len <= 0x100);   // Short VarChar payload fits in one byte

        if (unlikely(len > 1+static_cast<Uint32>(column.getLength())))
          return QRY_CHAR_PARAMETER_TRUNCATED;

        const Uint8 shortLen = static_cast<Uint8>(len-1);
        dst.appendBytes(&shortLen, 1);
        dst.appendBytes(((Uint8*)m_value.raw)+2, shortLen);
      }
      break;

    default:
      assert(false);
  }
  if (unlikely(dst.isMemoryExhausted())) {
    return Err_MemoryAlloc;
  }
  return 0;
} // NdbQueryParamValue::serializeValue
1773
///////////////////////////////////////////
/////////  NdbQueryImpl methods  //////////
///////////////////////////////////////////

/**
 * Construct an executable query instance from 'queryDef' within 'trans'.
 * Allocates and placement-constructs all NdbQueryOperationImpl objects
 * in one chunk, and seeds ATTRINFO with the serialized query tree.
 * On failure, m_error.code is set (check after construction).
 */
NdbQueryImpl::NdbQueryImpl(NdbTransaction& trans,
                           const NdbQueryDefImpl& queryDef):
  m_interface(*this),
  m_state(Initial),
  m_tcState(Inactive),
  m_next(NULL),
  m_queryDef(&queryDef),
  m_error(),
  m_errorReceived(0),
  m_transaction(trans),
  m_scanTransaction(NULL),
  m_operations(0),
  m_countOperations(0),
  m_globalCursor(0),
  m_pendingFrags(0),
  m_rootFragCount(0),
  m_rootFrags(NULL),
  m_applFrags(),
  m_finalBatchFrags(0),
  m_num_bounds(0),
  m_shortestBound(0xffffffff),
  m_attrInfo(),
  m_keyInfo(),
  m_startIndicator(false),
  m_commitIndicator(false),
  m_prunability(Prune_No),
  m_pruneHashVal(0),
  // Pool allocators, sized per element type they will hand out:
  m_operationAlloc(sizeof(NdbQueryOperationImpl)),
  m_tupleSetAlloc(sizeof(NdbResultStream::TupleSet)),
  m_resultStreamAlloc(sizeof(NdbResultStream)),
  m_pointerAlloc(sizeof(void*)),
  m_rowBufferAlloc(sizeof(char))
{
  // Allocate memory for all m_operations[] in a single chunk
  m_countOperations = queryDef.getNoOfOperations();
  const int error = m_operationAlloc.init(m_countOperations);
  if (unlikely(error != 0))
  {
    setErrorCode(error);
    return;
  }
  m_operations = reinterpret_cast<NdbQueryOperationImpl*>
    (m_operationAlloc.allocObjMem(m_countOperations));

  // Then; use placement new to construct each individual
  // NdbQueryOperationImpl object in m_operations
  for (Uint32 i=0; i<m_countOperations; ++i)
  {
    const NdbQueryOperationDefImpl& def = queryDef.getQueryOperation(i);
    new(&m_operations[i]) NdbQueryOperationImpl(*this, def);
    // Failed to create NdbQueryOperationImpl object.
    if (m_error.code != 0)
    {
      // Destroy those objects that we have already constructed.
      for (int j = static_cast<int>(i)-1; j>= 0; j--)
      {
        m_operations[j].~NdbQueryOperationImpl();
      }
      m_operations = NULL;
      return;
    }
  }

  // Serialized QueryTree definition is first part of ATTRINFO.
  m_attrInfo.append(queryDef.getSerialized());
}
1844
NdbQueryImpl::~NdbQueryImpl()
{
  /** BEWARE:
   *  Don't refer NdbQueryDef or NdbQueryOperationDefs after
   *  NdbQuery::close() as at this stage the application is
   *  allowed to destruct the Def's.
   */
  assert(m_state==Closed);       // ::close() must have run first
  assert(m_rootFrags==NULL);     // ...and released the fragments

  // NOTE: m_operations[] was allocated as a single memory chunk with
  // placement new construction of each operation.
  // Requires explicit call to d'tor of each operation before memory is free'ed.
  if (m_operations != NULL) {
    for (int i=m_countOperations-1; i>=0; --i)
    { m_operations[i].~NdbQueryOperationImpl();
    }
    m_operations = NULL;
  }
  m_state = Destructed;
}
1866
/**
 * Release all per-execution resources (fragments, operations' fetch
 * buffers and the pooled allocators) once the last row has been fetched.
 */
void
NdbQueryImpl::postFetchRelease()
{
  if (m_rootFrags != NULL)
  {
    for (unsigned i=0; i<m_rootFragCount; i++)
    { m_rootFrags[i].postFetchRelease();
    }
  }
  if (m_operations != NULL)
  {
    for (unsigned i=0; i<m_countOperations; i++)
    { m_operations[i].postFetchRelease();
    }
  }
  delete[] m_rootFrags;
  m_rootFrags = NULL;

  // Release the pool memory backing rows, TupleSets and result streams.
  m_rowBufferAlloc.reset();
  m_tupleSetAlloc.reset();
  m_resultStreamAlloc.reset();
}
1889
1890
//static
/**
 * Factory: construct an NdbQueryImpl for 'queryDef' within 'trans'.
 * @return The new query, or NULL with an error code set on 'trans'
 *         (datanodes too old for pushed joins, out of memory, or a
 *         construction error already recorded on the query).
 */
NdbQueryImpl*
NdbQueryImpl::buildQuery(NdbTransaction& trans,
                         const NdbQueryDefImpl& queryDef)
{
  assert(queryDef.getNoOfOperations() > 0);
  // Check for online upgrade/downgrade.
  if (unlikely(!ndb_join_pushdown(trans.getNdb()->getMinDbNodeVersion())))
  {
    trans.setOperationErrorCodeAbort(Err_FunctionNotImplemented);
    return NULL;
  }
  NdbQueryImpl* const query = new NdbQueryImpl(trans, queryDef);
  if (unlikely(query==NULL)) {
    trans.setOperationErrorCodeAbort(Err_MemoryAlloc);
    return NULL;
  }
  if (unlikely(query->m_error.code != 0))
  {
    // Transaction error code set already.
    query->release();
    return NULL;
  }
  assert(query->m_state==Initial);
  return query;
}
1917
1918
/** Assign supplied parameter values to the parameter placeholders
 *  created when the query was defined.
 *  Values are *copied* into this NdbQueryImpl object:
 *  Memory locations used as source for parameter values don't have
 *  to be valid after this assignment.
 *
 *  @return 0 on success, -1 with the error code set on the query otherwise.
 */
int
NdbQueryImpl::assignParameters(const NdbQueryParamValue paramValues[])
{
  /**
   * Immediately build the serialized parameter representation in order
   * to avoid storing param values elsewhere until query is executed.
   * Also calculates prunable property, and possibly its hashValue.
   */
  // Build explicit key/filter/bounds for root operation, possibly referring paramValues
  const int error = getRoot().prepareKeyInfo(m_keyInfo, paramValues);
  if (unlikely(error != 0))
  {
    setErrorCode(error);
    return -1;
  }

  // Serialize parameter values for the other (non-root) operations
  // (No need to serialize for root (i==0) as root key is part of keyInfo above)
  for (Uint32 i=1; i<getNoOfOperations(); ++i)
  {
    if (getQueryDef().getQueryOperation(i).getNoOfParameters() > 0)
    {
      const int error = getQueryOperation(i).serializeParams(paramValues);
      if (unlikely(error != 0))
      {
        setErrorCode(error);
        return -1;
      }
    }
  }
  assert(m_state<Defined);
  m_state = Defined;
  return 0;
} // NdbQueryImpl::assignParameters
1959
1960
1961 static int
insert_bound(Uint32Buffer & keyInfo,const NdbRecord * key_record,Uint32 column_index,const char * row,Uint32 bound_type)1962 insert_bound(Uint32Buffer& keyInfo, const NdbRecord *key_record,
1963 Uint32 column_index,
1964 const char *row,
1965 Uint32 bound_type)
1966 {
1967 char buf[NdbRecord::Attr::SHRINK_VARCHAR_BUFFSIZE];
1968 const NdbRecord::Attr *column= &key_record->columns[column_index];
1969
1970 bool is_null= column->is_null(row);
1971 Uint32 len= 0;
1972 const void *aValue= row+column->offset;
1973
1974 if (!is_null)
1975 {
1976 bool len_ok;
1977 /* Support for special mysqld varchar format in keys. */
1978 if (column->flags & NdbRecord::IsMysqldShrinkVarchar)
1979 {
1980 len_ok= column->shrink_varchar(row, len, buf);
1981 aValue= buf;
1982 }
1983 else
1984 {
1985 len_ok= column->get_var_length(row, len);
1986 }
1987 if (!len_ok) {
1988 return Err_WrongFieldLength;
1989 }
1990 }
1991
1992 AttributeHeader ah(column->index_attrId, len);
1993 keyInfo.append(bound_type);
1994 keyInfo.append(ah.m_value);
1995 keyInfo.appendBytes(aValue,len);
1996
1997 return 0;
1998 }
1999
2000
/**
 * Serialize an index bound (key range) for the root operation, which must
 * be an OrderedIndexScan, into m_keyInfo.
 *
 * Ranges must be added in consecutive 'range_no' order. The total length
 * and the range number of each serialized bound are packed into its first
 * word before returning.
 *
 * @param key_record NdbRecord describing the index key columns.
 * @param bound      Lower/upper key values, inclusiveness and range_no.
 * @return 0 on success, or an Err_* / QRY_* error code.
 */
int
NdbQueryImpl::setBound(const NdbRecord *key_record,
                       const NdbIndexScanOperation::IndexBound *bound)
{
  // Adding a bound invalidates any previously computed prunability.
  m_prunability = Prune_Unknown;
  if (unlikely(key_record == NULL || bound==NULL))
    return QRY_REQ_ARG_IS_NULL;

  if (unlikely(getRoot().getQueryOperationDef().getType()
               != NdbQueryOperationDef::OrderedIndexScan))
  {
    return QRY_WRONG_OPERATION_TYPE;
  }

  assert(m_state >= Defined);
  if (m_state != Defined)
  {
    return QRY_ILLEGAL_STATE;
  }

  // Remember where this bound starts; its first word is patched below
  // with the serialized length and range number.
  int startPos = m_keyInfo.getSize();

  // We don't handle both NdbQueryIndexBound defined in ::scanIndex()
  // in combination with a later ::setBound(NdbIndexScanOperation::IndexBound)
  //assert (m_bound.lowKeys==0 && m_bound.highKeys==0);

  // Bounds must arrive with consecutive range numbers, within protocol limits.
  if (unlikely(bound->range_no != m_num_bounds ||
               bound->range_no > NdbIndexScanOperation::MaxRangeNo))
  {
//  setErrorCodeAbort(4286);
    return Err_InvalidRangeNo;
  }

  // key_count becomes the longer of the low/high key column counts,
  // common_key_count the shorter of the two.
  Uint32 key_count= bound->low_key_count;
  Uint32 common_key_count= key_count;
  if (key_count < bound->high_key_count)
    key_count= bound->high_key_count;
  else
    common_key_count= bound->high_key_count;

  // Track the shortest common bound length seen over all ranges added so far.
  if (m_shortestBound > common_key_count)
  {
    m_shortestBound = common_key_count;
  }
  /* Has the user supplied an open range (no bounds)? */
  const bool openRange= ((bound->low_key == NULL || bound->low_key_count == 0) &&
                         (bound->high_key == NULL || bound->high_key_count == 0));
  if (likely(!openRange))
  {
    /* If low and high key pointers are the same and key counts are
     * the same, we send as an Eq bound to save bandwidth.
     * This will not send an EQ bound if:
     *   - Different numbers of high and low keys are EQ
     *   - High and low keys are EQ, but use different ptrs
     */
    const bool isEqRange=
      (bound->low_key == bound->high_key) &&
      (bound->low_key_count == bound->high_key_count) &&
      (bound->low_inclusive && bound->high_inclusive); // Does this matter?

    if (isEqRange)
    {
      /* Using BoundEQ will result in bound being sent only once */
      for (unsigned j= 0; j<key_count; j++)
      {
        const int error=
          insert_bound(m_keyInfo, key_record, key_record->key_indexes[j],
                                  bound->low_key, NdbIndexScanOperation::BoundEQ);
        if (unlikely(error))
          return error;
      }
    }
    else
    {
      /* Distinct upper and lower bounds, must specify them independently */
      /* Note : Protocol allows individual columns to be specified as EQ
       * or some prefix of columns. This is not currently supported from
       * NDBAPI.
       */
      for (unsigned j= 0; j<key_count; j++)
      {
        Uint32 bound_type;
        /* If key is part of lower bound */
        if (bound->low_key && j<bound->low_key_count)
        {
          /* Inclusive if defined, or matching rows can include this value:
           * only the last column of the bound carries the strict comparison. */
          bound_type= bound->low_inclusive || j+1 < bound->low_key_count ?
            NdbIndexScanOperation::BoundLE : NdbIndexScanOperation::BoundLT;
          const int error=
            insert_bound(m_keyInfo, key_record, key_record->key_indexes[j],
                                    bound->low_key, bound_type);
          if (unlikely(error))
            return error;
        }
        /* If key is part of upper bound */
        if (bound->high_key && j<bound->high_key_count)
        {
          /* Inclusive if defined, or matching rows can include this value */
          bound_type= bound->high_inclusive || j+1 < bound->high_key_count ?
            NdbIndexScanOperation::BoundGE : NdbIndexScanOperation::BoundGT;
          const int error=
            insert_bound(m_keyInfo, key_record, key_record->key_indexes[j],
                                    bound->high_key, bound_type);
          if (unlikely(error))
            return error;
        }
      }
    }
  }
  else
  {
    /* Open range - all rows must be returned.
     * To encode this, we'll request all rows where the first
     * key column value is >= NULL
     */
    AttributeHeader ah(0, 0);
    m_keyInfo.append(NdbIndexScanOperation::BoundLE);
    m_keyInfo.append(ah.m_value);
  }

  Uint32 length = m_keyInfo.getSize()-startPos;
  if (unlikely(m_keyInfo.isMemoryExhausted())) {
    return Err_MemoryAlloc;
  } else if (unlikely(length > 0xFFFF)) {
    return QRY_DEFINITION_TOO_LARGE; // Query definition too large.
  } else if (likely(length > 0)) {
    // Patch first word of this bound: length in the upper 16 bits,
    // range number shifted into bits 4.. of the lower half.
    m_keyInfo.put(startPos, m_keyInfo.get(startPos) | (length << 16) | (bound->range_no << 4));
  }

#ifdef TRACE_SERIALIZATION
  ndbout << "Serialized KEYINFO w/ bounds for indexScan root : ";
  for (Uint32 i = startPos; i < m_keyInfo.getSize(); i++) {
    char buf[12];
    sprintf(buf, "%.8x", m_keyInfo.get(i));
    ndbout << buf << " ";
  }
  ndbout << endl;
#endif

  m_num_bounds++;
  return 0;
} // NdbQueryImpl::setBound()
2143
2144
/** @return the total number of query operations in this query. */
Uint32
NdbQueryImpl::getNoOfOperations() const
{
  return m_countOperations;
}
2150
2151 Uint32
getNoOfLeafOperations() const2152 NdbQueryImpl::getNoOfLeafOperations() const
2153 {
2154 return getQueryOperation(Uint32(0)).getNoOfLeafOperations();
2155 }
2156
/** Get the query operation at position 'index'.
 * 'index' must be < getNoOfOperations(). */
NdbQueryOperationImpl&
NdbQueryImpl::getQueryOperation(Uint32 index) const
{
  assert(index<m_countOperations);
  return m_operations[index];
}
2163
2164 NdbQueryOperationImpl*
getQueryOperation(const char * ident) const2165 NdbQueryImpl::getQueryOperation(const char* ident) const
2166 {
2167 for(Uint32 i = 0; i<m_countOperations; i++){
2168 if(strcmp(m_operations[i].getQueryOperationDef().getName(), ident) == 0){
2169 return &m_operations[i];
2170 }
2171 }
2172 return NULL;
2173 }
2174
/** Number of parameter placeholders in this query.
 * NOTE: not implemented yet - currently always returns 0. */
Uint32
NdbQueryImpl::getNoOfParameters() const
{
  return 0;  // FIXME
}
2180
/** Look up a parameter operand by name.
 * NOTE: not implemented yet - currently always returns NULL. */
const NdbParamOperand*
NdbQueryImpl::getParameter(const char* name) const
{
  return NULL; // FIXME
}
2186
/** Look up a parameter operand by position.
 * NOTE: not implemented yet - currently always returns NULL. */
const NdbParamOperand*
NdbQueryImpl::getParameter(Uint32 num) const
{
  return NULL; // FIXME
}
2192
/**
 * NdbQueryImpl::nextResult() - The 'global' cursor on the query results
 *
 * Will iterate and fetch results for all combinations of results from the
 * NdbOperations which this query consists of. Except for the root operation,
 * which will follow any optional ScanOrdering, we have no control of the
 * ordering which the results from the QueryOperations appear in.
 */

NdbQuery::NextResultOutcome
NdbQueryImpl::nextResult(bool fetchAllowed, bool forceSend)
{
  // The query must have been executed and not yet closed.
  if (unlikely(m_state < Executing || m_state >= Closed)) {
    assert (m_state >= Initial && m_state < Destructed);
    if (m_state == Failed)
      setErrorCode(QRY_IN_ERROR_STATE);
    else
      setErrorCode(QRY_ILLEGAL_STATE);
    DEBUG_CRASH();
    return NdbQuery::NextResult_error;
  }

  assert (m_globalCursor < getNoOfOperations());

  while (m_state != EndOfData) // Or likely: return when 'gotRow'
  {
    // Advance the operation currently under the global cursor.
    NdbQuery::NextResultOutcome res =
      getQueryOperation(m_globalCursor).nextResult(fetchAllowed,forceSend);

    if (unlikely(res == NdbQuery::NextResult_error))
      return res;

    else if (res == NdbQuery::NextResult_scanComplete)
    {
      if (m_globalCursor == 0)  // Completed reading all results from root
        break;
      m_globalCursor--;         // Get 'next' from ancestor
    }

    else if (res == NdbQuery::NextResult_gotRow)
    {
      // Position to 'firstResult()' for all childs.
      // Update m_globalCursor to iterate from the last operation
      // with results next time.
      //
      for (uint child=m_globalCursor+1; child<getNoOfOperations(); child++)
      {
        res = getQueryOperation(child).firstResult();
        if (unlikely(res == NdbQuery::NextResult_error))
          return res;
        else if (res == NdbQuery::NextResult_gotRow)
          m_globalCursor = child;
      }
      return NdbQuery::NextResult_gotRow;
    }
    else
    {
      // No buffered rows, and we were not allowed to (or could not) fetch.
      assert (res == NdbQuery::NextResult_bufferEmpty);
      return res;
    }
  }

  // Root completed -> entire query completed.
  assert (m_state == EndOfData);
  return NdbQuery::NextResult_scanComplete;

} //NdbQueryImpl::nextResult()
2258
2259
/**
 * Local cursor component which implements the special case of 'next' on the
 * root operation of the entire NdbQuery. In addition to fetching the 'next'
 * result from the root operation, we should also retrieve more results from
 * the datanodes if required and allowed.
 */
NdbQuery::NextResultOutcome
NdbQueryImpl::nextRootResult(bool fetchAllowed, bool forceSend)
{
  /* To minimize lock contention, each query has the separate root fragment
   * container 'm_applFrags'. m_applFrags is only accessed by the application
   * thread, so it is safe to use it without locks.
   */
  while (m_state != EndOfData) // Or likely: return when 'gotRow' or error
  {
    const NdbRootFragment* rootFrag = m_applFrags.getCurrent();
    if (unlikely(rootFrag==NULL))
    {
      /* m_applFrags is empty, so we cannot get more results without
       * possibly blocking.
       *
       * ::awaitMoreResults() will either copy fragments that are already
       * complete (under mutex protection), or block until data
       * previously requested arrives.
       */
      const FetchResult fetchResult = awaitMoreResults(forceSend);
      switch (fetchResult) {

      case FetchResult_ok:          // OK - got data wo/ error
        assert(m_state != Failed);
        rootFrag = m_applFrags.getCurrent();
        assert (rootFrag!=NULL);
        break;

      case FetchResult_noMoreData:  // No data, no error
        assert(m_state != Failed);
        assert (m_applFrags.getCurrent()==NULL);
        getRoot().nullifyResult();
        m_state = EndOfData;
        postFetchRelease();
        return NdbQuery::NextResult_scanComplete;

      case FetchResult_noMoreCache: // No cached data, no error
        assert(m_state != Failed);
        assert (m_applFrags.getCurrent()==NULL);
        getRoot().nullifyResult();
        if (fetchAllowed)
        {
          break;  // ::sendFetchMore() below may request more results
        }
        return NdbQuery::NextResult_bufferEmpty;

      case FetchResult_gotError:    // Error in 'm_error.code'
        assert (m_error.code != 0);
        return NdbQuery::NextResult_error;

      default:
        assert(false);
      }
    }
    else
    {
      rootFrag->getResultStream(0).nextResult();   // Consume current
      m_applFrags.reorganize();                    // Calculate new current
      // Reorg. may update 'current' RootFragment
      rootFrag = m_applFrags.getCurrent();
    }

    /**
     * If allowed to request more rows from datanodes, we do this asynch
     * and request more rows as soon as we have consumed all rows from a
     * fragment. ::awaitMoreResults() may eventually block and wait for these
     * when required.
     */
    if (fetchAllowed)
    {
      // Ask for a new batch if we emptied some.
      NdbRootFragment** frags;
      const Uint32 cnt = m_applFrags.getFetchMore(frags);
      if (cnt > 0 && sendFetchMore(frags, cnt, forceSend) != 0)
      {
        return NdbQuery::NextResult_error;
      }
    }

    if (rootFrag!=NULL)
    {
      // Make the current row of the current fragment available to the API.
      getRoot().fetchRow(rootFrag->getResultStream(0));
      return NdbQuery::NextResult_gotRow;
    }
  } // m_state != EndOfData

  assert (m_state == EndOfData);
  return NdbQuery::NextResult_scanComplete;
} //NdbQueryImpl::nextRootResult()
2355
2356
/**
 * Wait for more scan results, which have already been REQuested, to arrive.
 *
 * @return FetchResult_ok          - more rows are available in m_applFrags,
 *         FetchResult_noMoreData  - the query has delivered all its rows,
 *         FetchResult_noMoreCache - no cached rows remain, but more may be
 *                                   requested with ::sendFetchMore(),
 *         FetchResult_gotError    - an error occurred ('m_error.code' is set).
 */
NdbQueryImpl::FetchResult
NdbQueryImpl::awaitMoreResults(bool forceSend)
{
  assert(m_applFrags.getCurrent() == NULL);

  /* Check if there are any more completed fragments available.*/
  if (getQueryDef().isScanQuery())
  {
    assert (m_scanTransaction);
    assert (m_state==Executing);

    NdbImpl* const ndb = m_transaction.getNdb()->theImpl;
    {
      /* This part needs to be done under mutex due to synchronization with
       * receiver thread.
       */
      PollGuard poll_guard(*ndb);

      /* There may be pending (asynchronously received, mutex protected)
       * errors from TC / datanodes. Propagate these into m_error.code
       * in 'API space'.
       */
      while (likely(!hasReceivedError()))
      {
        /* Scan m_rootFrags (under mutex protection) for fragments
         * which have received a complete batch. Add these to m_applFrags.
         */
        m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount);
        if (m_applFrags.getCurrent() != NULL)
        {
          return FetchResult_ok;
        }

        /* There are no more available fragment results available without
         * first waiting for more to be received from the datanodes
         */
        if (m_pendingFrags == 0)
        {
          // 'No more *pending* results', ::sendFetchMore() may make more available
          return (m_finalBatchFrags < getRootFragCount()) ? FetchResult_noMoreCache
                                                          : FetchResult_noMoreData;
        }

        const Uint32 timeout = ndb->get_waitfor_timeout();
        const Uint32 nodeId  = m_transaction.getConnectedNodeId();
        const Uint32 seq     = m_transaction.theNodeSequence;

        /* More results are on the way, so we wait for them.*/
        const FetchResult waitResult = static_cast<FetchResult>
          (poll_guard.wait_scan(3*timeout,
                                nodeId,
                                forceSend));

        // A changed node sequence means the data node went down/restarted.
        if (ndb->getNodeSequence(nodeId) != seq)
          setFetchTerminated(Err_NodeFailCausedAbort,false);
        else if (likely(waitResult == FetchResult_ok))
          continue;
        else if (waitResult == FetchResult_timeOut)
          setFetchTerminated(Err_ReceiveTimedOut,false);
        else
          setFetchTerminated(Err_NodeFailCausedAbort,false);

        assert (m_state != Failed);
      } // while(!hasReceivedError())
    } // Terminates scope of 'PollGuard'

    // Fall through only if ::hasReceivedError()
    assert (m_error.code);
    return FetchResult_gotError;
  }
  else  // is a Lookup query
  {
    /* The root operation is a lookup. Lookups are guaranteed to be complete
     * before NdbTransaction::execute() returns. Therefore we do not set
     * the lock, because we know that the signal receiver thread will not
     * be accessing m_rootFrags at this time.
     */
    m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount);
    if (m_applFrags.getCurrent() != NULL)
    {
      return FetchResult_ok;
    }

    /* Getting here means that either:
     *  - No results was returned (TCKEYREF)
     *  - There was no matching row for an inner join.
     *  - or, the application called nextResult() twice for a lookup query.
     */
    assert(m_pendingFrags == 0);
    assert(m_finalBatchFrags == getRootFragCount());
    return FetchResult_noMoreData;
  } // if(getQueryDef().isScanQuery())

} //NdbQueryImpl::awaitMoreResults
2455
2456
/*
  ::handleBatchComplete() is intended to be called when receiving signals only.
  The PollGuard mutex is then set and the shared 'm_pendingFrags' and
  'm_finalBatchFrags' can safely be updated and ::setReceivedMore() signaled.

  returns: 'true' when application thread should be resumed.
*/
bool
NdbQueryImpl::handleBatchComplete(NdbRootFragment& rootFrag)
{
  if (traceSignals) {
    ndbout << "NdbQueryImpl::handleBatchComplete"
           << ", fragNo=" << rootFrag.getFragNo()
           << ", pendingFrags=" << (m_pendingFrags-1)
           << ", finalBatchFrags=" << m_finalBatchFrags
           << endl;
  }
  assert(rootFrag.isFragBatchComplete());

  /* We may receive fragment data after a SCANREF() (timeout?)
   * terminated the scan. We are about to close this query,
   * and didn't expect any more data - ignore it!
   */
  if (likely(m_errorReceived == 0))
  {
    assert(m_pendingFrags > 0);                // Check against underflow.
    assert(m_pendingFrags <= m_rootFragCount); // .... and overflow
    m_pendingFrags--;

    if (rootFrag.finalBatchReceived())
    {
      // This fragment will deliver no further batches.
      m_finalBatchFrags++;
      assert(m_finalBatchFrags <= m_rootFragCount);
    }

    /* When application thread ::awaitMoreResults() it will later be
     * added to m_applFrags under mutex protection.
     */
    rootFrag.setReceivedMore();
    return true;
  }
  else if (!getQueryDef().isScanQuery())  // A failed lookup query
  {
    /**
     * A lookup query will retrieve the rows as part of ::execute().
     * -> Error must be visible through API before we return control
     * to the application.
     */
    setErrorCode(m_errorReceived);
    return true;
  }

  return false;
} // NdbQueryImpl::handleBatchComplete
2511
/**
 * Close this query: close any open scan cursor at TC, discard pending
 * results, release the scan sub-transaction, and free fetch-related
 * buffers. Safe to call in any non-destructed state.
 *
 * @param forceSend passed on to ::closeTcCursor().
 * @return 0 on success, else the result of closing the TC cursor.
 */
int
NdbQueryImpl::close(bool forceSend)
{
  int res = 0;

  assert (m_state >= Initial && m_state < Destructed);
  if (m_state != Closed)
  {
    if (m_tcState != Inactive)
    {
      /* We have started a scan, but we have not yet received the last batch
       * for all root fragments. We must therefore close the scan to release
       * the scan context at TC.*/
      res = closeTcCursor(forceSend);
    }

    // Throw any pending results
    NdbRootFragment::clear(m_rootFrags,m_rootFragCount);
    m_applFrags.clear();

    Ndb* const ndb = m_transaction.getNdb();
    if (m_scanTransaction != NULL)
    {
      assert (m_state != Closed);
      assert (m_scanTransaction->m_scanningQuery == this);
      m_scanTransaction->m_scanningQuery = NULL;
      ndb->closeTransaction(m_scanTransaction);
      ndb->theRemainingStartTransactions--;  // Compensate; m_scanTransaction was not a real Txn
      m_scanTransaction = NULL;
    }

    postFetchRelease();
    m_state = Closed;  // Even if it was previously 'Failed' it is closed now!
  }

  /** BEWARE:
   * Don't refer NdbQueryDef or its NdbQueryOperationDefs after ::close()
   * as the application is allowed to destruct the Def's after this point.
   */
  m_queryDef= NULL;

  return res;
} //NdbQueryImpl::close
2555
2556
/**
 * Close (if not already closed) and destroy this query object.
 * Errors from the implicit close are ignored.
 */
void
NdbQueryImpl::release()
{
  assert (m_state >= Initial && m_state < Destructed);
  if (m_state != Closed) {
    close(true);  // Ignore any errors, explicit ::close() first if errors are of interest
  }

  delete this;
}
2567
2568 void
setErrorCode(int aErrorCode)2569 NdbQueryImpl::setErrorCode(int aErrorCode)
2570 {
2571 assert (aErrorCode!=0);
2572 m_error.code = aErrorCode;
2573 m_transaction.theErrorLine = 0;
2574 m_transaction.theErrorOperation = NULL;
2575
2576 switch(aErrorCode)
2577 {
2578 // Not realy an error. A root lookup found no match.
2579 case Err_TupleNotFound:
2580 // Simple or dirty read failed due to node failure. Transaction will be aborted.
2581 case Err_SimpleDirtyReadFailed:
2582 m_transaction.setOperationErrorCode(aErrorCode);
2583 break;
2584
2585 // For any other error, abort the transaction.
2586 default:
2587 m_state = Failed;
2588 m_transaction.setOperationErrorCodeAbort(aErrorCode);
2589 break;
2590 }
2591 }
2592
2593 /*
2594 * ::setFetchTerminated() Should only be called with mutex locked.
2595 * Register result fetching as completed (possibly prematurely, w/ errorCode).
2596 */
2597 void
setFetchTerminated(int errorCode,bool needClose)2598 NdbQueryImpl::setFetchTerminated(int errorCode, bool needClose)
2599 {
2600 assert(m_finalBatchFrags < getRootFragCount());
2601 if (!needClose)
2602 {
2603 m_finalBatchFrags = getRootFragCount();
2604 }
2605 if (errorCode!=0)
2606 {
2607 m_errorReceived = errorCode;
2608 }
2609 m_pendingFrags = 0;
2610 } // NdbQueryImpl::setFetchTerminated()
2611
2612
2613 /* There may be pending (asynchronous received, mutex protected) errors
2614 * from TC / datanodes. Propogate these into 'API space'.
2615 * ::hasReceivedError() Should only be called with mutex locked
2616 */
2617 bool
hasReceivedError()2618 NdbQueryImpl::hasReceivedError()
2619 {
2620 if (unlikely(m_errorReceived))
2621 {
2622 setErrorCode(m_errorReceived);
2623 return true;
2624 }
2625 return false;
2626 } // NdbQueryImpl::hasReceivedError
2627
2628
/**
 * Handle a TCKEYCONF signal (lookup queries only).
 * @return 'true' when the application thread should be resumed.
 */
bool
NdbQueryImpl::execTCKEYCONF()
{
  if (traceSignals) {
    ndbout << "NdbQueryImpl::execTCKEYCONF()" << endl;
  }
  assert(!getQueryDef().isScanQuery());
  // Lookups use a single root fragment.
  NdbRootFragment& rootFrag = m_rootFrags[0];

  // We will get 1 + #leaf-nodes TCKEYCONF for a lookup...
  rootFrag.setConfReceived(RNIL);
  rootFrag.incrOutstandingResults(-1);

  bool ret = false;
  if (rootFrag.isFragBatchComplete())
  {
    ret = handleBatchComplete(rootFrag);
  }

  if (traceSignals) {
    ndbout << "NdbQueryImpl::execTCKEYCONF(): returns:" << ret
           << ", m_pendingFrags=" << m_pendingFrags
           << ", rootStream= {" << rootFrag.getResultStream(0) << "}"
           << endl;
  }
  return ret;
} // NdbQueryImpl::execTCKEYCONF
2656
/**
 * Handle a CLOSE_SCAN_REP: register result fetching as terminated,
 * possibly with an error code.
 */
void
NdbQueryImpl::execCLOSE_SCAN_REP(int errorCode, bool needClose)
{
  if (traceSignals)
  {
    ndbout << "NdbQueryImpl::execCLOSE_SCAN_REP()" << endl;
  }
  setFetchTerminated(errorCode,needClose);
}
2666
/**
 * Prepare this (Defined) query for execution:
 *  - determine the root fragment count (scan parallelism),
 *  - set up the scan sub-transaction when the query is a scan,
 *  - allocate result streams, row buffers and fragment state,
 *  - serialize per-operation parameters into ATTRINFO,
 *  - prepare m_applFrags for receiving results.
 *
 * @return 0 on success, -1 on failure (error code has been set).
 */
int
NdbQueryImpl::prepareSend()
{
  if (unlikely(m_state != Defined)) {
    assert (m_state >= Initial && m_state < Destructed);
    if (m_state == Failed)
      setErrorCode(QRY_IN_ERROR_STATE);
    else
      setErrorCode(QRY_ILLEGAL_STATE);
    DEBUG_CRASH();
    return -1;
  }

  // Determine execution parameters 'batch size'.
  // May be user specified (TODO), and/or, limited/specified by config values
  //
  if (getQueryDef().isScanQuery())
  {
    /* For the first batch, we read from all fragments for both ordered
     * and unordered scans.*/
    if (getQueryOperation(0U).m_parallelism == Parallelism_max)
    {
      m_rootFragCount
        = getRoot().getQueryOperationDef().getTable().getFragmentCount();
    }
    else
    {
      assert(getQueryOperation(0U).m_parallelism != Parallelism_adaptive);
      m_rootFragCount
        = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
              getQueryOperation(0U).m_parallelism);
    }
    Ndb* const ndb = m_transaction.getNdb();

    /** Scan operations need their own sub-transaction object associated
     * with each query.
     */
    ndb->theRemainingStartTransactions++;  // Compensate; does not start a real Txn
    NdbTransaction *scanTxn = ndb->hupp(&m_transaction);
    if (scanTxn==NULL) {
      ndb->theRemainingStartTransactions--;
      m_transaction.setOperationErrorCodeAbort(ndb->getNdbError().code);
      return -1;
    }
    scanTxn->theMagicNumber = 0x37412619;
    scanTxn->m_scanningQuery = this;
    this->m_scanTransaction = scanTxn;
  }
  else  // Lookup query
  {
    m_rootFragCount = 1;
  }

  // One result stream per (operation, root fragment) combination.
  int error = m_resultStreamAlloc.init(m_rootFragCount * getNoOfOperations());
  if (error != 0)
  {
    setErrorCode(error);
    return -1;
  }
  // Allocate space for ptrs to NdbResultStream and NdbRootFragment objects.
  error = m_pointerAlloc.init(m_rootFragCount *
                             (OrderedFragSet::pointersPerFragment));
  if (error != 0)
  {
    setErrorCode(error);
    return -1;
  }

  // Some preparation for later batchsize calculations pr. (sub) scan
  getRoot().calculateBatchedRows(NULL);
  getRoot().setBatchedRows(1);

  /**
   * Calculate total amount of row buffer space for all operations and
   * fragments.
   */
  Uint32 totalBuffSize = 0;
  for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++)
  {
    const NdbQueryOperationImpl& op = getQueryOperation(opNo);

    // Add space for batchBuffer & m_correlations
    Uint32 opBuffSize = op.getBatchBufferSize();
    if (getQueryDef().isScanQuery())
    {
      opBuffSize += (sizeof(TupleCorrelation) * op.getMaxBatchRows());
      opBuffSize *= 2;             // Scans are double buffered
    }
    opBuffSize += op.getRowSize(); // Unpacked row from buffers
    totalBuffSize += opBuffSize;
  }
  m_rowBufferAlloc.init(m_rootFragCount * totalBuffSize);

  if (getQueryDef().isScanQuery())
  {
    // Tuple sets are also double buffered for scans (the factor 2 below).
    Uint32 totalRows = 0;
    for (Uint32 i = 0; i<getNoOfOperations(); i++)
    {
      totalRows += getQueryOperation(i).getMaxBatchRows();
    }
    error = m_tupleSetAlloc.init(2 * m_rootFragCount * totalRows);
    if (unlikely(error != 0))
    {
      setErrorCode(error);
      return -1;
    }
  }

  /**
   * Allocate and initialize fragment state variables.
   * Will also cause a ResultStream object containing a
   * NdbReceiver to be constructed for each operation in QueryTree
   */
  m_rootFrags = new NdbRootFragment[m_rootFragCount];
  if (m_rootFrags == NULL)
  {
    setErrorCode(Err_MemoryAlloc);
    return -1;
  }
  for (Uint32 i = 0; i<m_rootFragCount; i++)
  {
    m_rootFrags[i].init(*this, i); // Set fragment number.
  }

  // Fill in parameters (into ATTRINFO) for QueryTree.
  for (Uint32 i = 0; i < m_countOperations; i++) {
    const int error = m_operations[i].prepareAttrInfo(m_attrInfo);
    if (unlikely(error))
    {
      setErrorCode(error);
      return -1;
    }
  }

  if (unlikely(m_attrInfo.isMemoryExhausted() || m_keyInfo.isMemoryExhausted())) {
    setErrorCode(Err_MemoryAlloc);
    return -1;
  }

  if (unlikely(m_attrInfo.getSize() > ScanTabReq::MaxTotalAttrInfo ||
               m_keyInfo.getSize()  > ScanTabReq::MaxTotalAttrInfo)) {
    setErrorCode(Err_ReadTooMuch); // TODO: find a more suitable errorcode,
    return -1;
  }

  // Setup m_applStreams and m_fullStreams for receiving results
  const NdbRecord* keyRec = NULL;
  if(getRoot().getQueryOperationDef().getIndex()!=NULL)
  {
    /* keyRec is needed for comparing records when doing ordered index scans.*/
    keyRec = getRoot().getQueryOperationDef().getIndex()->getDefaultRecord();
    assert(keyRec!=NULL);
  }
  m_applFrags.prepare(m_pointerAlloc,
                      getRoot().getOrdering(),
                      m_rootFragCount,
                      keyRec,
                      getRoot().m_ndbRecord);

  if (getQueryDef().isScanQuery())
  {
    NdbRootFragment::buildReciverIdMap(m_rootFrags, m_rootFragCount);
  }

#ifdef TRACE_SERIALIZATION
  ndbout << "Serialized ATTRINFO : ";
  for(Uint32 i = 0; i < m_attrInfo.getSize(); i++){
    char buf[12];
    sprintf(buf, "%.8x", m_attrInfo.get(i));
    ndbout << buf << " ";
  }
  ndbout << endl;
#endif

  assert (m_pendingFrags==0);
  m_state = Prepared;
  return 0;
} // NdbQueryImpl::prepareSend
2845
2846
2847
/** This iterator is used for inserting a sequence of receiver ids
 * for the initial batch of a scan into a section via a GenericSectionPtr.*/
class InitialReceiverIdIterator: public GenericSectionIterator
{
public:

  /**
   * @param rootFrags Array of root fragments to iterate receiver ids for.
   * @param cnt       Number of entries in 'rootFrags'.
   */
  InitialReceiverIdIterator(NdbRootFragment rootFrags[],
                            Uint32 cnt)
    :m_rootFrags(rootFrags),
     m_fragCount(cnt),
     m_currFragNo(0)
  {}

  virtual ~InitialReceiverIdIterator() {};

  /**
   * Get next batch of receiver ids.
   * @param sz This will be set to the number of receiver ids that have been
   * put in the buffer (0 if end has been reached.)
   * @return Array of receiver ids (or NULL if end reached.)
   */
  virtual const Uint32* getNextWords(Uint32& sz);

  /** Restart the iteration from the first fragment. */
  virtual void reset()
  { m_currFragNo = 0;};

private:
  /**
   * Size of internal receiver id buffer. This value is arbitrary, but
   * a larger buffer would mean fewer calls to getNextWords(), possibly
   * improving efficiency.
   */
  static const Uint32 bufSize = 16;

  /** Set of root fragments which we want to iterate receiver ids for.*/
  NdbRootFragment* m_rootFrags;
  const Uint32 m_fragCount;

  /** The next fragment number to be processed. (Ranges from 0 to number of
   * fragments.)*/
  Uint32 m_currFragNo;
  /** Buffer for storing one batch of receiver ids.*/
  Uint32 m_receiverIds[bufSize];
};
2892
getNextWords(Uint32 & sz)2893 const Uint32* InitialReceiverIdIterator::getNextWords(Uint32& sz)
2894 {
2895 /**
2896 * For the initial batch, we want to retrieve one batch for each fragment
2897 * whether it is a sorted scan or not.
2898 */
2899 if (m_currFragNo >= m_fragCount)
2900 {
2901 sz = 0;
2902 return NULL;
2903 }
2904 else
2905 {
2906 Uint32 cnt = 0;
2907 while (cnt < bufSize && m_currFragNo < m_fragCount)
2908 {
2909 m_receiverIds[cnt] = m_rootFrags[m_currFragNo].getReceiverId();
2910 cnt++;
2911 m_currFragNo++;
2912 }
2913 sz = cnt;
2914 return m_receiverIds;
2915 }
2916 }
2917
/** This iterator is used for inserting a sequence of 'TcPtrI'
 * for a NEXTREQ to a single or multiple fragments via a GenericSectionPtr.*/
class FetchMoreTcIdIterator: public GenericSectionIterator
{
public:
  /**
   * @param rootFrags Array of pointers to the root fragments to fetch
   *                  more results from.
   * @param cnt       Number of entries in 'rootFrags'.
   */
  FetchMoreTcIdIterator(NdbRootFragment* rootFrags[],
                        Uint32 cnt)
    :m_rootFrags(rootFrags),
     m_fragCount(cnt),
     m_currFragNo(0)
  {}

  virtual ~FetchMoreTcIdIterator() {};

  /**
   * Get next batch of receiver ids.
   * @param sz This will be set to the number of receiver ids that have been
   * put in the buffer (0 if end has been reached.)
   * @return Array of receiver ids (or NULL if end reached.)
   */
  virtual const Uint32* getNextWords(Uint32& sz);

  /** Restart the iteration from the first fragment. */
  virtual void reset()
  { m_currFragNo = 0;};

private:
  /**
   * Size of internal receiver id buffer. This value is arbitrary, but
   * a larger buffer would mean fewer calls to getNextWords(), possibly
   * improving efficiency.
   */
  static const Uint32 bufSize = 16;

  /** Set of root fragments which we want to iterate TcPtrI ids for.*/
  NdbRootFragment** m_rootFrags;
  const Uint32 m_fragCount;

  /** The next fragment number to be processed. (Ranges from 0 to number of
   * fragments.)*/
  Uint32 m_currFragNo;
  /** Buffer for storing one batch of receiver ids.*/
  Uint32 m_receiverIds[bufSize];
};
2961
getNextWords(Uint32 & sz)2962 const Uint32* FetchMoreTcIdIterator::getNextWords(Uint32& sz)
2963 {
2964 /**
2965 * For the initial batch, we want to retrieve one batch for each fragment
2966 * whether it is a sorted scan or not.
2967 */
2968 if (m_currFragNo >= m_fragCount)
2969 {
2970 sz = 0;
2971 return NULL;
2972 }
2973 else
2974 {
2975 Uint32 cnt = 0;
2976 while (cnt < bufSize && m_currFragNo < m_fragCount)
2977 {
2978 m_receiverIds[cnt] = m_rootFrags[m_currFragNo]->getReceiverTcPtrI();
2979 cnt++;
2980 m_currFragNo++;
2981 }
2982 sz = cnt;
2983 return m_receiverIds;
2984 }
2985 }
2986
/******************************************************************************
int doSend()    Send serialized queryTree and parameters encapsulated in
                either a SCAN_TABREQ or TCKEYREQ to TC.

NOTE:           The TransporterFacade mutex is already set by callee.

Return Value:   Return >0 : send was successful, returns number of signals sent
                Return -1: In all other cases.
Parameters:     nodeId: Receiving processor node
Remark:         Send a TCKEYREQ or TCKEYREQ or SCAN_TABREQ (long) signal depending on
                the query being either a lookup or scan type.
                KEYINFO and ATTRINFO are included as part of the long signal
******************************************************************************/
int
NdbQueryImpl::doSend(int nodeId, bool lastFlag)
{
  // The query must have been successfully prepared before it can be sent.
  if (unlikely(m_state != Prepared)) {
    assert (m_state >= Initial && m_state < Destructed);
    if (m_state == Failed)
      setErrorCode(QRY_IN_ERROR_STATE);
    else
      setErrorCode(QRY_ILLEGAL_STATE);
    DEBUG_CRASH();
    return -1;
  }

  Ndb& ndb = *m_transaction.getNdb();
  NdbImpl * impl = ndb.theImpl;

  const NdbQueryOperationImpl& root = getRoot();
  const NdbQueryOperationDefImpl& rootDef = root.getQueryOperationDef();
  // If the root operation goes via an index, the signal addresses the
  // (internal) index table rather than the base table.
  const NdbTableImpl* const rootTable = rootDef.getIndex()
    ? rootDef.getIndex()->getIndexTable()
    : &rootDef.getTable();

  Uint32 tTableId = rootTable->m_id;
  Uint32 tSchemaVersion = rootTable->m_version;

  // Make every root fragment ready to receive its first result batch.
  for (Uint32 i=0; i<m_rootFragCount; i++)
  {
    m_rootFrags[i].prepareNextReceiveSet();
  }

  if (rootDef.isScanOperation())
  {
    Uint32 scan_flags = 0;  // TODO: Specify with ScanOptions::SO_SCANFLAGS

    bool tupScan = (scan_flags & NdbScanOperation::SF_TupScan);
    bool rangeScan = false;

    // Evaluate (and cache) whether the scan can be pruned to one fragment.
    bool dummy;
    const int error = isPrunable(dummy);
    if (unlikely(error != 0))
      return error;

    /* Handle IndexScan specifics */
    if ( (int) rootTable->m_indexType ==
         (int) NdbDictionary::Index::OrderedIndex )
    {
      rangeScan = true;
      tupScan = false;
    }
    const Uint32 descending =
      root.getOrdering()==NdbQueryOptions::ScanOrdering_descending ? 1 : 0;
    // Descending order is only meaningful for an ordered (range) index scan.
    assert(descending==0 || (int) rootTable->m_indexType ==
           (int) NdbDictionary::Index::OrderedIndex);

    assert (root.getMaxBatchRows() > 0);

    NdbApiSignal tSignal(&ndb);
    tSignal.setSignal(GSN_SCAN_TABREQ, refToBlock(m_scanTransaction->m_tcRef));

    ScanTabReq * const scanTabReq = CAST_PTR(ScanTabReq, tSignal.getDataPtrSend());
    Uint32 reqInfo = 0;

    const Uint64 transId = m_scanTransaction->getTransactionId();

    scanTabReq->apiConnectPtr = m_scanTransaction->theTCConPtr;
    scanTabReq->buddyConPtr = m_scanTransaction->theBuddyConPtr; // 'buddy' refers 'real-transaction'->theTCConPtr
    scanTabReq->spare = 0;  // Unused in later protocol versions
    scanTabReq->tableId = tTableId;
    scanTabReq->tableSchemaVersion = tSchemaVersion;
    scanTabReq->storedProcId = 0xFFFF;
    scanTabReq->transId1 = (Uint32) transId;
    scanTabReq->transId2 = (Uint32) (transId >> 32);

    // Let the receiver logic decide batch size/byte limits per fragment.
    Uint32 batchRows = root.getMaxBatchRows();
    Uint32 batchByteSize;
    NdbReceiver::calculate_batch_size(* ndb.theImpl,
                                      getRootFragCount(),
                                      batchRows,
                                      batchByteSize);
    assert(batchRows==root.getMaxBatchRows());
    assert(batchRows<=batchByteSize);

    /**
     * Check if query is a sorted scan-scan.
     * Ordering can then only be guarented by restricting
     * parent batch to contain single rows.
     * (Child scans will have 'normal' batch size).
     */
    if (root.getOrdering() != NdbQueryOptions::ScanOrdering_unordered &&
        getQueryDef().getQueryType() == NdbQueryDef::MultiScanQuery)
    {
      batchRows = 1;
    }
    ScanTabReq::setScanBatch(reqInfo, batchRows);
    scanTabReq->batch_byte_size = batchByteSize;
    scanTabReq->first_batch_size = batchRows;

    // Route the request through SPJ, and have all CONFs passed to the API.
    ScanTabReq::setViaSPJFlag(reqInfo, 1);
    ScanTabReq::setPassAllConfsFlag(reqInfo, 1);

    Uint32 nodeVersion = impl->getNodeNdbVersion(nodeId);
    if (!ndbd_scan_tabreq_implicit_parallelism(nodeVersion))
    {
      // Implicit parallelism implies support for greater
      // parallelism than storable explicitly in old reqInfo.
      Uint32 fragments = getRootFragCount();
      if (fragments > PARALLEL_MASK)
      {
        setErrorCode(Err_SendFailed /* TODO: TooManyFragments, to too old cluster version */);
        return -1;
      }
      ScanTabReq::setParallelism(reqInfo, fragments);
    }

    ScanTabReq::setRangeScanFlag(reqInfo, rangeScan);
    ScanTabReq::setDescendingFlag(reqInfo, descending);
    ScanTabReq::setTupScanFlag(reqInfo, tupScan);
    ScanTabReq::setNoDiskFlag(reqInfo, !root.diskInUserProjection());
    ScanTabReq::set4WordConf(reqInfo, 1);

    // Assume LockMode LM_ReadCommited, set related lock flags
    ScanTabReq::setLockMode(reqInfo, false);  // not exclusive
    ScanTabReq::setHoldLockFlag(reqInfo, false);
    ScanTabReq::setReadCommittedFlag(reqInfo, true);

//  m_keyInfo = (scan_flags & NdbScanOperation::SF_KeyInfo) ? 1 : 0;

    // If scan is pruned, use optional 'distributionKey' to hold hashvalue
    if (m_prunability == Prune_Yes)
    {
//    printf("Build pruned SCANREQ, w/ hashValue:%d\n", hashValue);
      ScanTabReq::setDistributionKeyFlag(reqInfo, 1);
      scanTabReq->distributionKey= m_pruneHashVal;
      tSignal.setLength(ScanTabReq::StaticLength + 1);
    } else {
      tSignal.setLength(ScanTabReq::StaticLength);
    }
    scanTabReq->requestInfo = reqInfo;

    /**
     * Then send the signal:
     *
     * SCANTABREQ always has 2 mandatory sections and an optional
     * third section
     * Section 0 : List of receiver Ids NDBAPI has allocated
     *             for the scan
     * Section 1 : ATTRINFO section
     * Section 2 : Optional KEYINFO section
     */
    GenericSectionPtr secs[3];
    InitialReceiverIdIterator receiverIdIter(m_rootFrags, m_rootFragCount);
    LinearSectionIterator attrInfoIter(m_attrInfo.addr(), m_attrInfo.getSize());
    LinearSectionIterator keyInfoIter(m_keyInfo.addr(), m_keyInfo.getSize());

    secs[0].sectionIter= &receiverIdIter;
    secs[0].sz= getRootFragCount();

    secs[1].sectionIter= &attrInfoIter;
    secs[1].sz= m_attrInfo.getSize();

    Uint32 numSections= 2;
    if (m_keyInfo.getSize() > 0)
    {
      secs[2].sectionIter= &keyInfoIter;
      secs[2].sz= m_keyInfo.getSize();
      numSections= 3;
    }

    /* Send Fragmented as SCAN_TABREQ can be large */
    const int res = impl->sendFragmentedSignal(&tSignal, nodeId, secs, numSections);
    if (unlikely(res == -1))
    {
      setErrorCode(Err_SendFailed);  // Error: 'Send to NDB failed'
      return FetchResult_sendFail;
    }
    m_tcState = Active;

  } else {  // Lookup query

    NdbApiSignal tSignal(&ndb);
    tSignal.setSignal(GSN_TCKEYREQ, refToBlock(m_transaction.m_tcRef));

    TcKeyReq * const tcKeyReq = CAST_PTR(TcKeyReq, tSignal.getDataPtrSend());

    const Uint64 transId = m_transaction.getTransactionId();
    tcKeyReq->apiConnectPtr   = m_transaction.theTCConPtr;
    tcKeyReq->apiOperationPtr = root.getIdOfReceiver();
    tcKeyReq->tableId = tTableId;
    tcKeyReq->tableSchemaVersion = tSchemaVersion;
    tcKeyReq->transId1 = (Uint32) transId;
    tcKeyReq->transId2 = (Uint32) (transId >> 32);

    Uint32 attrLen = 0;
    tcKeyReq->setAttrinfoLen(attrLen, 0); // Not required for long signals.
    tcKeyReq->attrLen = attrLen;

    Uint32 reqInfo = 0;
    // Interpreted execution is only used for PK access at the root.
    Uint32 interpretedFlag= root.hasInterpretedCode() &&
                            rootDef.getType() == NdbQueryOperationDef::PrimaryKeyAccess;

    TcKeyReq::setOperationType(reqInfo, NdbOperation::ReadRequest);
    TcKeyReq::setViaSPJFlag(reqInfo, true);
    TcKeyReq::setKeyLength(reqInfo, 0);            // This is a long signal
    TcKeyReq::setAIInTcKeyReq(reqInfo, 0);         // Not needed
    TcKeyReq::setInterpretedFlag(reqInfo, interpretedFlag);
    TcKeyReq::setStartFlag(reqInfo, m_startIndicator);
    TcKeyReq::setExecuteFlag(reqInfo, lastFlag);
    TcKeyReq::setNoDiskFlag(reqInfo, !root.diskInUserProjection());
    TcKeyReq::setAbortOption(reqInfo, NdbOperation::AO_IgnoreError);

    TcKeyReq::setDirtyFlag(reqInfo, true);
    TcKeyReq::setSimpleFlag(reqInfo, true);
    TcKeyReq::setCommitFlag(reqInfo, m_commitIndicator);
    tcKeyReq->requestInfo = reqInfo;

    tSignal.setLength(TcKeyReq::StaticLength);

/****
    // Unused optional part located after TcKeyReq::StaticLength
    tcKeyReq->scanInfo = 0;
    tcKeyReq->distrGroupHashValue = 0;
    tcKeyReq->distributionKeySize = 0;
    tcKeyReq->storedProcId = 0xFFFF;
***/

/**** TODO ... maybe - from NdbOperation::prepareSendNdbRecord(AbortOption ao)
    Uint8 abortOption= (ao == DefaultAbortOption) ?
      (Uint8) m_abortOption : (Uint8) ao;

    m_abortOption= theSimpleIndicator && theOperationType==ReadRequest ?
      (Uint8) AO_IgnoreError : (Uint8) abortOption;

    TcKeyReq::setAbortOption(reqInfo, m_abortOption);
    TcKeyReq::setCommitFlag(tcKeyReq->requestInfo, theCommitIndicator);
*****/

    // TCKEYREQ: Section 0 is KEYINFO, section 1 (optional) is ATTRINFO.
    LinearSectionPtr secs[2];
    secs[TcKeyReq::KeyInfoSectionNum].p= m_keyInfo.addr();
    secs[TcKeyReq::KeyInfoSectionNum].sz= m_keyInfo.getSize();
    Uint32 numSections= 1;

    if (m_attrInfo.getSize() > 0)
    {
      secs[TcKeyReq::AttrInfoSectionNum].p= m_attrInfo.addr();
      secs[TcKeyReq::AttrInfoSectionNum].sz= m_attrInfo.getSize();
      numSections= 2;
    }

    const int res = impl->sendSignal(&tSignal, nodeId, secs, numSections);
    if (unlikely(res == -1))
    {
      setErrorCode(Err_SendFailed);  // Error: 'Send to NDB failed'
      return FetchResult_sendFail;
    }
    m_transaction.OpSent();
    // Expect one TCKEYCONF/REF plus a result row per operation, and a
    // 'no more results' indication per leaf operation.
    m_rootFrags[0].incrOutstandingResults(1 + getNoOfOperations() +
                                          getNoOfLeafOperations());
  } // if

  assert (m_pendingFrags==0);
  m_pendingFrags = m_rootFragCount;

  // Shrink memory footprint by removing structures not required after ::execute()
  m_keyInfo.releaseExtend();
  m_attrInfo.releaseExtend();

  // TODO: Release m_interpretedCode now?

  /* Todo : Consider calling NdbOperation::postExecuteRelease()
   * Ideally it should be called outside TP mutex, so not added
   * here yet
   */

  m_state = Executing;
  return 1;
} // NdbQueryImpl::doSend()
3276
3277
/******************************************************************************
int sendFetchMore() - Fetch another scan batch, optionally closing the scan

                Request another batch of rows to be retrieved from the scan.

Return Value:   0 if send succeeded, -1 otherwise.
Parameters:     rootFrags: Root fragments for which to ask for another batch.
Remark:
******************************************************************************/
int
NdbQueryImpl::sendFetchMore(NdbRootFragment* rootFrags[],
                            Uint32 cnt,
                            bool forceSend)
{
  assert(getQueryDef().isScanQuery());

  // Prepare each fragment to receive a new set of results. Only fragments
  // which completed their batch, and have more to deliver, may be asked.
  for (Uint32 i=0; i<cnt; i++)
  {
    NdbRootFragment* rootFrag = rootFrags[i];
    assert(rootFrag->isFragBatchComplete());
    assert(!rootFrag->finalBatchReceived());
    rootFrag->prepareNextReceiveSet();
  }

  // Build the SCAN_NEXTREQ asking TC for another batch from these fragments.
  Ndb& ndb = *getNdbTransaction().getNdb();
  NdbApiSignal tSignal(&ndb);
  tSignal.setSignal(GSN_SCAN_NEXTREQ, refToBlock(m_scanTransaction->m_tcRef));
  ScanNextReq * const scanNextReq =
    CAST_PTR(ScanNextReq, tSignal.getDataPtrSend());

  assert (m_scanTransaction);
  const Uint64 transId = m_scanTransaction->getTransactionId();

  scanNextReq->apiConnectPtr = m_scanTransaction->theTCConPtr;
  scanNextReq->stopScan = 0;  // Fetch more; do not close the cursor.
  scanNextReq->transId1 = (Uint32) transId;
  scanNextReq->transId2 = (Uint32) (transId >> 32);
  tSignal.setLength(ScanNextReq::SignalLength);

  // Section 0: the TcPtrI of each fragment scan to be continued.
  FetchMoreTcIdIterator receiverIdIter(rootFrags, cnt);

  GenericSectionPtr secs[1];
  secs[ScanNextReq::ReceiverIdsSectionNum].sectionIter = &receiverIdIter;
  secs[ScanNextReq::ReceiverIdsSectionNum].sz = cnt;

  NdbImpl * impl = ndb.theImpl;
  Uint32 nodeId = m_transaction.getConnectedNodeId();
  Uint32 seq = m_transaction.theNodeSequence;

  /* This part needs to be done under mutex due to synchronization with
   * receiver thread.
   */
  PollGuard poll_guard(* impl);

  if (unlikely(hasReceivedError()))
  {
    // Errors arrived inbetween ::await released mutex, and sendFetchMore grabbed it
    return -1;
  }
  // A changed node sequence means the data node failed since we connected.
  if (impl->getNodeSequence(nodeId) != seq ||
      impl->sendSignal(&tSignal, nodeId, secs, 1) != 0)
  {
    setErrorCode(Err_NodeFailCausedAbort);
    return -1;
  }
  impl->do_forceSend(forceSend);

  // These fragments now have an outstanding request again.
  m_pendingFrags += cnt;
  assert(m_pendingFrags <= getRootFragCount());

  return 0;
} // NdbQueryImpl::sendFetchMore()
3350
/**
 * Close the scan cursor held by TC:
 *  1) Drain any results outstanding for the current batch fetch.
 *  2) If TC still has an open cursor (not all fragments returned their
 *     final batch), send a SCAN_NEXTREQ(close) and wait for all fragments
 *     to confirm the close.
 * Errors received while draining are cleared before the close is sent, as
 * the fetch is being terminated anyway.
 *
 * @param forceSend Passed on to the poll/send logic.
 * @return 0 on success, -1 (or the sendClose() error) otherwise.
 */
int
NdbQueryImpl::closeTcCursor(bool forceSend)
{
  assert (getQueryDef().isScanQuery());

  NdbImpl* const ndb = m_transaction.getNdb()->theImpl;
  const Uint32 timeout  = ndb->get_waitfor_timeout();
  const Uint32 nodeId   = m_transaction.getConnectedNodeId();
  const Uint32 seq      = m_transaction.theNodeSequence;

  /* This part needs to be done under mutex due to synchronization with
   * receiver thread.
   */
  PollGuard poll_guard(*ndb);

  if (unlikely(ndb->getNodeSequence(nodeId) != seq))
  {
    setErrorCode(Err_NodeFailCausedAbort);
    return -1;  // Transporter disconnected and reconnected, no need to close
  }

  /* Wait for outstanding scan results from current batch fetch */
  while (m_pendingFrags > 0)
  {
    const FetchResult result = static_cast<FetchResult>
        (poll_guard.wait_scan(3*timeout, nodeId, forceSend));

    if (unlikely(ndb->getNodeSequence(nodeId) != seq))
      setFetchTerminated(Err_NodeFailCausedAbort,false);
    else if (unlikely(result != FetchResult_ok))
    {
      if (result == FetchResult_timeOut)
        setFetchTerminated(Err_ReceiveTimedOut,false);
      else
        setFetchTerminated(Err_NodeFailCausedAbort,false);
    }
    if (hasReceivedError())
    {
      break;
    }
  } // while

  assert(m_pendingFrags==0);
  NdbRootFragment::clear(m_rootFrags,m_rootFragCount);
  m_errorReceived = 0;   // Clear errors caused by previous fetching
  m_error.code = 0;

  if (m_finalBatchFrags < getRootFragCount())  // TC has an open scan cursor.
  {
    /* Send SCAN_NEXTREQ(close) */
    const int error = sendClose(m_transaction.getConnectedNodeId());
    if (unlikely(error))
      return error;

    // sendClose() marked all non-final fragments as pending again.
    assert(m_finalBatchFrags+m_pendingFrags==getRootFragCount());

    /* Wait for close to be confirmed: */
    while (m_pendingFrags > 0)
    {
      const FetchResult result = static_cast<FetchResult>
          (poll_guard.wait_scan(3*timeout, nodeId, forceSend));

      if (unlikely(ndb->getNodeSequence(nodeId) != seq))
        setFetchTerminated(Err_NodeFailCausedAbort,false);
      else if (unlikely(result != FetchResult_ok))
      {
        if (result == FetchResult_timeOut)
          setFetchTerminated(Err_ReceiveTimedOut,false);
        else
          setFetchTerminated(Err_NodeFailCausedAbort,false);
      }
      if (hasReceivedError())
      {
        break;
      }
    } // while
  } // if

  return 0;
} //NdbQueryImpl::closeTcCursor
3431
3432
3433 /*
3434 * This method is called with the PollGuard mutex held on the transporter.
3435 */
3436 int
sendClose(int nodeId)3437 NdbQueryImpl::sendClose(int nodeId)
3438 {
3439 assert(m_finalBatchFrags < getRootFragCount());
3440 m_pendingFrags = getRootFragCount() - m_finalBatchFrags;
3441
3442 Ndb& ndb = *m_transaction.getNdb();
3443 NdbApiSignal tSignal(&ndb);
3444 tSignal.setSignal(GSN_SCAN_NEXTREQ, refToBlock(m_scanTransaction->m_tcRef));
3445 ScanNextReq * const scanNextReq = CAST_PTR(ScanNextReq, tSignal.getDataPtrSend());
3446
3447 assert (m_scanTransaction);
3448 const Uint64 transId = m_scanTransaction->getTransactionId();
3449
3450 scanNextReq->apiConnectPtr = m_scanTransaction->theTCConPtr;
3451 scanNextReq->stopScan = true;
3452 scanNextReq->transId1 = (Uint32) transId;
3453 scanNextReq->transId2 = (Uint32) (transId >> 32);
3454 tSignal.setLength(ScanNextReq::SignalLength);
3455
3456 NdbImpl * impl = ndb.theImpl;
3457 return impl->sendSignal(&tSignal, nodeId);
3458
3459 } // NdbQueryImpl::sendClose()
3460
3461
isPrunable(bool & prunable)3462 int NdbQueryImpl::isPrunable(bool& prunable)
3463 {
3464 if (m_prunability == Prune_Unknown)
3465 {
3466 const int error = getRoot().getQueryOperationDef()
3467 .checkPrunable(m_keyInfo, m_shortestBound, prunable, m_pruneHashVal);
3468 if (unlikely(error != 0))
3469 {
3470 prunable = false;
3471 setErrorCode(error);
3472 return -1;
3473 }
3474 m_prunability = prunable ? Prune_Yes : Prune_No;
3475 }
3476 prunable = (m_prunability == Prune_Yes);
3477 return 0;
3478 }
3479
3480
3481 /****************
3482 * NdbQueryImpl::OrderedFragSet methods.
3483 ***************/
3484
// Construct an empty OrderedFragSet. The fragment arrays are not
// allocated until ::prepare() is called.
NdbQueryImpl::OrderedFragSet::OrderedFragSet():
  m_capacity(0),
  m_activeFragCount(0),
  m_fetchMoreFragCount(0),
  m_finalFragReceivedCount(0),
  m_finalFragConsumedCount(0),
  m_ordering(NdbQueryOptions::ScanOrdering_void),
  m_keyRecord(NULL),
  m_resultRecord(NULL),
  m_activeFrags(NULL),
  m_fetchMoreFrags(NULL)
{
}
3498
NdbQueryImpl::OrderedFragSet::~OrderedFragSet()
{
  // The arrays were obtained from the NdbBulkAllocator given to
  // ::prepare(), so they are not deleted here - only the pointers are
  // cleared.
  m_activeFrags = NULL;
  m_fetchMoreFrags = NULL;
}
3504
// Empty the set: discard both the active fragments and those queued for
// a 'fetch more' request. (Does not release any memory.)
void NdbQueryImpl::OrderedFragSet::clear()
{
  m_activeFragCount = 0;
  m_fetchMoreFragCount = 0;
}
3510
/**
 * Allocate and zero-initialize the internal fragment arrays.
 * May only be called once (asserts that no previous allocation exists).
 *
 * @param allocator    Supplies the (bulk) memory for both arrays.
 * @param ordering     Scan ordering; must not be 'ScanOrdering_void'.
 * @param capacity     Max number of root fragments the set must hold.
 * @param keyRecord    NdbRecord used when comparing rows of ordered scans.
 * @param resultRecord NdbRecord describing the result rows.
 */
void
NdbQueryImpl::OrderedFragSet::prepare(NdbBulkAllocator& allocator,
                                      NdbQueryOptions::ScanOrdering ordering,
                                      int capacity,
                                      const NdbRecord* keyRecord,
                                      const NdbRecord* resultRecord)
{
  assert(m_activeFrags==NULL);
  assert(m_capacity==0);
  assert(ordering!=NdbQueryOptions::ScanOrdering_void);

  if (capacity > 0)
  {
    m_capacity = capacity;

    m_activeFrags =
      reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
    bzero(m_activeFrags, capacity * sizeof(NdbRootFragment*));

    m_fetchMoreFrags =
      reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
    bzero(m_fetchMoreFrags, capacity * sizeof(NdbRootFragment*));
  }
  m_ordering = ordering;
  m_keyRecord = keyRecord;
  m_resultRecord = resultRecord;
} // OrderedFragSet::prepare()
3538
3539
3540 /**
3541 * Get current RootFragment which to return results from.
3542 * Logic relies on that ::reorganize() is called whenever the current
3543 * RootFragment is advanced to next result. This will eliminate
3544 * empty RootFragments from the OrderedFragSet object
3545 *
3546 */
3547 NdbRootFragment*
getCurrent() const3548 NdbQueryImpl::OrderedFragSet::getCurrent() const
3549 {
3550 if (m_ordering!=NdbQueryOptions::ScanOrdering_unordered)
3551 {
3552 /**
3553 * Must have tuples for each (non-completed) fragment when doing ordered
3554 * scan.
3555 */
3556 if (unlikely(m_activeFragCount+m_finalFragConsumedCount < m_capacity))
3557 {
3558 return NULL;
3559 }
3560 }
3561
3562 if (unlikely(m_activeFragCount==0))
3563 {
3564 return NULL;
3565 }
3566 else
3567 {
3568 assert(!m_activeFrags[m_activeFragCount-1]->isEmpty());
3569 return m_activeFrags[m_activeFragCount-1];
3570 }
3571 } // OrderedFragSet::getCurrent()
3572
3573 /**
3574 * Keep the FragSet ordered, both with respect to specified ScanOrdering, and
3575 * such that RootFragments which becomes empty are removed from
3576 * m_activeFrags[].
3577 * Thus, ::getCurrent() should be as lightweight as possible and only has
3578 * to return the 'next' available from array wo/ doing any housekeeping.
3579 */
void
NdbQueryImpl::OrderedFragSet::reorganize()
{
  assert(m_activeFragCount > 0);
  // The fragment most recently consumed from / added.
  NdbRootFragment* const frag = m_activeFrags[m_activeFragCount-1];

  // Remove the current fragment if the batch has been emptied.
  if (frag->isEmpty())
  {
    /**
     * MT-note: Although ::finalBatchReceived() normally requires mutex,
     * its safe to call it here wo/ mutex as:
     *
     *  - 'not hasRequestedMore()' guaranty that there can't be any
     *    receiver thread simultaneously accessing the mutex protected members.
     *  - As this fragment has already been added to (the mutex protected)
     *    class OrderedFragSet, we know that the mutex has been
     *    previously set for this 'frag'. This would have resolved
     *    any cache coherency problems related to mt'ed access to
     *    'frag->finalBatchReceived()'.
     */
    if (!frag->hasRequestedMore() && frag->finalBatchReceived())
    {
      // Fragment is completely done; account for it as consumed.
      assert(m_finalFragReceivedCount > m_finalFragConsumedCount);
      m_finalFragConsumedCount++;
    }

    /**
     * Without doublebuffering we can't 'fetchMore' for fragments until
     * the current ResultSet has been consumed by the application.
     * (Compared to how ::prepareMoreResults() immediately 'fetchMore')
     */
    else if (!useDoubleBuffers)
    {
      m_fetchMoreFrags[m_fetchMoreFragCount++] = frag;
    }
    m_activeFragCount--;
  }

  // Reorder fragments if added a nonEmpty fragment to a sorted scan.
  else if (m_ordering!=NdbQueryOptions::ScanOrdering_unordered)
  {
    /**
     * This is a sorted scan. There are more data to be read from
     * m_activeFrags[m_activeFragCount-1]. Move it to its proper place.
     *
     * Use binary search to find the largest record that is smaller than or
     * equal to m_activeFrags[m_activeFragCount-1].
     */
    int first = 0;
    int last = m_activeFragCount-1;
    int middle = (first+last)/2;

    while (first<last)
    {
      assert(middle<m_activeFragCount);
      const int cmpRes = compare(*frag, *m_activeFrags[middle]);
      if (cmpRes < 0)
      {
        first = middle + 1;
      }
      else if (cmpRes == 0)
      {
        last = first = middle;
      }
      else
      {
        last = middle;
      }
      middle = (first+last)/2;
    }

    // Move into correct sorted position ('middle' is the insertion point).
    if (middle < m_activeFragCount-1)
    {
      assert(compare(*frag, *m_activeFrags[middle]) >= 0);
      memmove(m_activeFrags+middle+1,
              m_activeFrags+middle,
              (m_activeFragCount - middle - 1) * sizeof(NdbRootFragment*));
      m_activeFrags[middle] = frag;
    }
    assert(verifySortOrder());
  }
  assert(m_activeFragCount+m_finalFragConsumedCount <= m_capacity);
  assert(m_fetchMoreFragCount+m_finalFragReceivedCount <= m_capacity);
} // OrderedFragSet::reorganize()
3666
// Add a fragment holding available results to the set, and place it into
// its correct (sort) position.
void
NdbQueryImpl::OrderedFragSet::add(NdbRootFragment& frag)
{
  // Arrays were sized for 'm_capacity' fragments in ::prepare().
  assert(m_activeFragCount+m_finalFragConsumedCount < m_capacity);

  m_activeFrags[m_activeFragCount++] = &frag;  // Add avail fragment
  reorganize();                                // Move into position
} // OrderedFragSet::add()
3675
/**
 * Scan rootFrags[] for fragments which have received a ResultSet batch.
 * Add these to m_applFrags. (Requires mutex protection.)
 */
void
NdbQueryImpl::OrderedFragSet::prepareMoreResults(NdbRootFragment rootFrags[], Uint32 cnt)
{
  for (Uint32 fragNo = 0; fragNo < cnt; fragNo++)
  {
    NdbRootFragment& rootFrag = rootFrags[fragNo];
    if (rootFrag.isEmpty() &&           // Current ResultSet is empty
        rootFrag.hasReceivedMore())     // Another ResultSet is available
    {
      if (rootFrag.finalBatchReceived())
      {
        // No further batches will arrive for this fragment.
        m_finalFragReceivedCount++;
      }
      /**
       * When doublebuffered fetch is active:
       * A received fragment is a candidate for immediate prefetch.
       */
      else if (useDoubleBuffers)
      {
        m_fetchMoreFrags[m_fetchMoreFragCount++] = &rootFrag;
      } // useDoubleBuffers

      rootFrag.grabNextResultSet();     // Get new ResultSet.
      add(rootFrag);                    // Make avail. to appl. thread
    }
  } // for all 'rootFrags[]'

  assert(m_activeFragCount+m_finalFragConsumedCount <= m_capacity);
  assert(m_fetchMoreFragCount+m_finalFragReceivedCount <= m_capacity);
} // OrderedFragSet::prepareMoreResults()
3710
3711 /**
3712 * Determine if a ::sendFetchMore() should be requested at this point.
3713 */
3714 Uint32
getFetchMore(NdbRootFragment ** & frags)3715 NdbQueryImpl::OrderedFragSet::getFetchMore(NdbRootFragment** &frags)
3716 {
3717 /**
3718 * Decides (pre-)fetch strategy:
3719 *
3720 * 1) No doublebuffered ResultSets: Immediately request prefetch.
3721 * (This is fetches related to 'isEmpty' fragments)
3722 * 2) If ordered ResultSets; Immediately request prefetch.
3723 * (Need rows from all fragments to do sort-merge)
3724 * 3) When unordered, reduce #NEXTREQs to TC by avoid prefetch
3725 * until there are pending request to all datanodes having more
3726 * ResultSets
3727 */
3728 if (m_fetchMoreFragCount > 0 &&
3729 (!useDoubleBuffers || // 1)
3730 m_ordering != NdbQueryOptions::ScanOrdering_unordered || // 2)
3731 m_fetchMoreFragCount+m_finalFragReceivedCount >= m_capacity)) // 3)
3732 {
3733 const int cnt = m_fetchMoreFragCount;
3734 frags = m_fetchMoreFrags;
3735 m_fetchMoreFragCount = 0;
3736 return cnt;
3737 }
3738 return 0;
3739 }
3740
3741 bool
verifySortOrder() const3742 NdbQueryImpl::OrderedFragSet::verifySortOrder() const
3743 {
3744 for (int i = 0; i<m_activeFragCount-1; i++)
3745 {
3746 if (compare(*m_activeFrags[i], *m_activeFrags[i+1]) < 0)
3747 {
3748 assert(false);
3749 return false;
3750 }
3751 }
3752 return true;
3753 }
3754
/**
 * Compare frags such that f1<f2 if f1 is empty but f2 is not.
 * - Otherwise compare record contents.
 * @return negative if frag1<frag2, 0 if frag1 == frag2, otherwise positive.
 */
3760 int
compare(const NdbRootFragment & frag1,const NdbRootFragment & frag2) const3761 NdbQueryImpl::OrderedFragSet::compare(const NdbRootFragment& frag1,
3762 const NdbRootFragment& frag2) const
3763 {
3764 assert(m_ordering!=NdbQueryOptions::ScanOrdering_unordered);
3765
3766 /* f1<f2 if f1 is empty but f2 is not.*/
3767 if(frag1.isEmpty())
3768 {
3769 if(!frag2.isEmpty())
3770 {
3771 return -1;
3772 }
3773 else
3774 {
3775 return 0;
3776 }
3777 }
3778
3779 /* Neither stream is empty so we must compare records.*/
3780 return compare_ndbrecord(&frag1.getResultStream(0).getReceiver(),
3781 &frag2.getResultStream(0).getReceiver(),
3782 m_keyRecord,
3783 m_resultRecord,
3784 m_ordering
3785 == NdbQueryOptions::ScanOrdering_descending,
3786 false);
3787 }
3788
3789
3790
3791 ////////////////////////////////////////////////////
3792 ///////// NdbQueryOperationImpl methods ///////////
3793 ////////////////////////////////////////////////////
3794
/**
 * Construct the runtime (execution time) representation of a query
 * operation from its definition, and link it into the query tree: the
 * operation registers itself as a child of its parent (if any), and picks
 * up any scan ordering specified on an ordered-index-scan definition.
 * On allocation failure an error code is set on 'queryImpl'.
 */
NdbQueryOperationImpl::NdbQueryOperationImpl(
           NdbQueryImpl& queryImpl,
           const NdbQueryOperationDefImpl& def):
  m_interface(*this),
  m_magic(MAGIC),
  m_queryImpl(queryImpl),
  m_operationDef(def),
  m_parent(NULL),
  m_children(0),
  m_maxBatchRows(0),   // >0: User specified preferred value, ==0: Use default CFG values
  m_params(),
  m_resultBuffer(NULL),
  m_resultRef(NULL),
  m_isRowNull(true),
  m_ndbRecord(NULL),
  m_read_mask(NULL),
  m_firstRecAttr(NULL),
  m_lastRecAttr(NULL),
  m_ordering(NdbQueryOptions::ScanOrdering_unordered),
  m_interpretedCode(NULL),
  m_diskInUserProjection(false),
  m_parallelism(def.getOpNo() == 0
                ? Parallelism_max : Parallelism_adaptive),
  m_rowSize(0xffffffff),
  m_batchBufferSize(0xffffffff)
{
  // Pre-allocate room for all children known from the definition.
  if (m_children.expand(def.getNoOfChildOperations()))
  {
    // Memory allocation during Vector::expand() failed.
    queryImpl.setErrorCode(Err_MemoryAlloc);
    return;
  }
  // Fill in operations parent refs, and append it as child of its parent
  const NdbQueryOperationDefImpl* parent = def.getParentOperation();
  if (parent != NULL)
  {
    const Uint32 ix = parent->getOpNo();
    assert (ix < m_queryImpl.getNoOfOperations());
    m_parent = &m_queryImpl.getQueryOperation(ix);
    const int res = m_parent->m_children.push_back(this);
    UNUSED(res);
    /**
      Enough memory should have been allocated when creating
      m_parent->m_children, so res!=0 should never happen.
    */
    assert(res == 0);
  }
  if (def.getType()==NdbQueryOperationDef::OrderedIndexScan)
  {
    const NdbQueryOptions::ScanOrdering defOrdering =
      static_cast<const NdbQueryIndexScanOperationDefImpl&>(def).getOrdering();
    if (defOrdering != NdbQueryOptions::ScanOrdering_void)
    {
      // Use value from definition, if one was set.
      m_ordering = defOrdering;
    }
  }
}
3853
NdbQueryOperationImpl::~NdbQueryOperationImpl()
{
  /**
   * We expect ::postFetchRelease to have deleted fetch related structures when fetch completed.
   * Either by fetching through last row, or calling ::close() which forcefully terminates fetch
   */
  assert (m_firstRecAttr == NULL);
  assert (m_interpretedCode == NULL);
} //NdbQueryOperationImpl::~NdbQueryOperationImpl()
3863
/**
 * Release what we won't need anymore after the last available row has been
 * returned from the datanodes.
 */
3868 void
postFetchRelease()3869 NdbQueryOperationImpl::postFetchRelease()
3870 {
3871 Ndb* const ndb = m_queryImpl.getNdbTransaction().getNdb();
3872 NdbRecAttr* recAttr = m_firstRecAttr;
3873 while (recAttr != NULL) {
3874 NdbRecAttr* saveRecAttr = recAttr;
3875 recAttr = recAttr->next();
3876 ndb->releaseRecAttr(saveRecAttr);
3877 }
3878 m_firstRecAttr = NULL;
3879
3880 // Set API exposed info to indicate NULL-row
3881 m_isRowNull = true;
3882 if (m_resultRef!=NULL) {
3883 *m_resultRef = NULL;
3884 }
3885
3886 // TODO: Consider if interpretedCode can be deleted imm. after ::doSend
3887 delete m_interpretedCode;
3888 m_interpretedCode = NULL;
3889 } //NdbQueryOperationImpl::postFetchRelease()
3890
3891
3892 Uint32
getNoOfParentOperations() const3893 NdbQueryOperationImpl::getNoOfParentOperations() const
3894 {
3895 return (m_parent) ? 1 : 0;
3896 }
3897
3898 NdbQueryOperationImpl&
getParentOperation(Uint32 i) const3899 NdbQueryOperationImpl::getParentOperation(Uint32 i) const
3900 {
3901 assert(i==0 && m_parent!=NULL);
3902 return *m_parent;
3903 }
3904 NdbQueryOperationImpl*
getParentOperation() const3905 NdbQueryOperationImpl::getParentOperation() const
3906 {
3907 return m_parent;
3908 }
3909
3910 Uint32
getNoOfChildOperations() const3911 NdbQueryOperationImpl::getNoOfChildOperations() const
3912 {
3913 return m_children.size();
3914 }
3915
3916 NdbQueryOperationImpl&
getChildOperation(Uint32 i) const3917 NdbQueryOperationImpl::getChildOperation(Uint32 i) const
3918 {
3919 return *m_children[i];
3920 }
3921
getNoOfDescendantOperations() const3922 Int32 NdbQueryOperationImpl::getNoOfDescendantOperations() const
3923 {
3924 Int32 children = 0;
3925
3926 for (unsigned i = 0; i < getNoOfChildOperations(); i++)
3927 children += 1 + getChildOperation(i).getNoOfDescendantOperations();
3928
3929 return children;
3930 }
3931
3932 Uint32
getNoOfLeafOperations() const3933 NdbQueryOperationImpl::getNoOfLeafOperations() const
3934 {
3935 if (getNoOfChildOperations() == 0)
3936 {
3937 return 1;
3938 }
3939 else
3940 {
3941 Uint32 sum = 0;
3942 for (unsigned i = 0; i < getNoOfChildOperations(); i++)
3943 sum += getChildOperation(i).getNoOfLeafOperations();
3944
3945 return sum;
3946 }
3947 }
3948
3949 NdbRecAttr*
getValue(const char * anAttrName,char * resultBuffer)3950 NdbQueryOperationImpl::getValue(
3951 const char* anAttrName,
3952 char* resultBuffer)
3953 {
3954 if (unlikely(anAttrName == NULL)) {
3955 getQuery().setErrorCode(QRY_REQ_ARG_IS_NULL);
3956 return NULL;
3957 }
3958 const NdbColumnImpl* const column
3959 = m_operationDef.getTable().getColumn(anAttrName);
3960 if(unlikely(column==NULL)){
3961 getQuery().setErrorCode(Err_UnknownColumn);
3962 return NULL;
3963 } else {
3964 return getValue(*column, resultBuffer);
3965 }
3966 }
3967
3968 NdbRecAttr*
getValue(Uint32 anAttrId,char * resultBuffer)3969 NdbQueryOperationImpl::getValue(
3970 Uint32 anAttrId,
3971 char* resultBuffer)
3972 {
3973 const NdbColumnImpl* const column
3974 = m_operationDef.getTable().getColumn(anAttrId);
3975 if(unlikely(column==NULL)){
3976 getQuery().setErrorCode(Err_UnknownColumn);
3977 return NULL;
3978 } else {
3979 return getValue(*column, resultBuffer);
3980 }
3981 }
3982
3983 NdbRecAttr*
getValue(const NdbColumnImpl & column,char * resultBuffer)3984 NdbQueryOperationImpl::getValue(
3985 const NdbColumnImpl& column,
3986 char* resultBuffer)
3987 {
3988 if (unlikely(getQuery().m_state != NdbQueryImpl::Defined)) {
3989 int state = getQuery().m_state;
3990 assert (state >= NdbQueryImpl::Initial && state < NdbQueryImpl::Destructed);
3991
3992 if (state == NdbQueryImpl::Failed)
3993 getQuery().setErrorCode(QRY_IN_ERROR_STATE);
3994 else
3995 getQuery().setErrorCode(QRY_ILLEGAL_STATE);
3996 DEBUG_CRASH();
3997 return NULL;
3998 }
3999 Ndb* const ndb = getQuery().getNdbTransaction().getNdb();
4000 NdbRecAttr* const recAttr = ndb->getRecAttr();
4001 if(unlikely(recAttr == NULL)) {
4002 getQuery().setErrorCode(Err_MemoryAlloc);
4003 return NULL;
4004 }
4005 if(unlikely(recAttr->setup(&column, resultBuffer))) {
4006 ndb->releaseRecAttr(recAttr);
4007 getQuery().setErrorCode(Err_MemoryAlloc);
4008 return NULL;
4009 }
4010 // Append to tail of list.
4011 if(m_firstRecAttr == NULL){
4012 m_firstRecAttr = recAttr;
4013 }else{
4014 m_lastRecAttr->next(recAttr);
4015 }
4016 m_lastRecAttr = recAttr;
4017 assert(recAttr->next()==NULL);
4018 return recAttr;
4019 }
4020
4021 int
setResultRowBuf(const NdbRecord * rec,char * resBuffer,const unsigned char * result_mask)4022 NdbQueryOperationImpl::setResultRowBuf (
4023 const NdbRecord *rec,
4024 char* resBuffer,
4025 const unsigned char* result_mask)
4026 {
4027 if (unlikely(rec==0)) {
4028 getQuery().setErrorCode(QRY_REQ_ARG_IS_NULL);
4029 return -1;
4030 }
4031 if (unlikely(getQuery().m_state != NdbQueryImpl::Defined)) {
4032 int state = getQuery().m_state;
4033 assert (state >= NdbQueryImpl::Initial && state < NdbQueryImpl::Destructed);
4034
4035 if (state == NdbQueryImpl::Failed)
4036 getQuery().setErrorCode(QRY_IN_ERROR_STATE);
4037 else
4038 getQuery().setErrorCode(QRY_ILLEGAL_STATE);
4039 DEBUG_CRASH();
4040 return -1;
4041 }
4042 if (rec->tableId !=
4043 static_cast<Uint32>(m_operationDef.getTable().getTableId())){
4044 /* The key_record and attribute_record in primary key operation do not
4045 belong to the same table.*/
4046 getQuery().setErrorCode(Err_DifferentTabForKeyRecAndAttrRec);
4047 return -1;
4048 }
4049 if (unlikely(m_ndbRecord != NULL)) {
4050 getQuery().setErrorCode(QRY_RESULT_ROW_ALREADY_DEFINED);
4051 return -1;
4052 }
4053 m_ndbRecord = rec;
4054 m_read_mask = result_mask;
4055 m_resultBuffer = resBuffer;
4056 return 0;
4057 }
4058
4059 int
setResultRowRef(const NdbRecord * rec,const char * & bufRef,const unsigned char * result_mask)4060 NdbQueryOperationImpl::setResultRowRef (
4061 const NdbRecord* rec,
4062 const char* & bufRef,
4063 const unsigned char* result_mask)
4064 {
4065 m_resultRef = &bufRef;
4066 *m_resultRef = NULL; // No result row yet
4067 return setResultRowBuf(rec, NULL, result_mask);
4068 }
4069
NdbQuery::NextResultOutcome
NdbQueryOperationImpl::firstResult()
{
  // Position at the first result row for this operation within the
  // current row of its parent. Only legal while the query is in an
  // executing, not-yet-closed state.
  if (unlikely(getQuery().m_state < NdbQueryImpl::Executing ||
               getQuery().m_state >= NdbQueryImpl::Closed)) {
    int state = getQuery().m_state;
    assert (state >= NdbQueryImpl::Initial && state < NdbQueryImpl::Destructed);
    if (state == NdbQueryImpl::Failed)
      getQuery().setErrorCode(QRY_IN_ERROR_STATE);
    else
      getQuery().setErrorCode(QRY_ILLEGAL_STATE);
    DEBUG_CRASH();
    return NdbQuery::NextResult_error;
  }

  const NdbRootFragment* rootFrag;

#if 0 // TODO ::firstResult() on root operation is unused, incomplete & untested
  if (unlikely(getParentOperation()==NULL))
  {
    // Reset *all* ResultStreams, optionaly order them, and find new current among them
    for( Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++)
    {
      m_resultStreams[i]->firstResult();
    }
    rootFrag = m_queryImpl.m_applFrags.reorganize();
    assert(rootFrag==NULL || rootFrag==m_queryImpl.m_applFrags.getCurrent());
  }
  else
#endif

  {
    assert(getParentOperation()!=NULL);  // TODO, See above
    rootFrag = m_queryImpl.m_applFrags.getCurrent();
  }

  if (rootFrag != NULL)
  {
    // Deliver the first row from this operation's result stream, if any.
    NdbResultStream& resultStream = rootFrag->getResultStream(*this);
    if (resultStream.firstResult() != tupleNotFound)
    {
      fetchRow(resultStream);
      return NdbQuery::NextResult_gotRow;
    }
  }
  // No result row: present a NULL-row for this operation and its descendants.
  nullifyResult();
  return NdbQuery::NextResult_scanComplete;
} //NdbQueryOperationImpl::firstResult()
4118
4119
4120 NdbQuery::NextResultOutcome
nextResult(bool fetchAllowed,bool forceSend)4121 NdbQueryOperationImpl::nextResult(bool fetchAllowed, bool forceSend)
4122 {
4123 if (unlikely(getQuery().m_state < NdbQueryImpl::Executing ||
4124 getQuery().m_state >= NdbQueryImpl::Closed)) {
4125 int state = getQuery().m_state;
4126 assert (state >= NdbQueryImpl::Initial && state < NdbQueryImpl::Destructed);
4127 if (state == NdbQueryImpl::Failed)
4128 getQuery().setErrorCode(QRY_IN_ERROR_STATE);
4129 else
4130 getQuery().setErrorCode(QRY_ILLEGAL_STATE);
4131 DEBUG_CRASH();
4132 return NdbQuery::NextResult_error;
4133 }
4134
4135 if (this == &getRoot())
4136 {
4137 return m_queryImpl.nextRootResult(fetchAllowed,forceSend);
4138 }
4139 /**
4140 * 'next' will never be able to return anything for a lookup operation.
4141 * NOTE: This is a pure optimization shortcut!
4142 */
4143 else if (m_operationDef.isScanOperation())
4144 {
4145 const NdbRootFragment* rootFrag = m_queryImpl.m_applFrags.getCurrent();
4146 if (rootFrag!=NULL)
4147 {
4148 NdbResultStream& resultStream = rootFrag->getResultStream(*this);
4149 if (resultStream.nextResult() != tupleNotFound)
4150 {
4151 fetchRow(resultStream);
4152 return NdbQuery::NextResult_gotRow;
4153 }
4154 }
4155 }
4156 nullifyResult();
4157 return NdbQuery::NextResult_scanComplete;
4158 } //NdbQueryOperationImpl::nextResult()
4159
4160
4161 void
fetchRow(NdbResultStream & resultStream)4162 NdbQueryOperationImpl::fetchRow(NdbResultStream& resultStream)
4163 {
4164 const char* buff = resultStream.getCurrentRow();
4165 assert(buff!=NULL || (m_firstRecAttr==NULL && m_ndbRecord==NULL));
4166
4167 m_isRowNull = false;
4168 if (m_firstRecAttr != NULL)
4169 {
4170 // Retrieve any RecAttr (getValues()) for current row
4171 const int retVal = resultStream.getReceiver().get_AttrValues(m_firstRecAttr);
4172 assert(retVal==0); ((void)retVal);
4173 }
4174 if (m_ndbRecord != NULL)
4175 {
4176 if (m_resultRef!=NULL)
4177 {
4178 // Set application pointer to point into internal buffer.
4179 *m_resultRef = buff;
4180 }
4181 else
4182 {
4183 assert(m_resultBuffer!=NULL);
4184 // Copy result to buffer supplied by application.
4185 memcpy(m_resultBuffer, buff, m_ndbRecord->m_row_size);
4186 }
4187 }
4188 } // NdbQueryOperationImpl::fetchRow
4189
4190
4191 void
nullifyResult()4192 NdbQueryOperationImpl::nullifyResult()
4193 {
4194 if (!m_isRowNull)
4195 {
4196 /* This operation gave no result for the current row.*/
4197 m_isRowNull = true;
4198 if (m_resultRef!=NULL)
4199 {
4200 // Set the pointer supplied by the application to NULL.
4201 *m_resultRef = NULL;
4202 }
4203 /* We should not give any results for the descendants either.*/
4204 for (Uint32 i = 0; i<getNoOfChildOperations(); i++)
4205 {
4206 getChildOperation(i).nullifyResult();
4207 }
4208 }
4209 } // NdbQueryOperationImpl::nullifyResult
4210
4211 bool
isRowNULL() const4212 NdbQueryOperationImpl::isRowNULL() const
4213 {
4214 return m_isRowNull;
4215 }
4216
4217 bool
isRowChanged() const4218 NdbQueryOperationImpl::isRowChanged() const
4219 {
4220 // FIXME: Need to be implemented as scan linked with scan is now implemented.
4221 return true;
4222 }
4223
isSetInMask(const unsigned char * mask,int bitNo)4224 static bool isSetInMask(const unsigned char* mask, int bitNo)
4225 {
4226 return mask[bitNo>>3] & 1<<(bitNo&7);
4227 }
4228
int
NdbQueryOperationImpl::serializeProject(Uint32Buffer& attrInfo)
{
  // Serialize the projection (set of columns to read) for this operation
  // into 'attrInfo'. The projection is prefixed with a length word which
  // is back patched at the end. Returns 0 on success.
  Uint32 startPos = attrInfo.getSize();
  attrInfo.append(0U);  // Temp write first 'length' word, update later

  /**
   * If the columns in the projections are specified as
   * in NdbRecord format, attrId are assumed to be ordered ascending.
   * In this form the projection spec. can be packed as
   * a single bitmap.
   */
  if (m_ndbRecord != NULL) {
    Bitmask<MAXNROFATTRIBUTESINWORDS> readMask;
    Uint32 requestedCols= 0;
    Uint32 maxAttrId= 0;

    for (Uint32 i= 0; i<m_ndbRecord->noOfColumns; i++)
    {
      const NdbRecord::Attr* const col= &m_ndbRecord->columns[i];
      Uint32 attrId= col->attrId;

      // A NULL read mask means 'read all columns in the NdbRecord'.
      if (m_read_mask == NULL || isSetInMask(m_read_mask, i))
      { if (attrId > maxAttrId)
          maxAttrId= attrId;

        readMask.set(attrId);
        requestedCols++;

        // Reading a disk stored column must be flagged in the request.
        const NdbColumnImpl* const column = getQueryOperationDef().getTable()
          .getColumn(col->column_no);
        if (column->getStorageType() == NDB_STORAGETYPE_DISK)
        {
          m_diskInUserProjection = true;
        }
      }
    }

    // Test for special case, get all columns:
    if (requestedCols == (unsigned)m_operationDef.getTable().getNoOfColumns()) {
      Uint32 ah;
      AttributeHeader::init(&ah, AttributeHeader::READ_ALL, requestedCols);
      attrInfo.append(ah);
    } else if (requestedCols > 0) {
      /* Serialize projection as a bitmap.*/
      const Uint32 wordCount = 1+maxAttrId/32; // Size of mask.
      Uint32* dst = attrInfo.alloc(wordCount+1);
      AttributeHeader::init(dst,
                            AttributeHeader::READ_PACKED, 4*wordCount);
      memcpy(dst+1, &readMask, 4*wordCount);
    }
  } // if (m_ndbRecord...)

  /** Projection is specified in RecAttr format.
   *  This may also be combined with the NdbRecord format.
   */
  const NdbRecAttr* recAttr = m_firstRecAttr;
  /* Serialize projection as a list of Attribute id's.*/
  while (recAttr) {
    Uint32 ah;
    AttributeHeader::init(&ah,
                          recAttr->attrId(),
                          0);
    attrInfo.append(ah);
    if (recAttr->getColumn()->getStorageType() == NDB_STORAGETYPE_DISK)
    {
      m_diskInUserProjection = true;
    }
    recAttr = recAttr->next();
  }

  // Scan queries need the tuple correlation appended to each row in order
  // to match child rows with their parent rows.
  bool withCorrelation = getRoot().getQueryDef().isScanQuery();
  if (withCorrelation) {
    Uint32 ah;
    AttributeHeader::init(&ah, AttributeHeader::CORR_FACTOR64, 0);
    attrInfo.append(ah);
  }

  // Size of projection in words, back patched into the length word.
  Uint32 length = attrInfo.getSize() - startPos - 1 ;
  attrInfo.put(startPos, length);
  return 0;
} // NdbQueryOperationImpl::serializeProject
4312
serializeParams(const NdbQueryParamValue * paramValues)4313 int NdbQueryOperationImpl::serializeParams(const NdbQueryParamValue* paramValues)
4314 {
4315 if (unlikely(paramValues == NULL))
4316 {
4317 return QRY_REQ_ARG_IS_NULL;
4318 }
4319
4320 const NdbQueryOperationDefImpl& def = getQueryOperationDef();
4321 for (Uint32 i=0; i<def.getNoOfParameters(); i++)
4322 {
4323 const NdbParamOperandImpl& paramDef = def.getParameter(i);
4324 const NdbQueryParamValue& paramValue = paramValues[paramDef.getParamIx()];
4325
4326 /**
4327 * Add parameter value to serialized data.
4328 * Each value has a Uint32 length field (in bytes), followed by
4329 * the actuall value. Allocation is in Uint32 units with unused bytes
4330 * zero padded.
4331 **/
4332 const Uint32 oldSize = m_params.getSize();
4333 m_params.append(0); // Place holder for length.
4334 bool null;
4335 Uint32 len;
4336 const int error =
4337 paramValue.serializeValue(*paramDef.getColumn(), m_params, len, null);
4338 if (unlikely(error))
4339 return error;
4340 if (unlikely(null))
4341 return Err_KeyIsNULL;
4342
4343 if(unlikely(m_params.isMemoryExhausted())){
4344 return Err_MemoryAlloc;
4345 }
4346 // Back patch length field.
4347 m_params.put(oldSize, len);
4348 }
4349 return 0;
4350 } // NdbQueryOperationImpl::serializeParams
4351
Uint32
NdbQueryOperationImpl
::calculateBatchedRows(const NdbQueryOperationImpl* closestScan)
{
  // Recursively determine the max number of rows to fetch per batch for
  // this subtree. 'closestScan' is the nearest scan ancestor of this
  // operation, or NULL if none exists above us. Returns the batch size
  // acceptable to this operation and its lookup descendants.
  const NdbQueryOperationImpl* myClosestScan;
  if (m_operationDef.isScanOperation())
  {
    // A scan operation is its own 'closest scan' for its descendants.
    myClosestScan = this;
  }
  else
  {
    myClosestScan = closestScan;
  }

  Uint32 maxBatchRows = 0;
  if (myClosestScan != NULL)
  {
    // To force usage of SCAN_NEXTREQ even for small scans resultsets
    // (test hooks activated through DBUG keywords):
    if (DBUG_EVALUATE_IF("max_4rows_in_spj_batches", true, false))
    {
      m_maxBatchRows = 4;
    }
    else if (DBUG_EVALUATE_IF("max_64rows_in_spj_batches", true, false))
    {
      m_maxBatchRows = 64;
    }
    else if (enforcedBatchSize)
    {
      m_maxBatchRows = enforcedBatchSize;
    }

    const Ndb& ndb = *getQuery().getNdbTransaction().getNdb();

    /**
     * For each batch, a lookup operation must be able to receive as many rows
     * as the closest ancestor scan operation.
     * We must thus make sure that we do not set a batch size for the scan
     * that exceeds what any of its scan descendants can use.
     *
     * Ignore calculated 'batchByteSize'
     * here - Recalculated when building signal after max-batchRows has been
     * determined.
     */
    Uint32 batchByteSize;
    /**
     * myClosestScan->m_maxBatchRows may be zero to indicate that we
     * should use default values, or non-zero if the application had an
     * explicit preference.
     */
    maxBatchRows = myClosestScan->m_maxBatchRows;
    NdbReceiver::calculate_batch_size(* ndb.theImpl,
                                      getRoot().m_parallelism
                                      == Parallelism_max
                                      ? m_queryImpl.getRootFragCount()
                                      : getRoot().m_parallelism,
                                      maxBatchRows,
                                      batchByteSize);
    assert(maxBatchRows > 0);
    assert(maxBatchRows <= batchByteSize);
  }

  // Find the largest value that is acceptable to all lookup descendants.
  for (Uint32 i = 0; i < m_children.size(); i++)
  {
    const Uint32 childMaxBatchRows =
      m_children[i]->calculateBatchedRows(myClosestScan);
    maxBatchRows = MIN(maxBatchRows, childMaxBatchRows);
  }

  if (m_operationDef.isScanOperation())
  {
    // Use this value for current op and all lookup descendants.
    m_maxBatchRows = maxBatchRows;
    // Return max(Unit32) to avoid interfering with batch size calculation
    // for parent.
    return 0xffffffff;
  }
  else
  {
    return maxBatchRows;
  }
} // NdbQueryOperationImpl::calculateBatchedRows
4434
4435
4436 void
setBatchedRows(Uint32 batchedRows)4437 NdbQueryOperationImpl::setBatchedRows(Uint32 batchedRows)
4438 {
4439 if (!m_operationDef.isScanOperation())
4440 {
4441 /** Lookup operations should handle the same number of rows as
4442 * the closest scan ancestor.
4443 */
4444 m_maxBatchRows = batchedRows;
4445 }
4446
4447 for (Uint32 i = 0; i < m_children.size(); i++)
4448 {
4449 m_children[i]->setBatchedRows(m_maxBatchRows);
4450 }
4451 }
4452
int
NdbQueryOperationImpl::prepareAttrInfo(Uint32Buffer& attrInfo)
{
  // Build the per-operation parameter section of the serialized request:
  // a QN_*Parameters node followed by key parameters, optional interpreted
  // code, and the projection. Returns 0 on success, else an error code.
  const NdbQueryOperationDefImpl& def = getQueryOperationDef();

  /**
   * Serialize parameters refered by this NdbQueryOperation.
   * Params for the complete NdbQuery is collected in a single
   * serializedParams chunk. Each operations params are
   * proceeded by 'length' for this operation.
   */
  if (def.getType() == NdbQueryOperationDef::UniqueIndexAccess)
  {
    // A unique index access contributes an extra (hidden) lookup node for
    // the index itself, which receives the key parameters.
    // Reserve memory for LookupParameters, fill in contents later when
    // 'length' and 'requestInfo' has been calculated.
    Uint32 startPos = attrInfo.getSize();
    attrInfo.alloc(QN_LookupParameters::NodeSize);
    Uint32 requestInfo = 0;

    if (m_params.getSize() > 0)
    {
      // parameter values has been serialized as part of NdbTransaction::createQuery()
      // Only need to append it to rest of the serialized arguments
      requestInfo |= DABits::PI_KEY_PARAMS;
      attrInfo.append(m_params);
    }

    QN_LookupParameters* param = reinterpret_cast<QN_LookupParameters*>(attrInfo.addr(startPos));
    if (unlikely(param==NULL))
      return Err_MemoryAlloc;

    param->requestInfo = requestInfo;
    param->resultData = getIdOfReceiver();
    Uint32 length = attrInfo.getSize() - startPos;
    if (unlikely(length > 0xFFFF)) {
      return QRY_DEFINITION_TOO_LARGE; //Query definition too large.
    }
    QueryNodeParameters::setOpLen(param->len,
                                  QueryNodeParameters::QN_LOOKUP,
                                  length);

#ifdef __TRACE_SERIALIZATION
    ndbout << "Serialized params for index node "
           << getInternalOpNo()-1 << " : ";
    for(Uint32 i = startPos; i < attrInfo.getSize(); i++){
      char buf[12];
      sprintf(buf, "%.8x", attrInfo.get(i));
      ndbout << buf << " ";
    }
    ndbout << endl;
#endif
  } // if (UniqueIndexAccess ...

  // Reserve memory for LookupParameters, fill in contents later when
  // 'length' and 'requestInfo' has been calculated.
  Uint32 startPos = attrInfo.getSize();
  Uint32 requestInfo = 0;
  bool isRoot = (def.getOpNo()==0);

  // Choose parameter node type: lookups use QN_LOOKUP; a scan is
  // QN_SCAN_FRAG when it is the root, else QN_SCAN_INDEX.
  QueryNodeParameters::OpType paramType =
       !def.isScanOperation() ? QueryNodeParameters::QN_LOOKUP
           : (isRoot) ? QueryNodeParameters::QN_SCAN_FRAG
                      : QueryNodeParameters::QN_SCAN_INDEX;

  if (paramType == QueryNodeParameters::QN_SCAN_INDEX)
    attrInfo.alloc(QN_ScanIndexParameters::NodeSize);
  else if (paramType == QueryNodeParameters::QN_SCAN_FRAG)
    attrInfo.alloc(QN_ScanFragParameters::NodeSize);
  else
    attrInfo.alloc(QN_LookupParameters::NodeSize);

  // SPJ block assume PARAMS to be supplied before ATTR_LIST
  if (m_params.getSize() > 0 &&
      def.getType() != NdbQueryOperationDef::UniqueIndexAccess)
  {
    // parameter values has been serialized as part of NdbTransaction::createQuery()
    // Only need to append it to rest of the serialized arguments
    requestInfo |= DABits::PI_KEY_PARAMS;
    attrInfo.append(m_params);
  }

  if (hasInterpretedCode())
  {
    // Append the serialized interpreted program (e.g. pushed conditions).
    requestInfo |= DABits::PI_ATTR_INTERPRET;
    const int error= prepareInterpretedCode(attrInfo);
    if (unlikely(error))
    {
      return error;
    }
  }

  if (m_ndbRecord==NULL && m_firstRecAttr==NULL)
  {
    // Leaf operations with empty projections are not supported.
    if (getNoOfChildOperations() == 0)
    {
      return QRY_EMPTY_PROJECTION;
    }
  }
  else
  {
    requestInfo |= DABits::PI_ATTR_LIST;
    const int error = serializeProject(attrInfo);
    if (unlikely(error)) {
      return error;
    }
  }

  if (diskInUserProjection())
  {
    // Projection touches disk stored columns; SPJ must be told.
    requestInfo |= DABits::PI_DISK_ATTR;
  }

  Uint32 length = attrInfo.getSize() - startPos;
  if (unlikely(length > 0xFFFF)) {
    return QRY_DEFINITION_TOO_LARGE; //Query definition too large.
  }

  // Fill in the parameter node header reserved above.
  if (paramType == QueryNodeParameters::QN_SCAN_INDEX)
  {
    QN_ScanIndexParameters* param = reinterpret_cast<QN_ScanIndexParameters*>(attrInfo.addr(startPos));
    if (unlikely(param==NULL))
      return Err_MemoryAlloc;

    Ndb& ndb = *m_queryImpl.getNdbTransaction().getNdb();

    // Recalculate the batch byte size for the already decided row count.
    Uint32 batchRows = getMaxBatchRows();
    Uint32 batchByteSize;
    NdbReceiver::calculate_batch_size(* ndb.theImpl,
                                      m_queryImpl.getRootFragCount(),
                                      batchRows,
                                      batchByteSize);
    assert(batchRows == getMaxBatchRows());
    assert(batchRows <= batchByteSize);
    assert(m_parallelism == Parallelism_max ||
           m_parallelism == Parallelism_adaptive);
    if (m_parallelism == Parallelism_max)
    {
      requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL;
    }
    if (def.hasParamInPruneKey())
    {
      requestInfo |= QN_ScanIndexParameters::SIP_PRUNE_PARAMS;
    }
    param->requestInfo = requestInfo;
    // Check that both values fit in param->batchSize.
    assert(getMaxBatchRows() < (1<<QN_ScanIndexParameters::BatchRowBits));
    assert(batchByteSize < (1 << (sizeof param->batchSize * 8
                                  - QN_ScanIndexParameters::BatchRowBits)));
    // Pack 'bytes' in the upper and 'rows' in the lower bits of batchSize.
    param->batchSize = (batchByteSize << 11) | getMaxBatchRows();
    param->resultData = getIdOfReceiver();
    QueryNodeParameters::setOpLen(param->len, paramType, length);
  }
  else if (paramType == QueryNodeParameters::QN_SCAN_FRAG)
  {
    QN_ScanFragParameters* param = reinterpret_cast<QN_ScanFragParameters*>(attrInfo.addr(startPos));
    if (unlikely(param==NULL))
      return Err_MemoryAlloc;

    param->requestInfo = requestInfo;
    param->resultData = getIdOfReceiver();
    QueryNodeParameters::setOpLen(param->len, paramType, length);
  }
  else
  {
    assert(paramType == QueryNodeParameters::QN_LOOKUP);
    QN_LookupParameters* param = reinterpret_cast<QN_LookupParameters*>(attrInfo.addr(startPos));
    if (unlikely(param==NULL))
      return Err_MemoryAlloc;

    param->requestInfo = requestInfo;
    param->resultData = getIdOfReceiver();
    QueryNodeParameters::setOpLen(param->len, paramType, length);
  }

#ifdef __TRACE_SERIALIZATION
  ndbout << "Serialized params for node "
         << getInternalOpNo() << " : ";
  for(Uint32 i = startPos; i < attrInfo.getSize(); i++){
    char buf[12];
    sprintf(buf, "%.8x", attrInfo.get(i));
    ndbout << buf << " ";
  }
  ndbout << endl;
#endif

  // Parameter values was appended to AttrInfo, shrink param buffer
  // to reduce memory footprint.
  m_params.releaseExtend();

  return 0;
} // NdbQueryOperationImpl::prepareAttrInfo
4645
4646
int
NdbQueryOperationImpl::prepareKeyInfo(
                     Uint32Buffer& keyInfo,
                     const NdbQueryParamValue* actualParam)
{
  // Serialize the KEYINFO part of the request for the root operation:
  // index scan bounds and/or lookup key values, with parameter operands
  // substituted from 'actualParam'. Returns 0 on success.
  assert(this == &getRoot()); // Should only be called for root operation.
#ifdef TRACE_SERIALIZATION
  int startPos = keyInfo.getSize();
#endif

  const NdbQueryOperationDefImpl::IndexBound* bounds = m_operationDef.getBounds();
  if (bounds)
  {
    const int error = prepareIndexKeyInfo(keyInfo, bounds, actualParam);
    if (unlikely(error))
      return error;
  }

  const NdbQueryOperandImpl* const* keys = m_operationDef.getKeyOperands();
  if (keys)
  {
    const int error = prepareLookupKeyInfo(keyInfo, keys, actualParam);
    if (unlikely(error))
      return error;
  }

  if (unlikely(keyInfo.isMemoryExhausted())) {
    return Err_MemoryAlloc;
  }

#ifdef TRACE_SERIALIZATION
  ndbout << "Serialized KEYINFO for NdbQuery root : ";
  for (Uint32 i = startPos; i < keyInfo.getSize(); i++) {
    char buf[12];
    sprintf(buf, "%.8x", keyInfo.get(i));
    ndbout << buf << " ";
  }
  ndbout << endl;
#endif

  return 0;
} // NdbQueryOperationImpl::prepareKeyInfo
4689
4690
4691 /**
4692 * Convert constant operand into sequence of words that may be sent to data
4693 * nodes.
4694 * @param constOp Operand to convert.
4695 * @param buffer Destination buffer.
4696 * @param len Will be set to length in bytes.
4697 * @return 0 if ok, otherwise error code.
4698 */
4699 static int
serializeConstOp(const NdbConstOperandImpl & constOp,Uint32Buffer & buffer,Uint32 & len)4700 serializeConstOp(const NdbConstOperandImpl& constOp,
4701 Uint32Buffer& buffer,
4702 Uint32& len)
4703 {
4704 // Check that column->shrink_varchar() not specified, only used by mySQL
4705 // assert (!(column->flags & NdbDictionary::RecMysqldShrinkVarchar));
4706 buffer.skipRestOfWord();
4707 len = constOp.getSizeInBytes();
4708 Uint8 shortLen[2];
4709 switch (constOp.getColumn()->getArrayType()) {
4710 case NdbDictionary::Column::ArrayTypeFixed:
4711 buffer.appendBytes(constOp.getAddr(), len);
4712 break;
4713
4714 case NdbDictionary::Column::ArrayTypeShortVar:
4715 // Such errors should have been caught in convert2ColumnType().
4716 assert(len <= 0xFF);
4717 shortLen[0] = (unsigned char)len;
4718 buffer.appendBytes(shortLen, 1);
4719 buffer.appendBytes(constOp.getAddr(), len);
4720 len+=1;
4721 break;
4722
4723 case NdbDictionary::Column::ArrayTypeMediumVar:
4724 // Such errors should have been caught in convert2ColumnType().
4725 assert(len <= 0xFFFF);
4726 shortLen[0] = (unsigned char)(len & 0xFF);
4727 shortLen[1] = (unsigned char)(len >> 8);
4728 buffer.appendBytes(shortLen, 2);
4729 buffer.appendBytes(constOp.getAddr(), len);
4730 len+=2;
4731 break;
4732
4733 default:
4734 assert(false);
4735 }
4736 if (unlikely(buffer.isMemoryExhausted())) {
4737 return Err_MemoryAlloc;
4738 }
4739 return 0;
4740 } // static serializeConstOp
4741
/**
 * Append a single index scan bound to 'keyInfo': the bound type, an
 * AttributeHeader (back patched when the value length is known), and the
 * bound value - either a serialized constant, or a parameter value taken
 * from 'actualParam'. Returns 0 on success, else an error code.
 */
static int
appendBound(Uint32Buffer& keyInfo,
            NdbIndexScanOperation::BoundType type, const NdbQueryOperandImpl* bound,
            const NdbQueryParamValue* actualParam)
{
  Uint32 len = 0;

  keyInfo.append(type);
  const Uint32 oldSize = keyInfo.getSize();
  keyInfo.append(0); // Place holder for AttributeHeader

  switch(bound->getKind()){
  case NdbQueryOperandImpl::Const:
  {
    const NdbConstOperandImpl& constOp =
      static_cast<const NdbConstOperandImpl&>(*bound);

    const int error = serializeConstOp(constOp, keyInfo, len);
    if (unlikely(error))
      return error;

    break;
  }
  case NdbQueryOperandImpl::Param:
  {
    const NdbParamOperandImpl* const paramOp
      = static_cast<const NdbParamOperandImpl*>(bound);
    const int paramNo = paramOp->getParamIx();
    assert(actualParam != NULL);

    bool null;
    const int error =
      actualParam[paramNo].serializeValue(*paramOp->getColumn(), keyInfo,
                                          len, null);
    if (unlikely(error))
      return error;
    if (unlikely(null))
      return Err_KeyIsNULL;  // A NULL value cannot be used as a bound.
    break;
  }
  case NdbQueryOperandImpl::Linked:    // Root operation cannot have linked operands.
  default:
    assert(false);
  }

  // Back patch attribute header.
  keyInfo.put(oldSize,
              AttributeHeader(bound->getColumn()->m_attrId, len).m_value);

  return 0;
} // static appendBound()
4793
4794
int
NdbQueryOperationImpl::prepareIndexKeyInfo(
                     Uint32Buffer& keyInfo,
                     const NdbQueryOperationDefImpl::IndexBound* bounds,
                     const NdbQueryParamValue* actualParam)
{
  // Serialize the index scan bounds in 'bounds' into KEYINFO format,
  // substituting parameter operands with values from 'actualParam'.
  // Also records the shortest bound in m_queryImpl.m_shortestBound.
  int startPos = keyInfo.getSize();
  if (bounds->lowKeys==0 && bounds->highKeys==0)  // No Bounds defined
    return 0;

  const unsigned key_count =
     (bounds->lowKeys >= bounds->highKeys) ? bounds->lowKeys : bounds->highKeys;

  for (unsigned keyNo = 0; keyNo < key_count; keyNo++)
  {
    NdbIndexScanOperation::BoundType bound_type;

    /* If upper and lower limit is equal, a single BoundEQ is sufficient */
    if (keyNo < bounds->lowKeys &&
        keyNo < bounds->highKeys &&
        bounds->low[keyNo] == bounds->high[keyNo])
    {
      /* Inclusive if defined, or matching rows can include this value */
      bound_type= NdbIndexScanOperation::BoundEQ;
      int error = appendBound(keyInfo, bound_type, bounds->low[keyNo], actualParam);
      if (unlikely(error))
        return error;

    } else {

      /* If key is part of lower bound */
      if (keyNo < bounds->lowKeys)
      {
        /* Inclusive if defined, or matching rows can include this value.
         * Only the last key column of the bound may be strict (LT). */
        bound_type= bounds->lowIncl || keyNo+1 < bounds->lowKeys ?
            NdbIndexScanOperation::BoundLE : NdbIndexScanOperation::BoundLT;

        int error = appendBound(keyInfo, bound_type, bounds->low[keyNo], actualParam);
        if (unlikely(error))
          return error;
      }

      /* If key is part of upper bound */
      if (keyNo < bounds->highKeys)
      {
        /* Inclusive if defined, or matching rows can include this value.
         * Only the last key column of the bound may be strict (GT). */
        bound_type= bounds->highIncl || keyNo+1 < bounds->highKeys ?
            NdbIndexScanOperation::BoundGE : NdbIndexScanOperation::BoundGT;

        int error = appendBound(keyInfo, bound_type, bounds->high[keyNo], actualParam);
        if (unlikely(error))
          return error;
      }
    }
  }

  // Back patch the total bound length (in words) into the upper 16 bits
  // of the first bound word.
  Uint32 length = keyInfo.getSize()-startPos;
  if (unlikely(keyInfo.isMemoryExhausted())) {
    return Err_MemoryAlloc;
  } else if (unlikely(length > 0xFFFF)) {
    return QRY_DEFINITION_TOO_LARGE; // Query definition too large.
  } else if (likely(length > 0)) {
    keyInfo.put(startPos, keyInfo.get(startPos) | (length << 16));
  }

  m_queryImpl.m_shortestBound =(bounds->lowKeys <= bounds->highKeys) ? bounds->lowKeys : bounds->highKeys;
  return 0;
} // NdbQueryOperationImpl::prepareIndexKeyInfo
4863
4864
int
NdbQueryOperationImpl::prepareLookupKeyInfo(
                     Uint32Buffer& keyInfo,
                     const NdbQueryOperandImpl* const keys[],
                     const NdbQueryParamValue* actualParam)
{
  // Serialize the lookup key values in 'keys' into KEYINFO format,
  // substituting parameter operands with values from 'actualParam'.
  // Key count is taken from the index if one is used, else from the
  // table's primary key.
  const int keyCount = m_operationDef.getIndex()!=NULL ?
    static_cast<int>(m_operationDef.getIndex()->getNoOfColumns()) :
    m_operationDef.getTable().getNoOfPrimaryKeys();

  for (int keyNo = 0; keyNo<keyCount; keyNo++)
  {
    Uint32 dummy;  // Serialized length is not needed for lookup keys.

    switch(keys[keyNo]->getKind()){
    case NdbQueryOperandImpl::Const:
    {
      const NdbConstOperandImpl* const constOp
        = static_cast<const NdbConstOperandImpl*>(keys[keyNo]);
      const int error =
        serializeConstOp(*constOp, keyInfo, dummy);
      if (unlikely(error))
        return error;

      break;
    }
    case NdbQueryOperandImpl::Param:
    {
      const NdbParamOperandImpl* const paramOp
        = static_cast<const NdbParamOperandImpl*>(keys[keyNo]);
      int paramNo = paramOp->getParamIx();
      assert(actualParam != NULL);

      bool null;
      const int error =
        actualParam[paramNo].serializeValue(*paramOp->getColumn(), keyInfo,
                                            dummy, null);

      if (unlikely(error))
        return error;
      if (unlikely(null))
        return Err_KeyIsNULL;  // A NULL value cannot be used as a key.
      break;
    }
    case NdbQueryOperandImpl::Linked:    // Root operation cannot have linked operands.
    default:
      assert(false);
    }
  }

  if (unlikely(keyInfo.isMemoryExhausted())) {
    return Err_MemoryAlloc;
  }

  return 0;
} // NdbQueryOperationImpl::prepareLookupKeyInfo
4921
4922
bool
NdbQueryOperationImpl::execTRANSID_AI(const Uint32* ptr, Uint32 len)
{
  /**
   * Handle a TRANSID_AI signal (one row of result values) for this operation.
   * Returns true iff this signal made the owning root fragment's batch
   * complete (the caller may then wake up the client thread).
   */
  TupleCorrelation tupleCorrelation;
  // For lookup queries, use the first root fragment; scans resolve the
  // proper fragment from the correlation data below.
  NdbRootFragment* rootFrag = m_queryImpl.m_rootFrags;

  if (getQueryDef().isScanQuery())
  {
    // Scan results carry trailing correlation words identifying the sender.
    const CorrelationData correlData(ptr, len);
    const Uint32 receiverId = correlData.getRootReceiverId();

    /** receiverId holds the Id of the receiver of the corresponding stream
     * of the root operation. We can thus find the correct root fragment
     * number.
     */
    rootFrag =
      NdbRootFragment::receiverIdLookup(m_queryImpl.m_rootFrags,
                                        m_queryImpl.getRootFragCount(),
                                        receiverId);
    if (unlikely(rootFrag == NULL))
    {
      // Unknown receiver: drop the signal (should not happen).
      assert(false);
      return false;
    }

    // Extract tuple correlation.
    tupleCorrelation = correlData.getTupleCorrelation();
    len -= CorrelationData::wordCount;  // strip correlation words from payload
  }

  if (traceSignals) {
    ndbout << "NdbQueryOperationImpl::execTRANSID_AI()"
           << ", fragment no: " << rootFrag->getFragNo()
           << ", operation no: " << getQueryOperationDef().getOpNo()
           << endl;
  }

  // Process result values.
  rootFrag->getResultStream(*this).execTRANSID_AI(ptr, len, tupleCorrelation);
  rootFrag->incrOutstandingResults(-1);  // one expected result row has arrived

  bool ret = false;
  if (rootFrag->isFragBatchComplete())
  {
    ret = m_queryImpl.handleBatchComplete(*rootFrag);
  }

  if (false && traceSignals) {  // disabled: very verbose per-row trace
    ndbout << "NdbQueryOperationImpl::execTRANSID_AI(): returns:" << ret
           << ", *this=" << *this << endl;
  }
  return ret;
} //NdbQueryOperationImpl::execTRANSID_AI
4976
4977
bool
NdbQueryOperationImpl::execTCKEYREF(const NdbApiSignal* aSignal)
{
  /**
   * Handle a TCKEYREF signal (a lookup was refused or found no row).
   * Adjusts the outstanding-result bookkeeping of the root fragment and
   * returns true iff the fragment's batch thereby became complete.
   */
  if (traceSignals) {
    ndbout << "NdbQueryOperationImpl::execTCKEYREF()" << endl;
  }

  /* The SPJ block does not forward TCKEYREFs for trees with scan roots.*/
  assert(!getQueryDef().isScanQuery());

  const TcKeyRef* ref = CAST_CONSTPTR(TcKeyRef, aSignal->getDataPtr());
  // Discard signals belonging to another (e.g. already closed) transaction.
  if (!getQuery().m_transaction.checkState_TransId(ref->transId))
  {
#ifdef NDB_NO_DROPPED_SIGNAL
    abort();
#endif
    return false;
  }

  // Suppress 'TupleNotFound' status for child operations.
  if (&getRoot() == this ||
      ref->errorCode != static_cast<Uint32>(Err_TupleNotFound))
  {
    if (aSignal->getLength() == TcKeyRef::SignalLength)
    {
      // Signal may contain additional error data
      getQuery().m_error.details = (char *)UintPtr(ref->errorData);
    }
    getQuery().setFetchTerminated(ref->errorCode,false);
  }

  // Lookup queries use a single root fragment (index 0).
  NdbRootFragment& rootFrag = getQuery().m_rootFrags[0];

  /**
   * Error may be either a 'soft' or a 'hard' error.
   * 'Soft error' are regarded 'informational', and we are
   * allowed to continue execution of the query. A 'hard error'
   * will terminate query, close comminication, and further
   * incomming signals to this NdbReceiver will be discarded.
   */
  switch (ref->errorCode)
  {
  case Err_TupleNotFound:  // 'Soft error' : Row not found
  case Err_FalsePredicate: // 'Soft error' : Interpreter_exit_nok
  {
    /**
     * Need to update 'outstanding' count:
     * Compensate for children results not produced.
     * (doSend() assumed all child results to be materialized)
     */
    Uint32 cnt = 1; // self
    cnt += getNoOfDescendantOperations();
    if (getNoOfChildOperations() > 0)
    {
      // NOTE(review): extra per-leaf count appears to match the per-leaf
      // completion signals doSend() expected - confirm against doSend().
      cnt += getNoOfLeafOperations();
    }
    rootFrag.incrOutstandingResults(- Int32(cnt));
    break;
  }
  default: // 'Hard error':
    rootFrag.throwRemainingResults(); // Terminate receive -> complete
  }

  bool ret = false;
  if (rootFrag.isFragBatchComplete())
  {
    ret = m_queryImpl.handleBatchComplete(rootFrag);
  }

  if (traceSignals) {
    ndbout << "NdbQueryOperationImpl::execTCKEYREF(): returns:" << ret
           << ", *this=" << *this << endl;
  }
  return ret;
} //NdbQueryOperationImpl::execTCKEYREF
5053
bool
NdbQueryOperationImpl::execSCAN_TABCONF(Uint32 tcPtrI,
                                        Uint32 rowCount,
                                        Uint32 nodeMask,
                                        NdbReceiver* receiver)
{
  /**
   * Handle a SCAN_TABCONF signal: confirmation of how many rows the data
   * nodes produce for the current batch of one root fragment.
   * Returns true iff this confirmation made the fragment's batch complete.
   */
  // tcPtrI==RNIL and nodeMask==0 both signify end-of-scan; they must agree.
  assert((tcPtrI==RNIL && nodeMask==0) ||
         (tcPtrI!=RNIL && nodeMask!=0));
  assert(checkMagicNumber());
  // For now, only the root operation may be a scan.
  assert(&getRoot() == this);
  assert(m_operationDef.isScanOperation());

  // Map the confirming receiver back to its root fragment.
  NdbRootFragment* rootFrag =
    NdbRootFragment::receiverIdLookup(m_queryImpl.m_rootFrags,
                                      m_queryImpl.getRootFragCount(),
                                      receiver->getId());
  if (unlikely(rootFrag == NULL))
  {
    // Unknown receiver: drop the signal (should not happen).
    assert(false);
    return false;
  }
  // Prepare for SCAN_NEXTREQ, tcPtrI==RNIL, nodeMask==0 -> EOF
  rootFrag->setConfReceived(tcPtrI);
  rootFrag->setRemainingSubScans(nodeMask);
  rootFrag->incrOutstandingResults(rowCount);  // rows still to arrive as TRANSID_AI

  if(traceSignals){
    ndbout << "NdbQueryOperationImpl::execSCAN_TABCONF"
           << " fragment no: " << rootFrag->getFragNo()
           << " rows " << rowCount
           << " nodeMask: H'" << hex << nodeMask << ")"
           << " tcPtrI " << tcPtrI
           << endl;
  }

  bool ret = false;
  if (rootFrag->isFragBatchComplete())
  {
    /* This fragment is now complete */
    ret = m_queryImpl.handleBatchComplete(*rootFrag);
  }
  if (false && traceSignals) {  // disabled: verbose trace
    ndbout << "NdbQueryOperationImpl::execSCAN_TABCONF():, returns:" << ret
           << ", tcPtrI=" << tcPtrI << " rowCount=" << rowCount
           << " *this=" << *this << endl;
  }
  return ret;
} //NdbQueryOperationImpl::execSCAN_TABCONF
5103
5104 int
setOrdering(NdbQueryOptions::ScanOrdering ordering)5105 NdbQueryOperationImpl::setOrdering(NdbQueryOptions::ScanOrdering ordering)
5106 {
5107 if (getQueryOperationDef().getType() != NdbQueryOperationDef::OrderedIndexScan)
5108 {
5109 getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
5110 return -1;
5111 }
5112
5113 if (m_parallelism != Parallelism_max)
5114 {
5115 getQuery().setErrorCode(QRY_SEQUENTIAL_SCAN_SORTED);
5116 return -1;
5117 }
5118
5119 if(static_cast<const NdbQueryIndexScanOperationDefImpl&>
5120 (getQueryOperationDef())
5121 .getOrdering() != NdbQueryOptions::ScanOrdering_void)
5122 {
5123 getQuery().setErrorCode(QRY_SCAN_ORDER_ALREADY_SET);
5124 return -1;
5125 }
5126
5127 m_ordering = ordering;
5128 return 0;
5129 } // NdbQueryOperationImpl::setOrdering()
5130
setInterpretedCode(const NdbInterpretedCode & code)5131 int NdbQueryOperationImpl::setInterpretedCode(const NdbInterpretedCode& code)
5132 {
5133 if (code.m_instructions_length == 0)
5134 {
5135 return 0;
5136 }
5137
5138 const NdbTableImpl& table = getQueryOperationDef().getTable();
5139 // Check if operation and interpreter code use the same table
5140 if (unlikely(table.getTableId() != code.getTable()->getTableId()
5141 || table_version_major(table.getObjectVersion()) !=
5142 table_version_major(code.getTable()->getObjectVersion())))
5143 {
5144 getQuery().setErrorCode(Err_InterpretedCodeWrongTab);
5145 return -1;
5146 }
5147
5148 if (unlikely((code.m_flags & NdbInterpretedCode::Finalised)
5149 == 0))
5150 {
5151 // NdbInterpretedCode::finalise() not called.
5152 getQuery().setErrorCode(Err_FinaliseNotCalled);
5153 return -1;
5154 }
5155
5156 // Allocate an interpreted code object if we do not have one already.
5157 if (likely(m_interpretedCode == NULL))
5158 {
5159 m_interpretedCode = new NdbInterpretedCode();
5160
5161 if (unlikely(m_interpretedCode==NULL))
5162 {
5163 getQuery().setErrorCode(Err_MemoryAlloc);
5164 return -1;
5165 }
5166 }
5167
5168 /*
5169 * Make a deep copy, such that 'code' can be destroyed when this method
5170 * returns.
5171 */
5172 const int error = m_interpretedCode->copy(code);
5173 if (unlikely(error))
5174 {
5175 getQuery().setErrorCode(error);
5176 return -1;
5177 }
5178 return 0;
5179 } // NdbQueryOperationImpl::setInterpretedCode()
5180
setParallelism(Uint32 parallelism)5181 int NdbQueryOperationImpl::setParallelism(Uint32 parallelism){
5182 if (!getQueryOperationDef().isScanOperation())
5183 {
5184 getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
5185 return -1;
5186 }
5187 else if (getOrdering() == NdbQueryOptions::ScanOrdering_ascending ||
5188 getOrdering() == NdbQueryOptions::ScanOrdering_descending)
5189 {
5190 getQuery().setErrorCode(QRY_SEQUENTIAL_SCAN_SORTED);
5191 return -1;
5192 }
5193 else if (getQueryOperationDef().getOpNo() > 0)
5194 {
5195 getQuery().setErrorCode(Err_FunctionNotImplemented);
5196 return -1;
5197 }
5198 else if (parallelism < 1 || parallelism > NDB_PARTITION_MASK)
5199 {
5200 getQuery().setErrorCode(Err_ParameterError);
5201 return -1;
5202 }
5203 m_parallelism = parallelism;
5204 return 0;
5205 }
5206
setMaxParallelism()5207 int NdbQueryOperationImpl::setMaxParallelism(){
5208 if (!getQueryOperationDef().isScanOperation())
5209 {
5210 getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
5211 return -1;
5212 }
5213 m_parallelism = Parallelism_max;
5214 return 0;
5215 }
5216
setAdaptiveParallelism()5217 int NdbQueryOperationImpl::setAdaptiveParallelism(){
5218 if (!getQueryOperationDef().isScanOperation())
5219 {
5220 getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
5221 return -1;
5222 }
5223 else if (getQueryOperationDef().getOpNo() == 0)
5224 {
5225 getQuery().setErrorCode(Err_FunctionNotImplemented);
5226 return -1;
5227 }
5228 m_parallelism = Parallelism_adaptive;
5229 return 0;
5230 }
5231
setBatchSize(Uint32 batchSize)5232 int NdbQueryOperationImpl::setBatchSize(Uint32 batchSize){
5233 if (!getQueryOperationDef().isScanOperation())
5234 {
5235 getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
5236 return -1;
5237 }
5238 if (this != &getRoot() &&
5239 batchSize < getQueryOperationDef().getTable().getFragmentCount())
5240 {
5241 /** Each SPJ block instance will scan each fragment, so the batch size
5242 * cannot be smaller than the number of fragments.*/
5243 getQuery().setErrorCode(QRY_BATCH_SIZE_TOO_SMALL);
5244 return -1;
5245 }
5246 m_maxBatchRows = batchSize;
5247 return 0;
5248 }
5249
5250 bool
hasInterpretedCode() const5251 NdbQueryOperationImpl::hasInterpretedCode() const
5252 {
5253 return (m_interpretedCode && m_interpretedCode->m_instructions_length > 0) ||
5254 (getQueryOperationDef().getInterpretedCode() != NULL);
5255 } // NdbQueryOperationImpl::hasInterpretedCode
5256
5257 int
prepareInterpretedCode(Uint32Buffer & attrInfo) const5258 NdbQueryOperationImpl::prepareInterpretedCode(Uint32Buffer& attrInfo) const
5259 {
5260 const NdbInterpretedCode* interpretedCode =
5261 (m_interpretedCode && m_interpretedCode->m_instructions_length > 0)
5262 ? m_interpretedCode
5263 : getQueryOperationDef().getInterpretedCode();
5264
5265 // There should be no subroutines in a filter.
5266 assert(interpretedCode->m_first_sub_instruction_pos==0);
5267 assert(interpretedCode->m_instructions_length > 0);
5268 assert(interpretedCode->m_instructions_length <= 0xffff);
5269
5270 // Allocate space for program and length field.
5271 Uint32* const buffer =
5272 attrInfo.alloc(1+interpretedCode->m_instructions_length);
5273 if(unlikely(buffer==NULL))
5274 {
5275 return Err_MemoryAlloc;
5276 }
5277
5278 buffer[0] = interpretedCode->m_instructions_length;
5279 memcpy(buffer+1,
5280 interpretedCode->m_buffer,
5281 interpretedCode->m_instructions_length * sizeof(Uint32));
5282 return 0;
5283 } // NdbQueryOperationImpl::prepareInterpretedCode
5284
5285
5286 Uint32
getIdOfReceiver() const5287 NdbQueryOperationImpl::getIdOfReceiver() const {
5288 NdbRootFragment& rootFrag = m_queryImpl.m_rootFrags[0];
5289 return rootFrag.getResultStream(*this).getReceiver().getId();
5290 }
5291
getRowSize() const5292 Uint32 NdbQueryOperationImpl::getRowSize() const
5293 {
5294 // Check if row size has been computed yet.
5295 if (m_rowSize == 0xffffffff)
5296 {
5297 m_rowSize =
5298 NdbReceiver::ndbrecord_rowsize(m_ndbRecord, false);
5299 }
5300 return m_rowSize;
5301 }
5302
Uint32 NdbQueryOperationImpl::getBatchBufferSize() const
{
  /**
   * Return the (cached) receive-buffer size needed to hold one result
   * batch for this operation. 0xffffffff marks "not yet computed"; the
   * value is calculated lazily on first use.
   */
  // Check if batch buffer size has been computed yet.
  if (m_batchBufferSize == 0xffffffff)
  {
    Uint32 batchRows = getMaxBatchRows();
    Uint32 batchByteSize = 0;
    Uint32 batchFrags = 1;  // number of per-fragment "extra rows" to allow for

    if (m_operationDef.isScanOperation())
    {
      const Ndb* const ndb = getQuery().getNdbTransaction().getNdb();
      // Let the receiver compute the byte limit matching our row limit.
      NdbReceiver::calculate_batch_size(* ndb->theImpl,
                                        getQuery().getRootFragCount(),
                                        batchRows,
                                        batchByteSize);
      assert(batchRows == getMaxBatchRows());

      /**
       * When LQH reads a scan batch, the size of the batch is limited
       * both to a maximal number of rows and a maximal number of bytes.
       * The latter limit is interpreted such that the batch ends when the
       * limit has been exceeded. Consequently, the buffer must be able to
       * hold max_no_of_bytes plus one extra row. In addition, when the
       * SPJ block executes a (pushed) child scan operation, it scans a
       * number of fragments (possibly all) in parallel, and divides the
       * row and byte limits by the number of parallel fragments.
       * Consequently, a child scan operation may return max_no_of_bytes,
       * plus one extra row for each fragment.
       */
      if (getParentOperation() != NULL)
      {
        batchFrags = getQuery().getRootFragCount();
      }
    }

    // Mask of columns actually read; limits the per-row buffer footprint.
    AttributeMask readMask;
    if (m_ndbRecord != NULL)
    {
      m_ndbRecord->copyMask(readMask.rep.data, m_read_mask);
    }

    m_batchBufferSize = NdbReceiver::result_bufsize(
                                     batchRows,
                                     batchByteSize,
                                     batchFrags,
                                     m_ndbRecord,
                                     readMask.rep.data,
                                     m_firstRecAttr,
                                     0, 0);
  }
  return m_batchBufferSize;
}
5356
5357 /** For debugging.*/
operator <<(NdbOut & out,const NdbQueryOperationImpl & op)5358 NdbOut& operator<<(NdbOut& out, const NdbQueryOperationImpl& op){
5359 out << "[ this: " << &op
5360 << " m_magic: " << op.m_magic;
5361 out << " op.operationDef.getOpNo()"
5362 << op.m_operationDef.getOpNo();
5363 if (op.getParentOperation()){
5364 out << " m_parent: " << op.getParentOperation();
5365 }
5366 for(unsigned int i = 0; i<op.getNoOfChildOperations(); i++){
5367 out << " m_children[" << i << "]: " << &op.getChildOperation(i);
5368 }
5369 out << " m_queryImpl: " << &op.m_queryImpl;
5370 out << " m_operationDef: " << &op.m_operationDef;
5371 out << " m_isRowNull " << op.m_isRowNull;
5372 out << " ]";
5373 return out;
5374 }
5375
operator <<(NdbOut & out,const NdbResultStream & stream)5376 NdbOut& operator<<(NdbOut& out, const NdbResultStream& stream){
5377 out << " received rows: " << stream.m_resultSets[stream.m_recv].getRowCount();
5378 return out;
5379 }
5380
5381
5382 // Compiler settings require explicit instantiation.
5383 template class Vector<NdbQueryOperationImpl*>;
5384