1 /*
2    Copyright (c) 2011, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 
#include <ndb_global.h>
#include <new>
#include <NdbDictionary.hpp>
#include <NdbIndexScanOperation.hpp>
#include "NdbQueryBuilder.hpp"
#include "NdbQueryOperation.hpp"
#include "API.hpp"
#include "NdbQueryBuilderImpl.hpp"
#include "NdbQueryOperationImpl.hpp"
#include "NdbInterpretedCode.hpp"

#include <signaldata/TcKeyReq.hpp>
#include <signaldata/TcKeyRef.hpp>
#include <signaldata/ScanTab.hpp>
#include <signaldata/QueryTree.hpp>
#include <signaldata/DbspjErr.hpp>

#include "AttributeHeader.hpp"

#include <Bitmask.hpp>
45 
// Debug aid: flip '#if 0' to '#if 1' to make DEBUG_CRASH() assert,
// turning otherwise silently handled error paths into hard failures
// while debugging.
#if 0
#define DEBUG_CRASH() assert(false)
#else
#define DEBUG_CRASH()
#endif

/** To prevent compiler warnings about variables that are only used in asserts
 * (when building optimized version).
 */
#define UNUSED(x) ((void)(x))
56 
// To force usage of SCAN_NEXTREQ even for small scan resultsets:
// - '0' is default (production) value
// - '4' is normally a good value for testing batch related problems
static const int enforcedBatchSize = 0;

// Use double buffered ResultSets, may later change
// to be more adaptive based on query type
static const bool useDoubleBuffers = true;

/* Various error codes that are not specific to NdbQuery. */
static const int Err_TupleNotFound = 626;
static const int Err_FalsePredicate = 899;
static const int Err_MemoryAlloc = 4000;
static const int Err_SendFailed = 4002;
static const int Err_FunctionNotImplemented = 4003;
static const int Err_UnknownColumn = 4004;
static const int Err_ReceiveTimedOut = 4008;
static const int Err_NodeFailCausedAbort = 4028;
static const int Err_ParameterError = 4118;
static const int Err_SimpleDirtyReadFailed = 4119;
static const int Err_WrongFieldLength = 4209;
static const int Err_ReadTooMuch = 4257;
static const int Err_InvalidRangeNo = 4286;
static const int Err_DifferentTabForKeyRecAndAttrRec = 4287;
static const int Err_KeyIsNULL = 4316;
static const int Err_FinaliseNotCalled = 4519;
static const int Err_InterpretedCodeWrongTab = 4524;

/* A 'void' index for a tuple in internal parent / child correlation structs. */
static const Uint16 tupleNotFound = 0xffff;

/** Set to true to trace incoming signals (debug aid, compile-time switch). */
static const bool traceSignals = false;
90 
/** Special (out-of-band) values for NdbQueryOperationImpl::m_parallelism. */
enum
{
  /**
   * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
   * scan parallelism should be adaptive.
   */
  Parallelism_adaptive = 0xffff0000,

  /**
   * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
   * all fragments should be scanned in parallel.
   */
  Parallelism_max = 0xffff0001
};
105 
106 /**
107  * A class for accessing the correlation data at the end of a tuple (for
108  * scan queries). These data have the following layout:
109  *
110  * Word 0: AttributeHeader
111  * Word 1, upper halfword: tuple id of parent tuple.
112  * Word 1, lower halfword: tuple id of this tuple.
113  * Word 2: Id of receiver for root operation (where the ancestor tuple of this
114  *         tuple will go).
115  *
116  * Both tuple identifiers are unique within this batch and root fragment.
117  * With these identifiers, it is possible to relate a tuple to its parent and
118  * children. That way, results for child operations can be updated correctly
119  * when the application iterates over the results of the root scan operation.
120  */
121 class TupleCorrelation
122 {
123 public:
124   static const Uint32 wordCount = 1;
125 
TupleCorrelation()126   explicit TupleCorrelation()
127   : m_correlation((tupleNotFound<<16) | tupleNotFound)
128   {}
129 
130   /** Conversion to/from Uint32 to store/fetch from buffers */
TupleCorrelation(Uint32 val)131   explicit TupleCorrelation(Uint32 val)
132   : m_correlation(val)
133   {}
134 
toUint32() const135   Uint32 toUint32() const
136   { return m_correlation; }
137 
getTupleId() const138   Uint16 getTupleId() const
139   { return m_correlation & 0xffff;}
140 
getParentTupleId() const141   Uint16 getParentTupleId() const
142   { return m_correlation >> 16;}
143 
144 private:
145   Uint32 m_correlation;
146 }; // class TupleCorrelation
147 
/**
 * Read-only view of the 3-word correlation trailer appended to each tuple
 * of a scan-query result (layout described in the comment above):
 * an AttributeHeader, the packed TupleCorrelation, and the receiver id
 * of the root operation.
 */
class CorrelationData
{
public:
  static const Uint32 wordCount = 3;

  /**
   * @param tupleData   Start of the received tuple.
   * @param tupleLength Length of the tuple in words; the correlation
   *                    trailer occupies the last 'wordCount' words.
   */
  explicit CorrelationData(const Uint32* tupleData, Uint32 tupleLength):
    m_corrPart(tupleData + tupleLength - wordCount)
  {
    assert(tupleLength >= wordCount);
    // Sanity-check that the trailer really is a CORR_FACTOR64 attribute.
    assert(AttributeHeader(m_corrPart[0]).getAttributeId()
           == AttributeHeader::CORR_FACTOR64);
    assert(AttributeHeader(m_corrPart[0]).getByteSize() == 2*sizeof(Uint32));
    assert(getTupleCorrelation().getTupleId()<tupleNotFound);
    assert(getTupleCorrelation().getParentTupleId()<tupleNotFound);
  }

  /** @return Receiver id of the root operation (word 2 of the trailer). */
  Uint32 getRootReceiverId() const
  { return m_corrPart[2];}

  /** @return The packed parent/child correlation (word 1 of the trailer). */
  const TupleCorrelation getTupleCorrelation() const
  { return TupleCorrelation(m_corrPart[1]); }

private:
  const Uint32* const m_corrPart;
}; // class CorrelationData
173 
174 /**
175  * If a query has a scan operation as its root, then that scan will normally
176  * read from several fragments of its target table. Each such root fragment
177  * scan, along with any child lookup operations that are spawned from it,
178  * runs independently, in the sense that:
179  * - The API will know when it has received all data from a fragment for a
180  *   given batch and all child operations spawned from it.
181  * - When one fragment is complete (for a batch) the API will make these data
 *   available to the application, even if other fragments are not yet complete.
183  * - The tuple identifiers that are used for matching children with parents are
184  *   only guaranteed to be unique within one batch, operation, and root
185  *   operation fragment. Tuples derived from different root fragments must
186  *   thus be kept apart.
187  *
188  * This class manages the state of one such read operation, from one particular
189  * fragment of the target table of the root operation. If the root operation
190  * is a lookup, then there will be only one instance of this class.
191  */
class NdbRootFragment {
public:
  /** Build hash map for mapping from root receiver id to NdbRootFragment
   * instance.*/
  static void buildReciverIdMap(NdbRootFragment* frags,
                                Uint32 noOfFrags);

  /** Find NdbRootFragment instance corresponding to a given root receiver id.*/
  static NdbRootFragment* receiverIdLookup(NdbRootFragment* frags,
                                           Uint32 noOfFrags,
                                           Uint32 receiverId);

  explicit NdbRootFragment();

  ~NdbRootFragment();

  /**
   * Initialize object.
   * @param query Enclosing query.
   * @param fragNo This object manages state for reading from the fragNo'th
   * fragment that the root operation accesses.
   */
  void init(NdbQueryImpl& query, Uint32 fragNo);

  static void clear(NdbRootFragment* frags, Uint32 noOfFrags);

  Uint32 getFragNo() const
  { return m_fragNo; }

  /**
   * Prepare for receiving another batch of results.
   */
  void prepareNextReceiveSet();

  bool hasRequestedMore() const;

  /**
   * Prepare for reading another batch of results.
   */
  void grabNextResultSet();                     // Need mutex lock

  bool hasReceivedMore() const;                 // Need mutex lock

  void setReceivedMore();                       // Need mutex lock

  /**
   * Adjust the count of TCKEYREF/TRANSID_AI messages still expected.
   * 'delta' is positive when more results are announced (SCAN_TABCONF /
   * TCKEYCONF) and negative as results arrive.
   */
  void incrOutstandingResults(Int32 delta)
  {
    if (traceSignals) {
      ndbout << "incrOutstandingResults: " << m_outstandingResults
             << ", with: " << delta
             << endl;
    }
    m_outstandingResults += delta;
    // Once the CONF has arrived, the count must never go negative.
    assert(!(m_confReceived && m_outstandingResults<0));
  }

  /**
   * Discard all results still outstanding for this fragment: mark the
   * batch as complete and release per-fetch resources.
   */
  void throwRemainingResults()
  {
    if (traceSignals) {
      ndbout << "throwRemainingResults: " << m_outstandingResults
             << endl;
    }
    m_outstandingResults = 0;
    m_confReceived = true;
    postFetchRelease();
  }

  void setConfReceived(Uint32 tcPtrI);

  /**
   * The root operation will read from a number of fragments of a table.
   * This method checks if all results for the current batch has been
   * received for a given fragment. This includes both results for the root
   * operation and any child operations. Note that child operations may access
   * other fragments; the fragment number only refers to what
   * the root operation does.
   *
   * @return True if current batch is complete for this fragment.
   */
  bool isFragBatchComplete() const
  {
    assert(m_fragNo!=voidFragNo);
    return m_confReceived && m_outstandingResults==0;
  }

  /**
   * Get the result stream that handles results derived from this root
   * fragment for a particular operation.
   * @param operationNo The id of the operation.
   * @return The result stream for this root fragment.
   */
  NdbResultStream& getResultStream(Uint32 operationNo) const;

  NdbResultStream& getResultStream(const NdbQueryOperationImpl& op) const
  { return getResultStream(op.getQueryOperationDef().getOpNo()); }

  Uint32 getReceiverId() const;
  Uint32 getReceiverTcPtrI() const;

  /**
   * @return True if there are no more batches to be received for this fragment.
   */
  bool finalBatchReceived() const;

  /**
   * @return True if there are no more results from this root fragment (for
   * the current batch).
   */
  bool isEmpty() const;

  /**
   * This method is used for marking which streams belonging to this
   * NdbRootFragment have remaining batches for a sub scan
   * instantiated from the current batch of its parent operation.
   */
  void setRemainingSubScans(Uint32 nodeMask)
  {
    m_remainingScans = nodeMask;
  }

  /** Release resources after last row has been returned */
  void postFetchRelease();

private:
  /** No copying.*/
  NdbRootFragment(const NdbRootFragment&);
  NdbRootFragment& operator=(const NdbRootFragment&);

  /** Value of m_fragNo before init() has been called. */
  STATIC_CONST( voidFragNo = 0xffffffff);

  /** Enclosing query.*/
  NdbQueryImpl* m_query;

  /** Number of the root operation fragment.*/
  Uint32 m_fragNo;

  /** For processing results originating from this root fragment (Array of).*/
  NdbResultStream* m_resultStreams;

  /**
   * Number of requested (pre-)fetches which has either not completed
   * from datanodes yet, or which are completed, but not consumed.
   * (Which implies they are also counted in m_availResultSets)
   */
  Uint32 m_pendingRequests;

  /**
   * Number of available 'm_pendingRequests' ( <= m_pendingRequests)
   * which has been completely received. Will be made available
   * for reading by calling ::grabNextResultSet()
   */
  Uint32 m_availResultSets;   // Need mutex

  /**
   * The number of outstanding TCKEYREF or TRANSID_AI messages to receive
   * for the fragment. This includes both messages related to the
   * root operation and any descendant operation that was instantiated as
   * a consequence of tuples found by the root operation.
   * This number may temporarily be negative if e.g. TRANSID_AI arrives
   * before SCAN_TABCONF.
   */
  Int32 m_outstandingResults;

  /**
   * True iff a SCAN_TABCONF (for this fragment) or TCKEYCONF message has
   * been received. (NOTE(review): an earlier comment described this as an
   * array with one element per fragment; it is a single flag, since each
   * NdbRootFragment instance covers exactly one fragment.)
   */
  bool m_confReceived;

  /**
   * A bitmask of operation id's for which we will receive more
   * ResultSets in a NEXTREQ.
   */
  Uint32 m_remainingScans;

  /**
   * Used for implementing a hash map from root receiver ids to a
   * NdbRootFragment instance. m_idMapHead is the index of the first
   * NdbRootFragment in the m_fragNo'th hash bucket.
   */
  int m_idMapHead;

  /**
   * Used for implementing a hash map from root receiver ids to a
   * NdbRootFragment instance. m_idMapNext is the index of the next
   * NdbRootFragment in the same hash bucket as this one.
   */
  int m_idMapNext;
}; //NdbRootFragment
384 
385 /**
386  * 'class NdbResultSet' is a helper for 'class NdbResultStream'.
387  *  It manages the buffers which rows are received into and
388  *  read from.
389  */
class NdbResultSet
{
  friend class NdbResultStream;

public:
  explicit NdbResultSet();

  /**
   * Allocate the receive buffer (and, for scan queries, the correlation
   * array) from the query's bulk allocators.
   */
  void init(NdbQueryImpl& query,
            Uint32 maxRows, Uint32 bufferSize);

  /** Make this ResultSet the receive target of 'receiver' and discard
   *  any rows from a previous batch. */
  void prepareReceive(NdbReceiver& receiver)
  {
    m_rowCount = 0;
    receiver.prepareReceive(m_buffer);
  }

  Uint32 getRowCount() const
  { return m_rowCount; }

private:
  /** No copying.*/
  NdbResultSet(const NdbResultSet&);
  NdbResultSet& operator=(const NdbResultSet&);

  /** The buffers which we receive the results into */
  NdbReceiverBuffer* m_buffer;

  /** Array of TupleCorrelations for all rows in m_buffer */
  TupleCorrelation* m_correlations;

  /** The current #rows in 'm_buffer'.*/
  Uint32 m_rowCount;

}; // class NdbResultSet
424 
425 /**
426  * This class manages the subset of result data for one operation that is
427  * derived from one fragment of the root operation. Note that the result tuples
428  * may come from any fragment, but they all have initial ancestors from the
429  * same fragment of the root operation.
430  * For each operation there will thus be one NdbResultStream for each fragment
431  * that the root operation reads from (one in the case of lookups.)
432  * This class has an NdbReceiver object for processing tuples as well as
433  * structures for correlating child and parent tuples.
434  */
class NdbResultStream {
public:

  /**
   * @param operation The operation for which we will receive results.
   * @param rootFrag  The root fragment from which the results handled by
   *                  this stream are derived.
   */
  explicit NdbResultStream(NdbQueryOperationImpl& operation,
                           NdbRootFragment& rootFrag);

  ~NdbResultStream();

  /**
   * Prepare for receiving first results.
   */
  void prepare();

  /** Prepare for receiving next batch of scan results. */
  void prepareNextReceiveSet();

  NdbReceiver& getReceiver()
  { return m_receiver; }

  const NdbReceiver& getReceiver() const
  { return m_receiver; }

  const char* getCurrentRow()
  { return m_receiver.getCurrentRow(); }

  /**
   * Process an incoming tuple for this stream. Extract parent and own tuple
   * ids and pass it on to m_receiver.
   *
   * @param ptr buffer holding tuple.
   * @param len buffer length.
   */
  void execTRANSID_AI(const Uint32 *ptr, Uint32 len,
                      TupleCorrelation correlation);

  /**
   * A complete batch has been received for a fragment on this NdbResultStream,
   * Update whatever required before the appl. are allowed to navigate the result.
   * @return true if node and all its siblings have returned all rows.
   */
  bool prepareResultSet(Uint32 remainingScans);

  /**
   * Navigate within the current ResultSet to resp. first and next row.
   * For non-parent operations in the pushed query, navigation is with respect
   * to any preceding parents which results in this ResultSet depends on.
   * Returns either the tupleNo within TupleSet[] which we navigated to, or
   * tupleNotFound().
   */
  Uint16 firstResult();
  Uint16 nextResult();

  /**
   * Returns true if last row matching the current parent tuple has been
   * consumed.
   */
  bool isEmpty() const
  { return m_iterState == Iter_finished; }

  /**
   * This method
   * returns true if this result stream holds the last batch of a sub scan.
   * This means that it is the last batch of the scan that was instantiated
   * from the current batch of its parent operation.
   */
  bool isSubScanComplete(Uint32 remainingScans) const
  {
    /**
     * Find the node number seen by the SPJ block. Since a unique index
     * operation will have two distinct nodes in the tree used by the
     * SPJ block, this number may be different from 'opNo'.
     */
    const Uint32 internalOpNo = m_operation.getInternalOpNo();

    const bool complete = !((remainingScans >> internalOpNo) & 1);
    return complete;
  }

  bool isScanQuery() const
  { return (m_properties & Is_Scan_Query); }

  bool isScanResult() const
  { return (m_properties & Is_Scan_Result); }

  bool isInnerJoin() const
  { return (m_properties & Is_Inner_Join); }

  /** For debugging.*/
  friend NdbOut& operator<<(NdbOut& out, const NdbResultStream&);

  /**
   * TupleSet contain two logically distinct set of information:
   *
   *  - Child/Parent correlation set required to correlate
   *    child tuples with its parents. Child/Tuple pairs are indexed
   *    by tuple number which is the same as the order in which tuples
   *    appear in the NdbReceiver buffers.
   *
   *  - A HashMap on 'm_parentId' used to locate tuples correlated
   *    to a parent tuple. Indexed by hashing the parentId such that:
   *     - [hash(parentId)].m_hash_head will then index the first
   *       TupleSet entry potentially containing the parentId to locate.
   *     - .m_hash_next in the indexed TupleSet may index the next TupleSet
   *       to consider.
   *
   * Both the child/parent correlation set and the parentId HashMap has been
   * folded into the same structure in order to reduce number of objects
   * being dynamically allocated.
   * As an advantage this results in an autoscaling of the hash bucket size.
   *
   * Structure is only present if 'isScanQuery'
   */
  class TupleSet {
  public:
                        // Tuple ids are unique within this batch and stream
    Uint16 m_parentId;  // Id of parent tuple which this tuple is correlated with
    Uint16 m_tupleId;   // Id of this tuple

    Uint16 m_hash_head; // Index of first item in TupleSet[] matching a hashed parentId.
    Uint16 m_hash_next; // 'next' index matching

    bool   m_skip;      // Skip this tuple in result processing for now

    /** If the n'th bit is set, then a matching tuple for the n'th child has been seen.
     * This information is needed when generating left join tuples for those tuples
     * that had no matching children.*/
    Bitmask<(NDB_SPJ_MAX_TREE_NODES+31)/32> m_hasMatchingChild;

    explicit TupleSet() : m_hash_head(tupleNotFound)
    {}

  private:
    /** No copying.*/
    TupleSet(const TupleSet&);
    TupleSet& operator=(const TupleSet&);
  };

private:
  /**
   * This stream handles results derived from specified
   * m_rootFrag of the root operation.
   */
  const NdbRootFragment& m_rootFrag;

  /** Operation to which this resultStream belong.*/
  NdbQueryOperationImpl& m_operation;

  /** ResultStream for my parent operation, or NULL if I am root */
  NdbResultStream* const m_parent;

  /** Immutable facts about this stream, set up by the constructor. */
  const enum properties
  {
    Is_Scan_Query = 0x01,
    Is_Scan_Result = 0x02,
    Is_Inner_Join = 0x10
  } m_properties;

  /** The receiver object that unpacks transid_AI messages.*/
  NdbReceiver m_receiver;

  /**
   * ResultSets are received into and read from this stream,
   * possibly doublebuffered,
   */
  NdbResultSet m_resultSets[2];
  Uint32 m_read;  // We read from m_resultSets[m_read]
  Uint32 m_recv;  // We receive into m_resultSets[m_recv]

  /** This is the state of the iterator used by firstResult(), nextResult().*/
  enum
  {
    /** The first row has not been fetched yet.*/
    Iter_notStarted,
    /** Is iterating the ResultSet, (implies 'm_currentRow!=tupleNotFound').*/
    Iter_started,
    /** Last row for current ResultSet has been returned.*/
    Iter_finished
  } m_iterState;

  /**
   * Tuple id of the current tuple, or 'tupleNotFound'
   * if Iter_notStarted or Iter_finished.
   */
  Uint16 m_currentRow;

  /** Max #rows which this stream may receive in its TupleSet structures */
  Uint32 m_maxRows;

  /** TupleSet contains the correlation between parent/childs */
  TupleSet* m_tupleSet;

  void buildResultCorrelations();

  Uint16 getTupleId(Uint16 tupleNo) const
  { return (m_tupleSet) ? m_tupleSet[tupleNo].m_tupleId : 0; }

  Uint16 getCurrentTupleId() const
  { return (m_currentRow==tupleNotFound) ? tupleNotFound : getTupleId(m_currentRow); }

  Uint16 findTupleWithParentId(Uint16 parentId) const;

  Uint16 findNextTuple(Uint16 tupleNo) const;

  /** No copying.*/
  NdbResultStream(const NdbResultStream&);
  NdbResultStream& operator=(const NdbResultStream&);
}; //class NdbResultStream
646 
647 //////////////////////////////////////////////
648 /////////  NdbBulkAllocator methods ///////////
649 //////////////////////////////////////////////
650 
NdbBulkAllocator(size_t objSize)651 NdbBulkAllocator::NdbBulkAllocator(size_t objSize)
652   :m_objSize(objSize),
653    m_maxObjs(0),
654    m_buffer(NULL),
655    m_nextObjNo(0)
656 {}
657 
init(Uint32 maxObjs)658 int NdbBulkAllocator::init(Uint32 maxObjs)
659 {
660   assert(m_buffer == NULL);
661   m_maxObjs = maxObjs;
662   // Add check for buffer overrun.
663   m_buffer = new char[m_objSize*m_maxObjs+1];
664   if (unlikely(m_buffer == NULL))
665   {
666     return Err_MemoryAlloc;
667   }
668   m_buffer[m_maxObjs * m_objSize] = endMarker;
669   return 0;
670 }
671 
reset()672 void NdbBulkAllocator::reset(){
673   // Overrun check.
674   assert(m_buffer == NULL || m_buffer[m_maxObjs * m_objSize] == endMarker);
675   delete [] m_buffer;
676   m_buffer = NULL;
677   m_nextObjNo = 0;
678   m_maxObjs = 0;
679 }
680 
allocObjMem(Uint32 noOfObjs)681 void* NdbBulkAllocator::allocObjMem(Uint32 noOfObjs)
682 {
683   assert(m_nextObjNo + noOfObjs <=  m_maxObjs);
684   void * const result = m_buffer+m_objSize*m_nextObjNo;
685   m_nextObjNo += noOfObjs;
686   return m_nextObjNo > m_maxObjs ? NULL : result;
687 }
688 
689 ///////////////////////////////////////////
690 /////////  NdbResultSet methods ///////////
691 ///////////////////////////////////////////
NdbResultSet()692 NdbResultSet::NdbResultSet() :
693   m_buffer(NULL),
694   m_correlations(NULL),
695   m_rowCount(0)
696 {}
697 
698 void
init(NdbQueryImpl & query,Uint32 maxRows,Uint32 bufferSize)699 NdbResultSet::init(NdbQueryImpl& query,
700                    Uint32 maxRows,
701                    Uint32 bufferSize)
702 {
703   {
704     NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
705     Uint32 *buffer = reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(bufferSize));
706     m_buffer = NdbReceiver::initReceiveBuffer(buffer, bufferSize, maxRows);
707 
708     if (query.getQueryDef().isScanQuery())
709     {
710       m_correlations = reinterpret_cast<TupleCorrelation*>
711                          (bufferAlloc.allocObjMem(maxRows*sizeof(TupleCorrelation)));
712     }
713   }
714 }
715 
716 //////////////////////////////////////////////
717 /////////  NdbResultStream methods ///////////
718 //////////////////////////////////////////////
719 
NdbResultStream(NdbQueryOperationImpl & operation,NdbRootFragment & rootFrag)720 NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation,
721                                  NdbRootFragment& rootFrag)
722 :
723   m_rootFrag(rootFrag),
724   m_operation(operation),
725   m_parent(operation.getParentOperation()
726         ? &rootFrag.getResultStream(*operation.getParentOperation())
727         : NULL),
728   m_properties(
729     (enum properties)
730      ((operation.getQueryDef().isScanQuery()
731        ? Is_Scan_Query : 0)
732      | (operation.getQueryOperationDef().isScanOperation()
733        ? Is_Scan_Result : 0)
734      | (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll
735        ? Is_Inner_Join : 0))),
736   m_receiver(operation.getQuery().getNdbTransaction().getNdb()),
737   m_resultSets(), m_read(0xffffffff), m_recv(0),
738   m_iterState(Iter_finished),
739   m_currentRow(tupleNotFound),
740   m_maxRows(0),
741   m_tupleSet(NULL)
742 {};
743 
~NdbResultStream()744 NdbResultStream::~NdbResultStream()
745 {
746   for (int i = static_cast<int>(m_maxRows)-1; i >= 0; i--)
747   {
748     m_tupleSet[i].~TupleSet();
749   }
750 }
751 
/**
 * Allocate buffers and set up the receiver in preparation for receiving
 * the first results: TupleSet and double-buffered ResultSets for scan
 * queries, a single one-row ResultSet for lookups.
 */
void
NdbResultStream::prepare()
{
  NdbQueryImpl &query = m_operation.getQuery();

  const Uint32 batchBufferSize = m_operation.getBatchBufferSize();
  if (isScanQuery())
  {
    /* Parent / child correlation is only relevant for scan type queries
     * Don't create a m_tupleSet with these correlation id's for lookups!
     */
    m_maxRows  = m_operation.getMaxBatchRows();
    m_tupleSet =
      new (query.getTupleSetAlloc().allocObjMem(m_maxRows))
      TupleSet[m_maxRows];

    // Scan results may be double buffered
    m_resultSets[0].init(query, m_maxRows, batchBufferSize);
    m_resultSets[1].init(query, m_maxRows, batchBufferSize);
  }
  else
  {
    // A lookup produces at most one row, single buffered.
    m_maxRows = 1;
    m_resultSets[0].init(query, m_maxRows, batchBufferSize);
  }

  /* Alloc buffer for unpacked NdbRecord row */
  const Uint32 rowSize = m_operation.getRowSize();
  assert((rowSize % sizeof(Uint32)) == 0);
  char *rowBuffer = reinterpret_cast<char*>(query.getRowBufferAlloc().allocObjMem(rowSize));
  assert(rowBuffer != NULL);

  m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, &m_operation);
  m_receiver.do_setup_ndbrecord(
                          m_operation.getNdbRecord(),
                          rowBuffer,
                          false, /*read_range_no*/
                          false  /*read_key_info*/);
} //NdbResultStream::prepare
791 
/** Locate, and return 'tupleNo', of first tuple with specified parentId.
 *  parentId == tupleNotFound is used as a special value for iterating results
 *  from the root operation in the order in which they were inserted by
 *  ::buildResultCorrelations()
 *
 *  Position of 'currentRow' is *not* updated and should be modified by the
 *  caller if it wants to keep the new position.
 */
800 Uint16
findTupleWithParentId(Uint16 parentId) const801 NdbResultStream::findTupleWithParentId(Uint16 parentId) const
802 {
803   assert ((parentId==tupleNotFound) == (m_parent==NULL));
804 
805   if (likely(m_resultSets[m_read].m_rowCount>0))
806   {
807     if (m_tupleSet==NULL)
808     {
809       assert (m_resultSets[m_read].m_rowCount <= 1);
810       return 0;
811     }
812 
813     const Uint16 hash = (parentId % m_maxRows);
814     Uint16 currentRow = m_tupleSet[hash].m_hash_head;
815     while (currentRow != tupleNotFound)
816     {
817       assert(currentRow < m_maxRows);
818       if (m_tupleSet[currentRow].m_skip == false &&
819           m_tupleSet[currentRow].m_parentId == parentId)
820       {
821         return currentRow;
822       }
823       currentRow = m_tupleSet[currentRow].m_hash_next;
824     }
825   }
826   return tupleNotFound;
827 } //NdbResultStream::findTupleWithParentId()
828 
829 
/** Locate, and return 'tupleNo', of next tuple with same parentId as currentRow
 *  Position of 'currentRow' is *not* updated and should be modified by the
 *  caller if it wants to keep the new position.
 */
834 Uint16
findNextTuple(Uint16 tupleNo) const835 NdbResultStream::findNextTuple(Uint16 tupleNo) const
836 {
837   if (tupleNo!=tupleNotFound && m_tupleSet!=NULL)
838   {
839     assert(tupleNo < m_maxRows);
840     Uint16 parentId = m_tupleSet[tupleNo].m_parentId;
841     Uint16 nextRow  = m_tupleSet[tupleNo].m_hash_next;
842 
843     while (nextRow != tupleNotFound)
844     {
845       assert(nextRow < m_maxRows);
846       if (m_tupleSet[nextRow].m_skip == false &&
847           m_tupleSet[nextRow].m_parentId == parentId)
848       {
849         return nextRow;
850       }
851       nextRow = m_tupleSet[nextRow].m_hash_next;
852     }
853   }
854   return tupleNotFound;
855 } //NdbResultStream::findNextTuple()
856 
857 
/**
 * Position this stream at its first result row - restricted to rows
 * correlated with the parent's current row if this stream has a parent.
 * Returns the tupleNo of that row, or 'tupleNotFound' if none exist,
 * in which case the iterator state is set to Iter_finished.
 */
Uint16
NdbResultStream::firstResult()
{
  Uint16 parentId = tupleNotFound;
  if (m_parent!=NULL)
  {
    // Child stream: only rows correlated with the parent's current row qualify.
    parentId = m_parent->getCurrentTupleId();
    if (parentId == tupleNotFound)
    {
      // Parent has no current row -> no correlated rows in this stream either.
      m_currentRow = tupleNotFound;
      m_iterState = Iter_finished;
      return tupleNotFound;
    }
  }

  if ((m_currentRow=findTupleWithParentId(parentId)) != tupleNotFound)
  {
    m_iterState = Iter_started;
    // Make the row retrievable through the receiver's row buffer.
    const char *p = m_receiver.getRow(m_resultSets[m_read].m_buffer, m_currentRow);
    assert(p != NULL);  ((void)p);
    return m_currentRow;
  }

  m_iterState = Iter_finished;
  return tupleNotFound;
} //NdbResultStream::firstResult()
884 
885 Uint16
nextResult()886 NdbResultStream::nextResult()
887 {
888   // Fetch next row for this stream
889   if (m_currentRow != tupleNotFound &&
890       (m_currentRow=findNextTuple(m_currentRow)) != tupleNotFound)
891   {
892     m_iterState = Iter_started;
893     const char *p = m_receiver.getRow(m_resultSets[m_read].m_buffer, m_currentRow);
894     assert(p != NULL);  ((void)p);
895     return m_currentRow;
896   }
897   m_iterState = Iter_finished;
898   return tupleNotFound;
899 } //NdbResultStream::nextResult()
900 
901 /**
902  * Callback when a TRANSID_AI signal (receive row) is processed.
903  */
904 void
execTRANSID_AI(const Uint32 * ptr,Uint32 len,TupleCorrelation correlation)905 NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len,
906                                 TupleCorrelation correlation)
907 {
908   NdbResultSet& receiveSet = m_resultSets[m_recv];
909   if (isScanQuery())
910   {
911     /**
912      * Store TupleCorrelation.
913      */
914     receiveSet.m_correlations[receiveSet.m_rowCount] = correlation;
915   }
916 
917   m_receiver.execTRANSID_AI(ptr, len);
918   receiveSet.m_rowCount++;
919 } // NdbResultStream::execTRANSID_AI()
920 
921 /**
922  * Make preparation for another batch of results to be received.
923  * This NdbResultStream, and all its sibling will receive a batch
924  * of results from the datanodes.
925  */
926 void
prepareNextReceiveSet()927 NdbResultStream::prepareNextReceiveSet()
928 {
929   if (isScanQuery())          // Doublebuffered ResultSet[] if isScanQuery()
930   {
931     m_recv = (m_recv+1) % 2;  // Receive into next ResultSet
932     assert(m_recv != m_read);
933   }
934 
935   m_resultSets[m_recv].prepareReceive(m_receiver);
936 
937   /**
938    * If this stream will get new rows in the next batch, then so will
939    * all of its descendants.
940    */
941   for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations();
942        childNo++)
943   {
944     NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
945     m_rootFrag.getResultStream(child).prepareNextReceiveSet();
946   }
947 } //NdbResultStream::prepareNextReceiveSet
948 
949 /**
950  * Make preparations for another batch of result to be read:
951  *  - Advance to next NdbResultSet. (or reuse last)
952  *  - Fill in parent/child result correlations in m_tupleSet[]
953  *  - ... or reset m_tupleSet[] if we reuse the previous.
954  *  - Apply inner/outer join filtering to remove non qualifying
955  *    rows.
956  */
957 bool
prepareResultSet(Uint32 remainingScans)958 NdbResultStream::prepareResultSet(Uint32 remainingScans)
959 {
960   bool isComplete = isSubScanComplete(remainingScans); //Childs with more rows
961 
962   /**
963    * Prepare NdbResultSet for reading - either the next
964    * 'new' received from datanodes or reuse the last as has been
965    * determined by ::prepareNextReceiveSet()
966    */
967   const bool newResults = (m_read != m_recv);
968   m_read = m_recv;
969   const NdbResultSet& readResult = m_resultSets[m_read];
970 
971   if (m_tupleSet!=NULL)
972   {
973     if (newResults)
974     {
975       buildResultCorrelations();
976     }
977     else
978     {
979       // Makes all rows in 'TupleSet' available (clear 'm_skip' flag)
980       for (Uint32 tupleNo=0; tupleNo<readResult.getRowCount(); tupleNo++)
981       {
982         m_tupleSet[tupleNo].m_skip = false;
983       }
984     }
985   }
986 
987   /**
988    * Recursively iterate all child results depth first.
989    * Filter away any result rows which should not be visible (yet) -
990    * Either due to incomplete child batches, or the join being an 'inner join'.
991    * Set result itterator state to 'before first' resultrow.
992    */
993   for (Uint32 childNo=0; childNo < m_operation.getNoOfChildOperations(); childNo++)
994   {
995     const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
996     NdbResultStream& childStream = m_rootFrag.getResultStream(child);
997     const bool allSubScansComplete = childStream.prepareResultSet(remainingScans);
998 
999     Uint32 childId = child.getQueryOperationDef().getOpNo();
1000 
1001     /* Condition 1) & 2) calc'ed outside loop, see comments further below: */
1002     const bool skipNonMatches = !allSubScansComplete ||      // 1)
1003                                 childStream.isInnerJoin();   // 2)
1004 
1005     if (m_tupleSet!=NULL)
1006     {
1007       for (Uint32 tupleNo=0; tupleNo<readResult.getRowCount(); tupleNo++)
1008       {
1009         if (!m_tupleSet[tupleNo].m_skip)
1010         {
1011           Uint16 tupleId = getTupleId(tupleNo);
1012           if (childStream.findTupleWithParentId(tupleId)!=tupleNotFound)
1013             m_tupleSet[tupleNo].m_hasMatchingChild.set(childId);
1014 
1015           /////////////////////////////////
1016           //  No child matched for this row. Making parent row visible
1017           //  will cause a NULL (outer join) row to be produced.
1018           //  Skip NULL row production when:
1019           //    1) Some child batches are not complete; they may contain later matches.
1020           //    2) Join type is 'inner join', skip as no child are matching.
1021           //    3) A match was found in a previous batch.
1022           //  Condition 1) & 2) above is precalculated in 'bool skipNonMatches'
1023           //
1024           else if (skipNonMatches                                       // 1 & 2)
1025                ||  m_tupleSet[tupleNo].m_hasMatchingChild.get(childId)) // 3)
1026             m_tupleSet[tupleNo].m_skip = true;
1027         }
1028       }
1029     }
1030     isComplete &= allSubScansComplete;
1031   }
1032 
1033   // Set current position 'before first'
1034   m_iterState = Iter_notStarted;
1035   m_currentRow = tupleNotFound;
1036 
1037   return isComplete;
1038 } // NdbResultStream::prepareResultSet()
1039 
1040 
1041 /**
1042  * Fill m_tupleSet[] with correlation data between parent
1043  * and child tuples. The 'TupleCorrelation' is stored
1044  * in an array of TupleCorrelations in each ResultSet
1045  * by execTRANSID_AI().
1046  *
1047  * NOTE: In order to reduce work done when holding the
1048  * transporter mutex, the 'TupleCorrelation' is only stored
1049  * in the buffer when it arrives. Later (here) we build the
1050  * correlation hashMap immediately before we prepare to
1051  * read the NdbResultSet.
1052  */
1053 void
buildResultCorrelations()1054 NdbResultStream::buildResultCorrelations()
1055 {
1056   const NdbResultSet& readResult = m_resultSets[m_read];
1057 
1058 //if (m_tupleSet!=NULL)
1059   {
1060     /* Clear the hashmap structures */
1061     for (Uint32 i=0; i<m_maxRows; i++)
1062     {
1063       m_tupleSet[i].m_hash_head = tupleNotFound;
1064     }
1065 
1066     /* Rebuild correlation & hashmap from 'readResult' */
1067     for (Uint32 tupleNo=0; tupleNo<readResult.m_rowCount; tupleNo++)
1068     {
1069       const Uint16 tupleId  = readResult.m_correlations[tupleNo].getTupleId();
1070       const Uint16 parentId = (m_parent!=NULL)
1071                                 ? readResult.m_correlations[tupleNo].getParentTupleId()
1072                                 : tupleNotFound;
1073 
1074       m_tupleSet[tupleNo].m_skip     = false;
1075       m_tupleSet[tupleNo].m_parentId = parentId;
1076       m_tupleSet[tupleNo].m_tupleId  = tupleId;
1077       m_tupleSet[tupleNo].m_hasMatchingChild.clear();
1078 
1079       /* Insert into parentId-hashmap */
1080       const Uint16 hash = (parentId % m_maxRows);
1081       if (m_parent==NULL)
1082       {
1083         /* Root stream: Insert sequentially in hash_next to make it
1084          * possible to use ::findTupleWithParentId() and ::findNextTuple()
1085          * to navigate even the root operation.
1086          */
1087         /* Link into m_hash_next in order to let ::findNextTuple() navigate correctly */
1088         if (tupleNo==0)
1089           m_tupleSet[hash].m_hash_head  = tupleNo;
1090         else
1091           m_tupleSet[tupleNo-1].m_hash_next  = tupleNo;
1092         m_tupleSet[tupleNo].m_hash_next  = tupleNotFound;
1093       }
1094       else
1095       {
1096         /* Insert parentId in HashMap */
1097         m_tupleSet[tupleNo].m_hash_next = m_tupleSet[hash].m_hash_head;
1098         m_tupleSet[hash].m_hash_head  = tupleNo;
1099       }
1100     }
1101   }
1102 } // NdbResultStream::buildResultCorrelations
1103 
1104 
1105 ///////////////////////////////////////////
1106 ////////  NdbRootFragment methods /////////
1107 ///////////////////////////////////////////
buildReciverIdMap(NdbRootFragment * frags,Uint32 noOfFrags)1108 void NdbRootFragment::buildReciverIdMap(NdbRootFragment* frags,
1109                                         Uint32 noOfFrags)
1110 {
1111   for(Uint32 fragNo = 0; fragNo < noOfFrags; fragNo++)
1112   {
1113     const Uint32 receiverId = frags[fragNo].getReceiverId();
1114     /**
1115      * For reasons unknow, NdbObjectIdMap shifts ids two bits to the left,
1116      * so we must do the opposite to get a good hash distribution.
1117      */
1118     assert((receiverId & 0x3) == 0);
1119     const int hash =
1120       (receiverId >> 2) % noOfFrags;
1121     frags[fragNo].m_idMapNext = frags[hash].m_idMapHead;
1122     frags[hash].m_idMapHead = fragNo;
1123   }
1124 }
1125 
1126 //static
1127 NdbRootFragment*
receiverIdLookup(NdbRootFragment * frags,Uint32 noOfFrags,Uint32 receiverId)1128 NdbRootFragment::receiverIdLookup(NdbRootFragment* frags,
1129                                   Uint32 noOfFrags,
1130                                   Uint32 receiverId)
1131 {
1132   /**
1133    * For reasons unknow, NdbObjectIdMap shifts ids two bits to the left,
1134    * so we must do the opposite to get a good hash distribution.
1135    */
1136   assert((receiverId  & 0x3) == 0);
1137   const int hash = (receiverId >> 2) % noOfFrags;
1138   int current = frags[hash].m_idMapHead;
1139   assert(current < static_cast<int>(noOfFrags));
1140   while (current >= 0 && frags[current].getReceiverId() != receiverId)
1141   {
1142     current = frags[current].m_idMapNext;
1143     assert(current < static_cast<int>(noOfFrags));
1144   }
1145   if (unlikely (current < 0))
1146   {
1147     return NULL;
1148   }
1149   else
1150   {
1151     return frags+current;
1152   }
1153 }
1154 
1155 
/**
 * Construct an unbound NdbRootFragment. It must be ::init()'ed with
 * its owning query and fragment number before it can be used.
 */
NdbRootFragment::NdbRootFragment():
  m_query(NULL),
  m_fragNo(voidFragNo),
  m_resultStreams(NULL),
  m_pendingRequests(0),
  m_availResultSets(0),
  m_outstandingResults(0),
  m_confReceived(false),
  m_remainingScans(0xffffffff),  // All scan-bits set: nothing completed yet
  m_idMapHead(-1),               // Empty receiver-id hash chain
  m_idMapNext(-1)
{
}
1169 
NdbRootFragment::~NdbRootFragment()
{
  // ::postFetchRelease() must have destroyed the result streams already.
  assert(m_resultStreams==NULL);
}
1174 
/**
 * Bind this NdbRootFragment to 'query' as fragment number 'fragNo',
 * and placement-construct + prepare one NdbResultStream per query
 * operation. Stream memory comes from the query's ResultStreamAlloc.
 */
void NdbRootFragment::init(NdbQueryImpl& query, Uint32 fragNo)
{
  assert(m_fragNo==voidFragNo);  // init() may only be called once
  m_query = &query;
  m_fragNo = fragNo;

  m_resultStreams = reinterpret_cast<NdbResultStream*>
     (query.getResultStreamAlloc().allocObjMem(query.getNoOfOperations()));
  assert(m_resultStreams!=NULL);

  for (unsigned opNo=0; opNo<query.getNoOfOperations(); opNo++)
  {
    NdbQueryOperationImpl& op = query.getQueryOperation(opNo);
    // In-place construction; released again by ::postFetchRelease().
    new (&m_resultStreams[opNo]) NdbResultStream(op,*this);
    m_resultStreams[opNo].prepare();
  }
}
1192 
1193 /**
1194  * Release what we want need anymore after last available row has been
1195  * returned from datanodes.
1196  */
1197 void
postFetchRelease()1198 NdbRootFragment::postFetchRelease()
1199 {
1200   if (m_resultStreams != NULL)
1201   {
1202     for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++)
1203     {
1204       m_resultStreams[opNo].~NdbResultStream();
1205     }
1206   }
1207   /**
1208    * Don't 'delete' the object as it was in-place constructed from
1209    * ResultStreamAlloc'ed memory. Memory is released by
1210    * ResultStreamAlloc::reset().
1211    */
1212   m_resultStreams = NULL;
1213 }
1214 
// Return the result stream of operation 'operationNo' within this fragment.
NdbResultStream&
NdbRootFragment::getResultStream(Uint32 operationNo) const
{
  assert(m_resultStreams);  // ::init() must have been called
  return m_resultStreams[operationNo];
}
1221 
1222 /**
1223  * Throw any pending ResultSets from specified rootFrags[]
1224  */
1225 //static
clear(NdbRootFragment * rootFrags,Uint32 noOfFrags)1226 void NdbRootFragment::clear(NdbRootFragment* rootFrags, Uint32 noOfFrags)
1227 {
1228   if (rootFrags != NULL)
1229   {
1230     for (Uint32 fragNo = 0; fragNo < noOfFrags; fragNo++)
1231     {
1232       rootFrags[fragNo].m_pendingRequests = 0;
1233       rootFrags[fragNo].m_availResultSets = 0;
1234     }
1235   }
1236 }
1237 
1238 /**
1239  * Check if there has been requested more ResultSets from
1240  * this fragment which has not been consumed yet.
1241  * (This is also a candicate check for ::hasReceivedMore())
1242  */
hasRequestedMore() const1243 bool NdbRootFragment::hasRequestedMore() const
1244 {
1245   return (m_pendingRequests > 0);
1246 }
1247 
1248 /**
1249  * Signal that another complete ResultSet is available for
1250  * this NdbRootFragment.
1251  * Need mutex lock as 'm_availResultSets' is accesed both from
1252  * receiver and application thread.
1253  */
setReceivedMore()1254 void NdbRootFragment::setReceivedMore()        // Need mutex
1255 {
1256   assert(m_availResultSets==0);
1257   m_availResultSets++;
1258 }
1259 
1260 /**
1261  * Check if another ResultSets has been received and is available
1262  * for reading. It will be given to the application thread when it
1263  * call ::grabNextResultSet().
1264  * Need mutex lock as 'm_availResultSets' is accesed both from
1265  * receiver and application thread.
1266  */
hasReceivedMore() const1267 bool NdbRootFragment::hasReceivedMore() const  // Need mutex
1268 {
1269   return (m_availResultSets > 0);
1270 }
1271 
/**
 * Prepare all still-active result streams of this fragment to receive
 * another batch, and count the request as pending.
 */
void NdbRootFragment::prepareNextReceiveSet()
{
  assert(m_fragNo!=voidFragNo);       // Must be ::init()'ed
  assert(m_outstandingResults == 0);  // Previous batch fully received

  for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++)
  {
    NdbResultStream& resultStream = getResultStream(opNo);
    if (!resultStream.isSubScanComplete(m_remainingScans))
    {
      /**
       * Reset resultStream and all its descendants, since all these
       * streams will get a new set of rows in the next batch.
       */
      resultStream.prepareNextReceiveSet();
    }
  }
  m_confReceived = false;
  m_pendingRequests++;
}
1292 
1293 /**
1294  * Let the application thread takes ownership of an available
1295  * ResultSet, prepare it for reading first row.
1296  * Need mutex lock as 'm_availResultSets' is accesed both from
1297  * receiver and application thread.
1298  */
grabNextResultSet()1299 void NdbRootFragment::grabNextResultSet()  // Need mutex
1300 {
1301   assert(m_availResultSets>0);
1302   m_availResultSets--;
1303 
1304   assert(m_pendingRequests>0);
1305   m_pendingRequests--;
1306 
1307   NdbResultStream& rootStream = getResultStream(0);
1308   rootStream.prepareResultSet(m_remainingScans);
1309 
1310   /* Position at the first (sorted?) row available from this fragments.
1311    */
1312   rootStream.firstResult();
1313 }
1314 
/**
 * Record that the CONF signal for the current batch arrived, and save
 * the 'tcPtrI' cursor (RNIL means no more batches from this fragment).
 */
void NdbRootFragment::setConfReceived(Uint32 tcPtrI)
{
  /* For a query with a lookup root, there may be more than one TCKEYCONF
     message. For a scan, there should only be one SCAN_TABCONF per root
     fragment.
  */
  assert(!getResultStream(0).isScanQuery() || !m_confReceived);
  getResultStream(0).getReceiver().m_tcPtrI = tcPtrI;
  m_confReceived = true;
}
1325 
// True when the CONF arrived and carried RNIL, i.e. no more batches follow.
bool NdbRootFragment::finalBatchReceived() const
{
  return m_confReceived && getReceiverTcPtrI()==RNIL;
}
1330 
// True when the root stream of this fragment holds no rows.
bool NdbRootFragment::isEmpty() const
{
  return getResultStream(0).isEmpty();
}
1335 
1336 /**
1337  * SPJ requests are identified by the receiver-id of the
1338  * *root* ResultStream for each RootFragment. Furthermore
1339  * a NEXTREQ use the tcPtrI saved in this ResultStream to
1340  * identify the 'cursor' to restart.
1341  *
1342  * We provide some convenient accessors for fetching this info
1343  */
getReceiverId() const1344 Uint32 NdbRootFragment::getReceiverId() const
1345 {
1346   return getResultStream(0).getReceiver().getId();
1347 }
1348 
// The 'cursor' to be restarted by a NEXTREQ (RNIL when fully fetched).
Uint32 NdbRootFragment::getReceiverTcPtrI() const
{
  return getResultStream(0).getReceiver().m_tcPtrI;
}
1353 
1354 ///////////////////////////////////////////
1355 /////////  NdbQuery API methods ///////////
1356 ///////////////////////////////////////////
1357 
/**
 * NdbQuery is the public API facade; every method below is a thin
 * wrapper delegating to the pimpl'ed NdbQueryImpl ('m_impl').
 */
NdbQuery::NdbQuery(NdbQueryImpl& impl):
  m_impl(impl)
{}

NdbQuery::~NdbQuery()
{}

Uint32
NdbQuery::getNoOfOperations() const
{
  return m_impl.getNoOfOperations();
}

NdbQueryOperation*
NdbQuery::getQueryOperation(Uint32 index) const
{
  return &m_impl.getQueryOperation(index).getInterface();
}

NdbQueryOperation*
NdbQuery::getQueryOperation(const char* ident) const
{
  // May return NULL if no operation has the given ident.
  NdbQueryOperationImpl* op = m_impl.getQueryOperation(ident);
  return (op) ? &op->getInterface() : NULL;
}

Uint32
NdbQuery::getNoOfParameters() const
{
  return m_impl.getNoOfParameters();
}

const NdbParamOperand*
NdbQuery::getParameter(const char* name) const
{
  return m_impl.getParameter(name);
}

const NdbParamOperand*
NdbQuery::getParameter(Uint32 num) const
{
  return m_impl.getParameter(num);
}

int
NdbQuery::setBound(const NdbRecord *keyRecord,
                   const NdbIndexScanOperation::IndexBound *bound)
{
  // Translate an internal error code into the -1 / getNdbError() convention.
  const int error = m_impl.setBound(keyRecord,bound);
  if (unlikely(error)) {
    m_impl.setErrorCode(error);
    return -1;
  } else {
    return 0;
  }
}

NdbQuery::NextResultOutcome
NdbQuery::nextResult(bool fetchAllowed, bool forceSend)
{
  return m_impl.nextResult(fetchAllowed, forceSend);
}

void
NdbQuery::close(bool forceSend)
{
  m_impl.close(forceSend);
}

NdbTransaction*
NdbQuery::getNdbTransaction() const
{
  return &m_impl.getNdbTransaction();
}

const NdbError&
NdbQuery::getNdbError() const {
  return m_impl.getNdbError();
};

int NdbQuery::isPrunable(bool& prunable) const
{
  return m_impl.isPrunable(prunable);
}
1442 
/**
 * NdbQueryOperation is the public API facade; every method below is a
 * thin wrapper delegating to NdbQueryOperationImpl ('m_impl').
 */
NdbQueryOperation::NdbQueryOperation(NdbQueryOperationImpl& impl)
  :m_impl(impl)
{}
NdbQueryOperation::~NdbQueryOperation()
{}

Uint32
NdbQueryOperation::getNoOfParentOperations() const
{
  return m_impl.getNoOfParentOperations();
}

NdbQueryOperation*
NdbQueryOperation::getParentOperation(Uint32 i) const
{
  return &m_impl.getParentOperation(i).getInterface();
}

Uint32
NdbQueryOperation::getNoOfChildOperations() const
{
  return m_impl.getNoOfChildOperations();
}

NdbQueryOperation*
NdbQueryOperation::getChildOperation(Uint32 i) const
{
  return &m_impl.getChildOperation(i).getInterface();
}

const NdbQueryOperationDef&
NdbQueryOperation::getQueryOperationDef() const
{
  return m_impl.getQueryOperationDef().getInterface();
}

NdbQuery&
NdbQueryOperation::getQuery() const {
  return m_impl.getQuery().getInterface();
};

NdbRecAttr*
NdbQueryOperation::getValue(const char* anAttrName,
                            char* resultBuffer)
{
  return m_impl.getValue(anAttrName, resultBuffer);
}

NdbRecAttr*
NdbQueryOperation::getValue(Uint32 anAttrId,
                            char* resultBuffer)
{
  return m_impl.getValue(anAttrId, resultBuffer);
}

NdbRecAttr*
NdbQueryOperation::getValue(const NdbDictionary::Column* column,
                            char* resultBuffer)
{
  // Guard against NULL column argument before dereferencing it.
  if (unlikely(column==NULL)) {
    m_impl.getQuery().setErrorCode(QRY_REQ_ARG_IS_NULL);
    return NULL;
  }
  return m_impl.getValue(NdbColumnImpl::getImpl(*column), resultBuffer);
}

int
NdbQueryOperation::setResultRowBuf (
                       const NdbRecord *rec,
                       char* resBuffer,
                       const unsigned char* result_mask)
{
  // Both the record description and the destination buffer are required.
  if (unlikely(rec==0 || resBuffer==0)) {
    m_impl.getQuery().setErrorCode(QRY_REQ_ARG_IS_NULL);
    return -1;
  }
  return m_impl.setResultRowBuf(rec, resBuffer, result_mask);
}

int
NdbQueryOperation::setResultRowRef (
                       const NdbRecord* rec,
                       const char* & bufRef,
                       const unsigned char* result_mask)
{
  return m_impl.setResultRowRef(rec, bufRef, result_mask);
}

int
NdbQueryOperation::setOrdering(NdbQueryOptions::ScanOrdering ordering)
{
  return m_impl.setOrdering(ordering);
}

NdbQueryOptions::ScanOrdering
NdbQueryOperation::getOrdering() const
{
  return m_impl.getOrdering();
}

int NdbQueryOperation::setParallelism(Uint32 parallelism){
  return m_impl.setParallelism(parallelism);
}

int NdbQueryOperation::setMaxParallelism(){
  return m_impl.setMaxParallelism();
}

int NdbQueryOperation::setAdaptiveParallelism(){
  return m_impl.setAdaptiveParallelism();
}

int NdbQueryOperation::setBatchSize(Uint32 batchSize){
  return m_impl.setBatchSize(batchSize);
}

int NdbQueryOperation::setInterpretedCode(const NdbInterpretedCode& code) const
{
  return m_impl.setInterpretedCode(code);
}

NdbQuery::NextResultOutcome
NdbQueryOperation::firstResult()
{
  return m_impl.firstResult();
}

NdbQuery::NextResultOutcome
NdbQueryOperation::nextResult(bool fetchAllowed, bool forceSend)
{
  return m_impl.nextResult(fetchAllowed, forceSend);
}

bool
NdbQueryOperation::isRowNULL() const
{
  return m_impl.isRowNULL();
}

bool
NdbQueryOperation::isRowChanged() const
{
  return m_impl.isRowChanged();
}
1587 
1588 /////////////////////////////////////////////////
1589 /////////  NdbQueryParamValue methods ///////////
1590 /////////////////////////////////////////////////
1591 
/**
 * Discriminator for the value representation currently held by an
 * NdbQueryParamValue (checked against the bound column's type in
 * NdbQueryParamValue::serializeValue()).
 */
enum Type
{
  Type_NULL,       // No value; also used as optional end marker
  Type_raw,        // Raw data formated according to bound Column format.
  Type_raw_shrink, // As Type_raw, except short VarChar has to be shrinked.
  Type_string,     // '\0' terminated C-type string, char/varchar data only
  Type_Uint16,     // 16-bit integer value
  Type_Uint32,     // 32-bit integer value
  Type_Uint64,     // 64-bit integer value
  Type_Double      // Double precision float value
};
1603 
// Typed constructors: each stores its value in the union and tags it
// with the matching Type discriminator for later typechecking in
// ::serializeValue().

NdbQueryParamValue::NdbQueryParamValue(Uint16 val) : m_type(Type_Uint16)
{ m_value.uint16 = val; }

NdbQueryParamValue::NdbQueryParamValue(Uint32 val) : m_type(Type_Uint32)
{ m_value.uint32 = val; }

NdbQueryParamValue::NdbQueryParamValue(Uint64 val) : m_type(Type_Uint64)
{ m_value.uint64 = val; }

NdbQueryParamValue::NdbQueryParamValue(double val) : m_type(Type_Double)
{ m_value.dbl = val; }

// C-type string, terminated by '\0'
NdbQueryParamValue::NdbQueryParamValue(const char* val) : m_type(Type_string)
{ m_value.string = val; }

// Raw data (non-owning pointer; must stay valid until serialized)
NdbQueryParamValue::NdbQueryParamValue(const void* val, bool shrinkVarChar)
 : m_type(shrinkVarChar ? Type_raw_shrink : Type_raw)
{ m_value.raw = val; }

// NULL-value, also used as optional end marker
NdbQueryParamValue::NdbQueryParamValue() : m_type(Type_NULL)
{}
1628 
1629 
/**
 * Serialize this parameter value into 'dst', formatted as required by
 * the bound 'column'.
 *
 * @param column  Column the parameter is bound to; used for rudimentary
 *                typechecking and max-length limits.
 * @param dst     Destination buffer; data is appended starting at a
 *                32-bit word boundary.
 * @param len     Out: number of bytes appended (0 for NULL).
 * @param isNull  Out: set to true iff this value is Type_NULL.
 * @return 0 on success, else QRY_PARAMETER_HAS_WRONG_TYPE,
 *         QRY_CHAR_PARAMETER_TRUNCATED or Err_MemoryAlloc.
 */
int
NdbQueryParamValue::serializeValue(const class NdbColumnImpl& column,
                                   Uint32Buffer& dst,
                                   Uint32& len,
                                   bool& isNull) const
{
  const Uint32 maxSize = column.getSizeInBytes();
  isNull = false;
  // Start at (32-bit) word boundary.
  dst.skipRestOfWord();

  // Fetch parameter value and length.
  // Rudimentary typecheck of paramvalue: At least length should be as expected:
  //  - Extend with more types if required
  //  - Considder to add simple type conversion, ex: Int64 -> Int32
  //  - Or
  //     -- Represent all exact numeric as Int64 and convert to 'smaller' int
  //     -- Represent all floats as Double and convert to smaller floats
  //
  switch(m_type)
  {
    case Type_NULL:
      isNull = true;
      len = 0;
      break;

    case Type_Uint16:
      if (unlikely(column.getType() != NdbDictionary::Column::Smallint &&
                   column.getType() != NdbDictionary::Column::Smallunsigned))
        return QRY_PARAMETER_HAS_WRONG_TYPE;

      len = static_cast<Uint32>(sizeof(m_value.uint16));
      assert(len == maxSize);
      dst.appendBytes(&m_value.uint16, len);
      break;

    case Type_Uint32:
      if (unlikely(column.getType() != NdbDictionary::Column::Int &&
                   column.getType() != NdbDictionary::Column::Unsigned))
        return QRY_PARAMETER_HAS_WRONG_TYPE;

      len = static_cast<Uint32>(sizeof(m_value.uint32));
      assert(len == maxSize);
      dst.appendBytes(&m_value.uint32, len);
      break;

    case Type_Uint64:
      if (unlikely(column.getType() != NdbDictionary::Column::Bigint &&
                   column.getType() != NdbDictionary::Column::Bigunsigned))
        return QRY_PARAMETER_HAS_WRONG_TYPE;

      len = static_cast<Uint32>(sizeof(m_value.uint64));
      assert(len == maxSize);
      dst.appendBytes(&m_value.uint64, len);
      break;

    case Type_Double:
      if (unlikely(column.getType() != NdbDictionary::Column::Double))
        return QRY_PARAMETER_HAS_WRONG_TYPE;

      len = static_cast<Uint32>(sizeof(m_value.dbl));
      assert(len == maxSize);
      dst.appendBytes(&m_value.dbl, len);
      break;

    case Type_string:
      if (unlikely(column.getType() != NdbDictionary::Column::Char &&
                   column.getType() != NdbDictionary::Column::Varchar &&
                   column.getType() != NdbDictionary::Column::Longvarchar))
        return QRY_PARAMETER_HAS_WRONG_TYPE;
      {
        len  = static_cast<Uint32>(strlen(m_value.string));
        if (unlikely(len > maxSize))
          return QRY_CHAR_PARAMETER_TRUNCATED;

        dst.appendBytes(m_value.string, len);
      }
      break;

    case Type_raw:
      // 'Raw' data is readily formated according to the bound column
      if (likely(column.m_arrayType == NDB_ARRAYTYPE_FIXED))
      {
        len = maxSize;
        dst.appendBytes(m_value.raw, maxSize);
      }
      else if (column.m_arrayType == NDB_ARRAYTYPE_SHORT_VAR)
      {
        // One leading length byte + data
        len  = 1+*((Uint8*)(m_value.raw));

        assert(column.getType() == NdbDictionary::Column::Varchar ||
               column.getType() == NdbDictionary::Column::Varbinary);
        if (unlikely(len > 1+static_cast<Uint32>(column.getLength())))
          return QRY_CHAR_PARAMETER_TRUNCATED;

        dst.appendBytes(m_value.raw, len);
      }
      else if (column.m_arrayType == NDB_ARRAYTYPE_MEDIUM_VAR)
      {
        // Two leading (little-endian) length bytes + data
        len  = 2+uint2korr((Uint8*)m_value.raw);

        assert(column.getType() == NdbDictionary::Column::Longvarchar ||
               column.getType() == NdbDictionary::Column::Longvarbinary);
        if (unlikely(len > 2+static_cast<Uint32>(column.getLength())))
          return QRY_CHAR_PARAMETER_TRUNCATED;
        dst.appendBytes(m_value.raw, len);
      }
      else
      {
        assert(0);
      }
      break;

    case Type_raw_shrink:
      // Only short VarChar can be shrinked
      if (unlikely(column.m_arrayType != NDB_ARRAYTYPE_SHORT_VAR))
        return QRY_PARAMETER_HAS_WRONG_TYPE;

      assert(column.getType() == NdbDictionary::Column::Varchar ||
             column.getType() == NdbDictionary::Column::Varbinary);

      {
        // Convert from two-byte to one-byte length field.
        len = 1+uint2korr((Uint8*)m_value.raw);
        assert(len <= 0x100);  // Must fit in one length byte

        if (unlikely(len > 1+static_cast<Uint32>(column.getLength())))
          return QRY_CHAR_PARAMETER_TRUNCATED;

        const Uint8 shortLen = static_cast<Uint8>(len-1);
        dst.appendBytes(&shortLen, 1);
        dst.appendBytes(((Uint8*)m_value.raw)+2, shortLen);
      }
      break;

    default:
      assert(false);
  }
  if (unlikely(dst.isMemoryExhausted())) {
    return Err_MemoryAlloc;
  }
  return 0;
} // NdbQueryParamValue::serializeValue
1773 
1774 ///////////////////////////////////////////
1775 /////////  NdbQueryImpl methods ///////////
1776 ///////////////////////////////////////////
1777 
/**
 * Construct the executable instance of a defined query.
 * Allocates a single memory chunk for all NdbQueryOperationImpl objects,
 * placement-constructs one per operation in 'queryDef', and seeds ATTRINFO
 * with the serialized query tree.
 * On failure, m_error.code is set (via setErrorCode() or by an operation
 * c'tor) and construction is aborted early.
 */
NdbQueryImpl::NdbQueryImpl(NdbTransaction& trans,
                           const NdbQueryDefImpl& queryDef):
  m_interface(*this),
  m_state(Initial),
  m_tcState(Inactive),
  m_next(NULL),
  m_queryDef(&queryDef),
  m_error(),
  m_errorReceived(0),
  m_transaction(trans),
  m_scanTransaction(NULL),
  m_operations(0),
  m_countOperations(0),
  m_globalCursor(0),
  m_pendingFrags(0),
  m_rootFragCount(0),
  m_rootFrags(NULL),
  m_applFrags(),
  m_finalBatchFrags(0),
  m_num_bounds(0),
  m_shortestBound(0xffffffff),   // 'No bound seen yet' sentinel
  m_attrInfo(),
  m_keyInfo(),
  m_startIndicator(false),
  m_commitIndicator(false),
  m_prunability(Prune_No),
  m_pruneHashVal(0),
  // Object-pool allocators, sized by the object type each will hold.
  m_operationAlloc(sizeof(NdbQueryOperationImpl)),
  m_tupleSetAlloc(sizeof(NdbResultStream::TupleSet)),
  m_resultStreamAlloc(sizeof(NdbResultStream)),
  m_pointerAlloc(sizeof(void*)),
  m_rowBufferAlloc(sizeof(char))
{
  // Allocate memory for all m_operations[] in a single chunk
  m_countOperations = queryDef.getNoOfOperations();
  const int error = m_operationAlloc.init(m_countOperations);
  if (unlikely(error != 0))
  {
    setErrorCode(error);
    return;
  }
  m_operations = reinterpret_cast<NdbQueryOperationImpl*>
    (m_operationAlloc.allocObjMem(m_countOperations));

  // Then; use placement new to construct each individual
  // NdbQueryOperationImpl object in m_operations
  for (Uint32 i=0; i<m_countOperations; ++i)
  {
    const NdbQueryOperationDefImpl& def = queryDef.getQueryOperation(i);
    new(&m_operations[i]) NdbQueryOperationImpl(*this, def);
    // Failed to create NdbQueryOperationImpl object.
    if (m_error.code != 0)
    {
      // Destroy those objects that we have already constructed.
      // NOTE(review): operation [i] itself completed construction before the
      // error was noticed, yet is not destructed here - presumably its c'tor
      // holds no resources when it flags an error; verify.
      for (int j = static_cast<int>(i)-1; j>= 0; j--)
      {
        m_operations[j].~NdbQueryOperationImpl();
      }
      m_operations = NULL;
      return;
    }
  }

  // Serialized QueryTree definition is first part of ATTRINFO.
  m_attrInfo.append(queryDef.getSerialized());
}
1844 
~NdbQueryImpl()1845 NdbQueryImpl::~NdbQueryImpl()
1846 {
1847   /** BEWARE:
1848    *  Don't refer NdbQueryDef or NdbQueryOperationDefs after
1849    *  NdbQuery::close() as at this stage the appliaction is
1850    *  allowed to destruct the Def's.
1851    */
1852   assert(m_state==Closed);
1853   assert(m_rootFrags==NULL);
1854 
1855   // NOTE: m_operations[] was allocated as a single memory chunk with
1856   // placement new construction of each operation.
1857   // Requires explicit call to d'tor of each operation before memory is free'ed.
1858   if (m_operations != NULL) {
1859     for (int i=m_countOperations-1; i>=0; --i)
1860     { m_operations[i].~NdbQueryOperationImpl();
1861     }
1862     m_operations = NULL;
1863   }
1864   m_state = Destructed;
1865 }
1866 
1867 void
postFetchRelease()1868 NdbQueryImpl::postFetchRelease()
1869 {
1870   if (m_rootFrags != NULL)
1871   {
1872     for (unsigned i=0; i<m_rootFragCount; i++)
1873     { m_rootFrags[i].postFetchRelease();
1874     }
1875   }
1876   if (m_operations != NULL)
1877   {
1878     for (unsigned i=0; i<m_countOperations; i++)
1879     { m_operations[i].postFetchRelease();
1880     }
1881   }
1882   delete[] m_rootFrags;
1883   m_rootFrags = NULL;
1884 
1885   m_rowBufferAlloc.reset();
1886   m_tupleSetAlloc.reset();
1887   m_resultStreamAlloc.reset();
1888 }
1889 
1890 
1891 //static
1892 NdbQueryImpl*
buildQuery(NdbTransaction & trans,const NdbQueryDefImpl & queryDef)1893 NdbQueryImpl::buildQuery(NdbTransaction& trans,
1894                          const NdbQueryDefImpl& queryDef)
1895 {
1896   assert(queryDef.getNoOfOperations() > 0);
1897   // Check for online upgrade/downgrade.
1898   if (unlikely(!ndb_join_pushdown(trans.getNdb()->getMinDbNodeVersion())))
1899   {
1900     trans.setOperationErrorCodeAbort(Err_FunctionNotImplemented);
1901     return NULL;
1902   }
1903   NdbQueryImpl* const query = new NdbQueryImpl(trans, queryDef);
1904   if (unlikely(query==NULL)) {
1905     trans.setOperationErrorCodeAbort(Err_MemoryAlloc);
1906     return NULL;
1907   }
1908   if (unlikely(query->m_error.code != 0))
1909   {
1910     // Transaction error code set already.
1911     query->release();
1912     return NULL;
1913   }
1914   assert(query->m_state==Initial);
1915   return query;
1916 }
1917 
1918 
1919 /** Assign supplied parameter values to the parameter placeholders
1920  *  Created when the query was defined.
1921  *  Values are *copied* into this NdbQueryImpl object:
1922  *  Memory location used as source for parameter values don't have
1923  *  to be valid after this assignment.
1924  */
1925 int
assignParameters(const NdbQueryParamValue paramValues[])1926 NdbQueryImpl::assignParameters(const NdbQueryParamValue paramValues[])
1927 {
1928   /**
1929    * Immediately build the serialized parameter representation in order
1930    * to avoid storing param values elsewhere until query is executed.
1931    * Also calculates prunable property, and possibly its hashValue.
1932    */
1933   // Build explicit key/filter/bounds for root operation, possibly refering paramValues
1934   const int error = getRoot().prepareKeyInfo(m_keyInfo, paramValues);
1935   if (unlikely(error != 0))
1936   {
1937     setErrorCode(error);
1938     return -1;
1939   }
1940 
1941   // Serialize parameter values for the other (non-root) operations
1942   // (No need to serialize for root (i==0) as root key is part of keyInfo above)
1943   for (Uint32 i=1; i<getNoOfOperations(); ++i)
1944   {
1945     if (getQueryDef().getQueryOperation(i).getNoOfParameters() > 0)
1946     {
1947       const int error = getQueryOperation(i).serializeParams(paramValues);
1948       if (unlikely(error != 0))
1949       {
1950         setErrorCode(error);
1951         return -1;
1952       }
1953     }
1954   }
1955   assert(m_state<Defined);
1956   m_state = Defined;
1957   return 0;
1958 } // NdbQueryImpl::assignParameters
1959 
1960 
1961 static int
insert_bound(Uint32Buffer & keyInfo,const NdbRecord * key_record,Uint32 column_index,const char * row,Uint32 bound_type)1962 insert_bound(Uint32Buffer& keyInfo, const NdbRecord *key_record,
1963                                               Uint32 column_index,
1964                                               const char *row,
1965                                               Uint32 bound_type)
1966 {
1967   char buf[NdbRecord::Attr::SHRINK_VARCHAR_BUFFSIZE];
1968   const NdbRecord::Attr *column= &key_record->columns[column_index];
1969 
1970   bool is_null= column->is_null(row);
1971   Uint32 len= 0;
1972   const void *aValue= row+column->offset;
1973 
1974   if (!is_null)
1975   {
1976     bool len_ok;
1977     /* Support for special mysqld varchar format in keys. */
1978     if (column->flags & NdbRecord::IsMysqldShrinkVarchar)
1979     {
1980       len_ok= column->shrink_varchar(row, len, buf);
1981       aValue= buf;
1982     }
1983     else
1984     {
1985       len_ok= column->get_var_length(row, len);
1986     }
1987     if (!len_ok) {
1988       return Err_WrongFieldLength;
1989     }
1990   }
1991 
1992   AttributeHeader ah(column->index_attrId, len);
1993   keyInfo.append(bound_type);
1994   keyInfo.append(ah.m_value);
1995   keyInfo.appendBytes(aValue,len);
1996 
1997   return 0;
1998 }
1999 
2000 
/**
 * Define a range bound for the ordered-index scan of the root operation.
 * May be called repeatedly to build a multi-range scan; 'range_no' must
 * then be supplied in consecutive increasing order starting at 0.
 * The bound is serialized into m_keyInfo in the same KEYINFO wire format
 * as used by NdbIndexScanOperation.
 *
 * @return 0 on success, else an error code.
 */
int
NdbQueryImpl::setBound(const NdbRecord *key_record,
                       const NdbIndexScanOperation::IndexBound *bound)
{
  // Adding bounds may change prunability; final decision is deferred.
  m_prunability = Prune_Unknown;
  if (unlikely(key_record == NULL || bound==NULL))
    return QRY_REQ_ARG_IS_NULL;

  // Bounds only apply to an ordered-index-scan root operation.
  if (unlikely(getRoot().getQueryOperationDef().getType()
               != NdbQueryOperationDef::OrderedIndexScan))
  {
    return QRY_WRONG_OPERATION_TYPE;
  }

  assert(m_state >= Defined);
  if (m_state != Defined)
  {
    return QRY_ILLEGAL_STATE;
  }

  // Position where this bound starts; its first word is patched with
  // length/range_no information at the end.
  int startPos = m_keyInfo.getSize();

  // We don't handle both NdbQueryIndexBound defined in ::scanIndex()
  // in combination with a later ::setBound(NdbIndexScanOperation::IndexBound)
//assert (m_bound.lowKeys==0 && m_bound.highKeys==0);

  // Ranges must arrive as range_no == 0,1,2,... and within protocol max.
  if (unlikely(bound->range_no != m_num_bounds ||
               bound->range_no > NdbIndexScanOperation::MaxRangeNo))
  {
 // setErrorCodeAbort(4286);
    return Err_InvalidRangeNo;
  }

  // 'key_count' becomes the longer of the low/high key counts,
  // 'common_key_count' the shorter.
  Uint32 key_count= bound->low_key_count;
  Uint32 common_key_count= key_count;
  if (key_count < bound->high_key_count)
    key_count= bound->high_key_count;
  else
    common_key_count= bound->high_key_count;

  // Track the shortest common bound length seen over all ranges.
  if (m_shortestBound > common_key_count)
  {
    m_shortestBound = common_key_count;
  }
  /* Has the user supplied an open range (no bounds)? */
  const bool openRange= ((bound->low_key == NULL || bound->low_key_count == 0) &&
                         (bound->high_key == NULL || bound->high_key_count == 0));
  if (likely(!openRange))
  {
    /* If low and high key pointers are the same and key counts are
     * the same, we send as an Eq bound to save bandwidth.
     * This will not send an EQ bound if :
     *   - Different numbers of high and low keys are EQ
     *   - High and low keys are EQ, but use different ptrs
     */
    const bool isEqRange=
      (bound->low_key == bound->high_key) &&
      (bound->low_key_count == bound->high_key_count) &&
      (bound->low_inclusive && bound->high_inclusive); // Does this matter?

    if (isEqRange)
    {
      /* Using BoundEQ will result in bound being sent only once */
      for (unsigned j= 0; j<key_count; j++)
      {
        const int error=
          insert_bound(m_keyInfo, key_record, key_record->key_indexes[j],
                                bound->low_key, NdbIndexScanOperation::BoundEQ);
        if (unlikely(error))
          return error;
      }
    }
    else
    {
      /* Distinct upper and lower bounds, must specify them independently */
      /* Note :  Protocol allows individual columns to be specified as EQ
       * or some prefix of columns.  This is not currently supported from
       * NDBAPI.
       */
      for (unsigned j= 0; j<key_count; j++)
      {
        Uint32 bound_type;
        /* If key is part of lower bound */
        if (bound->low_key && j<bound->low_key_count)
        {
          /* Inclusive if defined, or matching rows can include this value */
          bound_type= bound->low_inclusive  || j+1 < bound->low_key_count ?
            NdbIndexScanOperation::BoundLE : NdbIndexScanOperation::BoundLT;
          const int error=
            insert_bound(m_keyInfo, key_record, key_record->key_indexes[j],
                                  bound->low_key, bound_type);
          if (unlikely(error))
            return error;
        }
        /* If key is part of upper bound */
        if (bound->high_key && j<bound->high_key_count)
        {
          /* Inclusive if defined, or matching rows can include this value */
          bound_type= bound->high_inclusive  || j+1 < bound->high_key_count ?
            NdbIndexScanOperation::BoundGE : NdbIndexScanOperation::BoundGT;
          const int error=
            insert_bound(m_keyInfo, key_record, key_record->key_indexes[j],
                                  bound->high_key, bound_type);
          if (unlikely(error))
            return error;
        }
      }
    }
  }
  else
  {
    /* Open range - all rows must be returned.
     * To encode this, we'll request all rows where the first
     * key column value is >= NULL
     */
    AttributeHeader ah(0, 0);
    m_keyInfo.append(NdbIndexScanOperation::BoundLE);
    m_keyInfo.append(ah.m_value);
  }

  // Patch the first word of this bound: bound length (in words) in the
  // upper 16 bits, range_no shifted into bit position 4.
  Uint32 length = m_keyInfo.getSize()-startPos;
  if (unlikely(m_keyInfo.isMemoryExhausted())) {
    return Err_MemoryAlloc;
  } else if (unlikely(length > 0xFFFF)) {
    return QRY_DEFINITION_TOO_LARGE; // Query definition too large.
  } else if (likely(length > 0)) {
    m_keyInfo.put(startPos, m_keyInfo.get(startPos) | (length << 16) | (bound->range_no << 4));
  }

#ifdef TRACE_SERIALIZATION
  ndbout << "Serialized KEYINFO w/ bounds for indexScan root : ";
  for (Uint32 i = startPos; i < m_keyInfo.getSize(); i++) {
    char buf[12];
    sprintf(buf, "%.8x", m_keyInfo.get(i));
    ndbout << buf << " ";
  }
  ndbout << endl;
#endif

  m_num_bounds++;
  return 0;
} // NdbQueryImpl::setBound()
2143 
2144 
/** @return the total number of operations in this query. */
Uint32
NdbQueryImpl::getNoOfOperations() const
{
  return m_countOperations;
}
2150 
/** @return the number of leaf operations; delegated to the root (index 0). */
Uint32
NdbQueryImpl::getNoOfLeafOperations() const
{
  return getQueryOperation(Uint32(0)).getNoOfLeafOperations();
}
2156 
/**
 * @return the operation at position 'index'.
 * Bounds are checked by assert only; callers must pass a valid index.
 */
NdbQueryOperationImpl&
NdbQueryImpl::getQueryOperation(Uint32 index) const
{
  assert(index<m_countOperations);
  return m_operations[index];
}
2163 
2164 NdbQueryOperationImpl*
getQueryOperation(const char * ident) const2165 NdbQueryImpl::getQueryOperation(const char* ident) const
2166 {
2167   for(Uint32 i = 0; i<m_countOperations; i++){
2168     if(strcmp(m_operations[i].getQueryOperationDef().getName(), ident) == 0){
2169       return &m_operations[i];
2170     }
2171   }
2172   return NULL;
2173 }
2174 
/** Not yet implemented; always reports zero parameters. */
Uint32
NdbQueryImpl::getNoOfParameters() const
{
  return 0;  // FIXME
}
2180 
/** Not yet implemented; 'name' is ignored and NULL is always returned. */
const NdbParamOperand*
NdbQueryImpl::getParameter(const char* name) const
{
  return NULL; // FIXME
}
2186 
/** Not yet implemented; 'num' is ignored and NULL is always returned. */
const NdbParamOperand*
NdbQueryImpl::getParameter(Uint32 num) const
{
  return NULL; // FIXME
}
2192 
/**
 * NdbQueryImpl::nextResult() - The 'global' cursor on the query results
 *
 * Will iterate and fetch results for all combinations of results from the
 * NdbOperations which this query consists of. Except for the root operation,
 * which will follow any optional ScanOrdering, we have no control of the
 * ordering in which the results from the QueryOperations appear.
 *
 * @param fetchAllowed  if false, never block waiting for more batches;
 *                      may then return NextResult_bufferEmpty.
 * @param forceSend     passed through to the transporter layer.
 */

NdbQuery::NextResultOutcome
NdbQueryImpl::nextResult(bool fetchAllowed, bool forceSend)
{
  // Must be executing and not yet closed; anything else is API misuse
  // or a previously recorded failure.
  if (unlikely(m_state < Executing || m_state >= Closed)) {
    assert (m_state >= Initial && m_state < Destructed);
    if (m_state == Failed)
      setErrorCode(QRY_IN_ERROR_STATE);
    else
      setErrorCode(QRY_ILLEGAL_STATE);
    DEBUG_CRASH();
    return NdbQuery::NextResult_error;
  }

  assert (m_globalCursor < getNoOfOperations());

  while (m_state != EndOfData)  // Or likely:  return when 'gotRow'
  {
    // Advance the operation currently under the global cursor.
    NdbQuery::NextResultOutcome res =
      getQueryOperation(m_globalCursor).nextResult(fetchAllowed,forceSend);

    if (unlikely(res == NdbQuery::NextResult_error))
      return res;

    else if (res == NdbQuery::NextResult_scanComplete)
    {
      if (m_globalCursor == 0)  // Completed reading all results from root
        break;
      m_globalCursor--;         // Get 'next' from  ancestor
    }

    else if (res == NdbQuery::NextResult_gotRow)
    {
      // Position to 'firstResult()' for all childs.
      // Update m_globalCursor to iterate from last operation with results
      // next time.
      for (uint child=m_globalCursor+1; child<getNoOfOperations(); child++)
      {
        res = getQueryOperation(child).firstResult();
        if (unlikely(res == NdbQuery::NextResult_error))
          return res;
        else if (res == NdbQuery::NextResult_gotRow)
          m_globalCursor = child;
      }
      return NdbQuery::NextResult_gotRow;
    }
    else
    {
      // No buffered rows and fetching was not allowed (or not possible now).
      assert (res == NdbQuery::NextResult_bufferEmpty);
      return res;
    }
  }

  assert (m_state == EndOfData);
  return NdbQuery::NextResult_scanComplete;

} //NdbQueryImpl::nextResult()
2258 
2259 
/**
 * Local cursor component which implements the special case of 'next' on the
 * root operation of the entire NdbQuery. In addition to fetching the 'next'
 * result from the root operation, we should also retrieve more results from
 * the datanodes if required and allowed.
 */
NdbQuery::NextResultOutcome
NdbQueryImpl::nextRootResult(bool fetchAllowed, bool forceSend)
{
  /* To minimize lock contention, each query has the separate root fragment
   * container 'm_applFrags'. m_applFrags is only accessed by the application
   * thread, so it is safe to use it without locks.
   */
  while (m_state != EndOfData)  // Or likely:  return when 'gotRow' or error
  {
    const NdbRootFragment* rootFrag = m_applFrags.getCurrent();
    if (unlikely(rootFrag==NULL))
    {
      /* m_applFrags is empty, so we cannot get more results without
       * possibly blocking.
       *
       * ::awaitMoreResults() will either copy fragments that are already
       * complete (under mutex protection), or block until data
       * previously requested arrives.
       */
      const FetchResult fetchResult = awaitMoreResults(forceSend);
      switch (fetchResult) {

      case FetchResult_ok:          // OK - got data wo/ error
        assert(m_state != Failed);
        rootFrag = m_applFrags.getCurrent();
        assert (rootFrag!=NULL);
        break;

      case FetchResult_noMoreData:  // No data, no error
        assert(m_state != Failed);
        assert (m_applFrags.getCurrent()==NULL);
        getRoot().nullifyResult();
        m_state = EndOfData;
        postFetchRelease();        // Entire query exhausted; free resources
        return NdbQuery::NextResult_scanComplete;

      case FetchResult_noMoreCache: // No cached data, no error
        assert(m_state != Failed);
        assert (m_applFrags.getCurrent()==NULL);
        getRoot().nullifyResult();
        if (fetchAllowed)
        {
          break;  // ::sendFetchMore() may request more results
        }
        return NdbQuery::NextResult_bufferEmpty;

      case FetchResult_gotError:    // Error in 'm_error.code'
        assert (m_error.code != 0);
        return NdbQuery::NextResult_error;

      default:
        assert(false);
      }
    }
    else
    {
      rootFrag->getResultStream(0).nextResult();   // Consume current
      m_applFrags.reorganize();                    // Calculate new current
      // Reorg. may update 'current' RootFragment
      rootFrag = m_applFrags.getCurrent();
    }

    /**
     * If allowed to request more rows from datanodes, we do this asynch
     * and request more rows as soon as we have consumed all rows from a
     * fragment. ::awaitMoreResults() may eventually block and wait for these
     * when required.
     */
    if (fetchAllowed)
    {
      // Ask for a new batch if we emptied some.
      NdbRootFragment** frags;
      const Uint32 cnt = m_applFrags.getFetchMore(frags);
      if (cnt > 0 && sendFetchMore(frags, cnt, forceSend) != 0)
      {
        return NdbQuery::NextResult_error;
      }
    }

    if (rootFrag!=NULL)
    {
      // A current row is available; materialize it for the application.
      getRoot().fetchRow(rootFrag->getResultStream(0));
      return NdbQuery::NextResult_gotRow;
    }
  } // m_state != EndOfData

  assert (m_state == EndOfData);
  return NdbQuery::NextResult_scanComplete;
} //NdbQueryImpl::nextRootResult()
2355 
2356 
/**
 * Wait for more scan results, which have already been REQuested, to arrive.
 *
 * @return FetchResult_ok when more row batches became available in
 *         m_applFrags; FetchResult_noMoreCache when nothing is cached but
 *         ::sendFetchMore() may make more available; FetchResult_noMoreData
 *         when the scan is exhausted; FetchResult_gotError with the error
 *         in 'm_error.code'.
 */
NdbQueryImpl::FetchResult
NdbQueryImpl::awaitMoreResults(bool forceSend)
{
  assert(m_applFrags.getCurrent() == NULL);

  /* Check if there are any more completed fragments available.*/
  if (getQueryDef().isScanQuery())
  {
    assert (m_scanTransaction);
    assert (m_state==Executing);

    NdbImpl* const ndb = m_transaction.getNdb()->theImpl;
    {
      /* This part needs to be done under mutex due to synchronization with
       * receiver thread.
       */
      PollGuard poll_guard(*ndb);

      /* There may be pending (asynchronous received, mutex protected) errors
       * from TC / datanodes. Propagate these into m_error.code in 'API space'.
       */
      while (likely(!hasReceivedError()))
      {
        /* Scan m_rootFrags (under mutex protection) for fragments
         * which have received a complete batch. Add these to m_applFrags.
         */
        m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount);
        if (m_applFrags.getCurrent() != NULL)
        {
          return FetchResult_ok;
        }

        /* There are no more available fragment results available without
         * first waiting for more to be received from the datanodes
         */
        if (m_pendingFrags == 0)
        {
          // 'No more *pending* results', ::sendFetchMore() may make more available
          return (m_finalBatchFrags < getRootFragCount()) ? FetchResult_noMoreCache
                                                          : FetchResult_noMoreData;
        }

        const Uint32 timeout  = ndb->get_waitfor_timeout();
        const Uint32 nodeId   = m_transaction.getConnectedNodeId();
        const Uint32 seq      = m_transaction.theNodeSequence;

        /* More results are on the way, so we wait for them.*/
        const FetchResult waitResult = static_cast<FetchResult>
          (poll_guard.wait_scan(3*timeout,
                                nodeId,
                                forceSend));

        // A changed node sequence means the connected node failed.
        if (ndb->getNodeSequence(nodeId) != seq)
          setFetchTerminated(Err_NodeFailCausedAbort,false);
        else if (likely(waitResult == FetchResult_ok))
          continue;
        else if (waitResult == FetchResult_timeOut)
          setFetchTerminated(Err_ReceiveTimedOut,false);
        else
          setFetchTerminated(Err_NodeFailCausedAbort,false);

        assert (m_state != Failed);
      } // while(!hasReceivedError())
    } // Terminates scope of 'PollGuard'

    // Fall through only if ::hasReceivedError()
    assert (m_error.code);
    return FetchResult_gotError;
  }
  else // is a Lookup query
  {
    /* The root operation is a lookup. Lookups are guaranteed to be complete
     * before NdbTransaction::execute() returns. Therefore we do not set
     * the lock, because we know that the signal receiver thread will not
     * be accessing m_rootFrags at this time.
     */
    m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount);
    if (m_applFrags.getCurrent() != NULL)
    {
      return FetchResult_ok;
    }

    /* Getting here means that either:
     *  - No results was returned (TCKEYREF)
     *  - There was no matching row for an inner join.
     *  - or, the application called nextResult() twice for a lookup query.
     */
    assert(m_pendingFrags == 0);
    assert(m_finalBatchFrags == getRootFragCount());
    return FetchResult_noMoreData;
  } // if(getQueryDef().isScanQuery())

} //NdbQueryImpl::awaitMoreResults
2455 
2456 
/*
  ::handleBatchComplete() is intended to be called when receiving signals only.
  The PollGuard mutex is then set and the shared 'm_pendingFrags' and
  'm_finalBatchFrags' can safely be updated and ::setReceivedMore() signaled.

  returns: 'true' when application thread should be resumed.
*/
bool
NdbQueryImpl::handleBatchComplete(NdbRootFragment& rootFrag)
{
  if (traceSignals) {
    ndbout << "NdbQueryImpl::handleBatchComplete"
           << ", fragNo=" << rootFrag.getFragNo()
           << ", pendingFrags=" << (m_pendingFrags-1)
           << ", finalBatchFrags=" << m_finalBatchFrags
           <<  endl;
  }
  assert(rootFrag.isFragBatchComplete());

  /* May received fragment data after a SCANREF() (timeout?)
   * terminated the scan.  We are about to close this query,
   * and didn't expect any more data - ignore it!
   */
  if (likely(m_errorReceived == 0))
  {
    assert(m_pendingFrags > 0);                // Check against underflow.
    assert(m_pendingFrags <= m_rootFragCount); // .... and overflow
    m_pendingFrags--;

    // If this fragment reported its last batch, count it towards completion.
    if (rootFrag.finalBatchReceived())
    {
      m_finalBatchFrags++;
      assert(m_finalBatchFrags <= m_rootFragCount);
    }

    /* When application thread ::awaitMoreResults() it will later be
     * added to m_applFrags under mutex protection.
     */
    rootFrag.setReceivedMore();
    return true;
  }
  else if (!getQueryDef().isScanQuery())  // A failed lookup query
  {
    /**
     * A lookup query will retrieve the rows as part of ::execute().
     * -> Error must be visible through API before we return control
     *    to the application.
     */
    setErrorCode(m_errorReceived);
    return true;
  }

  // Failed scan query: data after error is dropped; don't resume the app.
  return false;
} // NdbQueryImpl::handleBatchComplete
2511 
/**
 * Close the query: close any open scan cursor at TC, discard pending
 * result batches, release the internal scan (sub-)transaction, and free
 * per-execution resources. Safe to call in any live state; afterwards the
 * state is 'Closed' and the NdbQueryDef may be destructed by the caller.
 *
 * @return 0 on success, else the result of ::closeTcCursor().
 */
int
NdbQueryImpl::close(bool forceSend)
{
  int res = 0;

  assert (m_state >= Initial && m_state < Destructed);
  if (m_state != Closed)
  {
    if (m_tcState != Inactive)
    {
      /* We have started a scan, but we have not yet received the last batch
       * for all root fragments. We must therefore close the scan to release
       * the scan context at TC.*/
      res = closeTcCursor(forceSend);
    }

    // Throw any pending results
    NdbRootFragment::clear(m_rootFrags,m_rootFragCount);
    m_applFrags.clear();

    Ndb* const ndb = m_transaction.getNdb();
    if (m_scanTransaction != NULL)
    {
      assert (m_state != Closed);
      assert (m_scanTransaction->m_scanningQuery == this);
      m_scanTransaction->m_scanningQuery = NULL;
      ndb->closeTransaction(m_scanTransaction);
      ndb->theRemainingStartTransactions--;  // Compensate; m_scanTransaction was not a real Txn
      m_scanTransaction = NULL;
    }

    postFetchRelease();
    m_state = Closed;  // Even if it was previously 'Failed' it is closed now!
  }

  /** BEWARE:
   *  Don't refer NdbQueryDef or its NdbQueryOperationDefs after ::close()
   *  as the application is allowed to destruct the Def's after this point.
   */
  m_queryDef= NULL;

  return res;
} //NdbQueryImpl::close
2555 
2556 
/**
 * Close (if not already closed) and delete this query object.
 * Errors from the implicit ::close() are ignored; call ::close()
 * explicitly first if error information is of interest.
 */
void
NdbQueryImpl::release()
{
  assert (m_state >= Initial && m_state < Destructed);
  if (m_state != Closed) {
    close(true);  // Ignore any errors, explicit ::close() first if errors are of interest
  }

  delete this;
}
2567 
2568 void
setErrorCode(int aErrorCode)2569 NdbQueryImpl::setErrorCode(int aErrorCode)
2570 {
2571   assert (aErrorCode!=0);
2572   m_error.code = aErrorCode;
2573   m_transaction.theErrorLine = 0;
2574   m_transaction.theErrorOperation = NULL;
2575 
2576   switch(aErrorCode)
2577   {
2578     // Not realy an error. A root lookup found no match.
2579   case Err_TupleNotFound:
2580     // Simple or dirty read failed due to node failure. Transaction will be aborted.
2581   case Err_SimpleDirtyReadFailed:
2582     m_transaction.setOperationErrorCode(aErrorCode);
2583     break;
2584 
2585     // For any other error, abort the transaction.
2586   default:
2587     m_state = Failed;
2588     m_transaction.setOperationErrorCodeAbort(aErrorCode);
2589     break;
2590   }
2591 }
2592 
2593 /*
2594  * ::setFetchTerminated() Should only be called with mutex locked.
2595  * Register result fetching as completed (possibly prematurely, w/ errorCode).
2596  */
2597 void
setFetchTerminated(int errorCode,bool needClose)2598 NdbQueryImpl::setFetchTerminated(int errorCode, bool needClose)
2599 {
2600   assert(m_finalBatchFrags < getRootFragCount());
2601   if (!needClose)
2602   {
2603     m_finalBatchFrags = getRootFragCount();
2604   }
2605   if (errorCode!=0)
2606   {
2607     m_errorReceived = errorCode;
2608   }
2609   m_pendingFrags = 0;
2610 } // NdbQueryImpl::setFetchTerminated()
2611 
2612 
2613 /* There may be pending (asynchronous received, mutex protected) errors
2614  * from TC / datanodes. Propogate these into 'API space'.
2615  * ::hasReceivedError() Should only be called with mutex locked
2616  */
2617 bool
hasReceivedError()2618 NdbQueryImpl::hasReceivedError()
2619 {
2620   if (unlikely(m_errorReceived))
2621   {
2622     setErrorCode(m_errorReceived);
2623     return true;
2624   }
2625   return false;
2626 } // NdbQueryImpl::hasReceivedError
2627 
2628 
/**
 * Process a TCKEYCONF for a (non-scan) lookup query.
 * NOTE(review): calls ::handleBatchComplete(), which is documented to
 * require the PollGuard mutex being held - so this is presumed to run in
 * the receiver thread with the mutex set; verify at the call sites.
 *
 * @return 'true' when the application thread should be resumed.
 */
bool
NdbQueryImpl::execTCKEYCONF()
{
  if (traceSignals) {
    ndbout << "NdbQueryImpl::execTCKEYCONF()" << endl;
  }
  assert(!getQueryDef().isScanQuery());
  NdbRootFragment& rootFrag = m_rootFrags[0];

  // We will get 1 + #leaf-nodes TCKEYCONF for a lookup...
  rootFrag.setConfReceived(RNIL);
  rootFrag.incrOutstandingResults(-1);

  bool ret = false;
  if (rootFrag.isFragBatchComplete())
  {
    ret = handleBatchComplete(rootFrag);
  }

  if (traceSignals) {
    ndbout << "NdbQueryImpl::execTCKEYCONF(): returns:" << ret
           << ", m_pendingFrags=" << m_pendingFrags
           << ", rootStream= {" << rootFrag.getResultStream(0) << "}"
           << endl;
  }
  return ret;
} // NdbQueryImpl::execTCKEYCONF
2656 
/**
 * Handle a CLOSE_SCAN_REP: the scan was reported terminated, possibly
 * prematurely with 'errorCode'. Registers result fetching as completed
 * via ::setFetchTerminated().
 */
void
NdbQueryImpl::execCLOSE_SCAN_REP(int errorCode, bool needClose)
{
  if (traceSignals)
  {
    ndbout << "NdbQueryImpl::execCLOSE_SCAN_REP()" << endl;
  }
  setFetchTerminated(errorCode,needClose);
}
2666 
/**
 * Prepare the query for execution:
 *  - Determine the number of root fragments to read from (scans may read
 *    from multiple fragments, lookups always use a single one).
 *  - For scans, set up the extra 'scan transaction' object.
 *  - Size and initialize the bulk allocators for result streams, stream
 *    pointer arrays, row buffers and tuple sets.
 *  - Construct the per-fragment state objects, and serialize the
 *    parameters of each operation into the ATTRINFO buffer.
 *
 * @return 0 on success, -1 on failure (error code has been set).
 */
int
NdbQueryImpl::prepareSend()
{
  // Only a 'Defined' (not yet prepared/executed) query may be prepared.
  if (unlikely(m_state != Defined)) {
    assert (m_state >= Initial && m_state < Destructed);
    if (m_state == Failed)
      setErrorCode(QRY_IN_ERROR_STATE);
    else
      setErrorCode(QRY_ILLEGAL_STATE);
    DEBUG_CRASH();
    return -1;
  }

  // Determine execution parameters 'batch size'.
  // May be user specified (TODO), and/or,  limited/specified by config values
  //
  if (getQueryDef().isScanQuery())
  {
    /* For the first batch, we read from all fragments for both ordered
     * and unordered scans.*/
    if (getQueryOperation(0U).m_parallelism == Parallelism_max)
    {
      m_rootFragCount
        = getRoot().getQueryOperationDef().getTable().getFragmentCount();
    }
    else
    {
      // Explicit parallelism: never read more fragments than requested.
      assert(getQueryOperation(0U).m_parallelism != Parallelism_adaptive);
      m_rootFragCount
        = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
              getQueryOperation(0U).m_parallelism);
    }
    Ndb* const ndb = m_transaction.getNdb();

    /** Scan operations need a own sub-transaction object associated with each
     *  query.
     */
    ndb->theRemainingStartTransactions++; // Compensate; does not start a real Txn
    NdbTransaction *scanTxn = ndb->hupp(&m_transaction);
    if (scanTxn==NULL) {
      ndb->theRemainingStartTransactions--;
      m_transaction.setOperationErrorCodeAbort(ndb->getNdbError().code);
      return -1;
    }
    scanTxn->theMagicNumber = 0x37412619;
    scanTxn->m_scanningQuery = this;
    this->m_scanTransaction = scanTxn;
  }
  else  // Lookup query
  {
    m_rootFragCount = 1;
  }

  // One result stream per (operation, root fragment) combination.
  int error = m_resultStreamAlloc.init(m_rootFragCount * getNoOfOperations());
  if (error != 0)
  {
    setErrorCode(error);
    return -1;
  }
  // Allocate space for ptrs to NdbResultStream and NdbRootFragment objects.
  error = m_pointerAlloc.init(m_rootFragCount *
                              (OrderedFragSet::pointersPerFragment));
  if (error != 0)
  {
    setErrorCode(error);
    return -1;
  }

  // Some preparation for later batchsize calculations pr. (sub) scan
  getRoot().calculateBatchedRows(NULL);
  getRoot().setBatchedRows(1);

  /**
   * Calculate total amount of row buffer space for all operations and
   * fragments.
   */
  Uint32 totalBuffSize = 0;
  for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++)
  {
    const NdbQueryOperationImpl& op = getQueryOperation(opNo);

    // Add space for batchBuffer & m_correlations
    Uint32 opBuffSize = op.getBatchBufferSize();
    if (getQueryDef().isScanQuery())
    {
      opBuffSize += (sizeof(TupleCorrelation) * op.getMaxBatchRows());
      opBuffSize *= 2;              // Scans are double buffered
    }
    opBuffSize += op.getRowSize();  // Unpacked row from buffers
    totalBuffSize += opBuffSize;
  }
  m_rowBufferAlloc.init(m_rootFragCount * totalBuffSize);

  if (getQueryDef().isScanQuery())
  {
    // Tuple sets are double buffered as well (factor 2 below).
    Uint32 totalRows = 0;
    for (Uint32 i = 0; i<getNoOfOperations(); i++)
    {
      totalRows += getQueryOperation(i).getMaxBatchRows();
    }
    error = m_tupleSetAlloc.init(2 * m_rootFragCount * totalRows);
    if (unlikely(error != 0))
    {
      setErrorCode(error);
      return -1;
    }
  }

  /**
   * Allocate and initialize fragment state variables.
   * Will also cause a ResultStream object containing a
   * NdbReceiver to be constructed for each operation in QueryTree
   */
  m_rootFrags = new NdbRootFragment[m_rootFragCount];
  if (m_rootFrags == NULL)
  {
    setErrorCode(Err_MemoryAlloc);
    return -1;
  }
  for (Uint32 i = 0; i<m_rootFragCount; i++)
  {
    m_rootFrags[i].init(*this, i); // Set fragment number.
  }

  // Fill in parameters (into ATTRINFO) for QueryTree.
  for (Uint32 i = 0; i < m_countOperations; i++) {
    const int error = m_operations[i].prepareAttrInfo(m_attrInfo);
    if (unlikely(error))
    {
      setErrorCode(error);
      return -1;
    }
  }

  if (unlikely(m_attrInfo.isMemoryExhausted() || m_keyInfo.isMemoryExhausted())) {
    setErrorCode(Err_MemoryAlloc);
    return -1;
  }

  // Refuse to send requests that exceed the signal section size limit.
  if (unlikely(m_attrInfo.getSize() > ScanTabReq::MaxTotalAttrInfo  ||
               m_keyInfo.getSize()  > ScanTabReq::MaxTotalAttrInfo)) {
    setErrorCode(Err_ReadTooMuch); // TODO: find a more suitable errorcode,
    return -1;
  }

  // Setup m_applStreams and m_fullStreams for receiving results
  const NdbRecord* keyRec = NULL;
  if(getRoot().getQueryOperationDef().getIndex()!=NULL)
  {
    /* keyRec is needed for comparing records when doing ordered index scans.*/
    keyRec = getRoot().getQueryOperationDef().getIndex()->getDefaultRecord();
    assert(keyRec!=NULL);
  }
  m_applFrags.prepare(m_pointerAlloc,
                      getRoot().getOrdering(),
                      m_rootFragCount,
                      keyRec,
                      getRoot().m_ndbRecord);

  if (getQueryDef().isScanQuery())
  {
    NdbRootFragment::buildReciverIdMap(m_rootFrags, m_rootFragCount);
  }

#ifdef TRACE_SERIALIZATION
  ndbout << "Serialized ATTRINFO : ";
  for(Uint32 i = 0; i < m_attrInfo.getSize(); i++){
    char buf[12];
    sprintf(buf, "%.8x", m_attrInfo.get(i));
    ndbout << buf << " ";
  }
  ndbout << endl;
#endif

  assert (m_pendingFrags==0);
  m_state = Prepared;
  return 0;
} // NdbQueryImpl::prepareSend
2845 
2846 
2847 
2848 /** This iterator is used for inserting a sequence of receiver ids
2849  * for the initial batch of a scan into a section via a GenericSectionPtr.*/
class InitialReceiverIdIterator: public GenericSectionIterator
{
public:

  /**
   * @param rootFrags Array of root fragments whose receiver ids will be
   *        iterated.
   * @param cnt Number of entries in 'rootFrags'.
   */
  InitialReceiverIdIterator(NdbRootFragment rootFrags[],
                            Uint32 cnt)
    :m_rootFrags(rootFrags),
     m_fragCount(cnt),
     m_currFragNo(0)
  {}

  virtual ~InitialReceiverIdIterator() {};

  /**
   * Get next batch of receiver ids.
   * @param sz This will be set to the number of receiver ids that have been
   * put in the buffer (0 if end has been reached.)
   * @return Array of receiver ids (or NULL if end reached).
   */
  virtual const Uint32* getNextWords(Uint32& sz);

  // Restart the iteration from the first fragment.
  virtual void reset()
  { m_currFragNo = 0;};

private:
  /**
   * Size of internal receiver id buffer. This value is arbitrary, but
   * a larger buffer would mean fewer calls to getNextWords(), possibly
   * improving efficiency.
   */
  static const Uint32 bufSize = 16;

  /** Set of root fragments which we want to iterate receiver ids for.*/
  NdbRootFragment* m_rootFrags;
  /** Number of entries in m_rootFrags. */
  const Uint32 m_fragCount;

  /** The next fragment number to be processed. (Range from 0 to no of
   * fragments.)*/
  Uint32 m_currFragNo;
  /** Buffer for storing one batch of receiver ids.*/
  Uint32 m_receiverIds[bufSize];
};
2892 
getNextWords(Uint32 & sz)2893 const Uint32* InitialReceiverIdIterator::getNextWords(Uint32& sz)
2894 {
2895   /**
2896    * For the initial batch, we want to retrieve one batch for each fragment
2897    * whether it is a sorted scan or not.
2898    */
2899   if (m_currFragNo >= m_fragCount)
2900   {
2901     sz = 0;
2902     return NULL;
2903   }
2904   else
2905   {
2906     Uint32 cnt = 0;
2907     while (cnt < bufSize && m_currFragNo < m_fragCount)
2908     {
2909       m_receiverIds[cnt] = m_rootFrags[m_currFragNo].getReceiverId();
2910       cnt++;
2911       m_currFragNo++;
2912     }
2913     sz = cnt;
2914     return m_receiverIds;
2915   }
2916 }
2917 
2918 /** This iterator is used for inserting a sequence of 'TcPtrI'
2919  * for a NEXTREQ to a single or multiple fragments via a GenericSectionPtr.*/
class FetchMoreTcIdIterator: public GenericSectionIterator
{
public:
  /**
   * @param rootFrags Array of pointers to the root fragments for which
   *        another batch is requested.
   * @param cnt Number of entries in 'rootFrags'.
   */
  FetchMoreTcIdIterator(NdbRootFragment* rootFrags[],
                        Uint32 cnt)
    :m_rootFrags(rootFrags),
     m_fragCount(cnt),
     m_currFragNo(0)
  {}

  virtual ~FetchMoreTcIdIterator() {};

  /**
   * Get next batch of receiver ids.
   * @param sz This will be set to the number of receiver ids that have been
   * put in the buffer (0 if end has been reached.)
   * @return Array of receiver ids (or NULL if end reached).
   */
  virtual const Uint32* getNextWords(Uint32& sz);

  // Restart the iteration from the first fragment.
  virtual void reset()
  { m_currFragNo = 0;};

private:
  /**
   * Size of internal receiver id buffer. This value is arbitrary, but
   * a larger buffer would mean fewer calls to getNextWords(), possibly
   * improving efficiency.
   */
  static const Uint32 bufSize = 16;

  /** Set of root fragments which we want to iterate TcPtrI ids for.*/
  NdbRootFragment** m_rootFrags;
  /** Number of entries in m_rootFrags. */
  const Uint32 m_fragCount;

  /** The next fragment number to be processed. (Range from 0 to no of
   * fragments.)*/
  Uint32 m_currFragNo;
  /** Buffer for storing one batch of receiver ids.*/
  Uint32 m_receiverIds[bufSize];
};
2961 
getNextWords(Uint32 & sz)2962 const Uint32* FetchMoreTcIdIterator::getNextWords(Uint32& sz)
2963 {
2964   /**
2965    * For the initial batch, we want to retrieve one batch for each fragment
2966    * whether it is a sorted scan or not.
2967    */
2968   if (m_currFragNo >= m_fragCount)
2969   {
2970     sz = 0;
2971     return NULL;
2972   }
2973   else
2974   {
2975     Uint32 cnt = 0;
2976     while (cnt < bufSize && m_currFragNo < m_fragCount)
2977     {
2978       m_receiverIds[cnt] = m_rootFrags[m_currFragNo]->getReceiverTcPtrI();
2979       cnt++;
2980       m_currFragNo++;
2981     }
2982     sz = cnt;
2983     return m_receiverIds;
2984   }
2985 }
2986 
2987 /******************************************************************************
2988 int doSend()    Send serialized queryTree and parameters encapsulated in
2989                 either a SCAN_TABREQ or TCKEYREQ to TC.
2990 
2991 NOTE:           The TransporterFacade mutex is already set by callee.
2992 
Return Value:   Return >0 : send was successful, returns number of signals sent
2994                 Return -1: In all other case.
2995 Parameters:     nodeId: Receiving processor node
2996 Remark:         Send a TCKEYREQ or SCAN_TABREQ (long) signal depending of
2997                 the query being either a lookup or scan type.
2998                 KEYINFO and ATTRINFO are included as part of the long signal
2999 ******************************************************************************/
int
NdbQueryImpl::doSend(int nodeId, bool lastFlag)
{
  // Query must have been through ::prepareSend() first.
  if (unlikely(m_state != Prepared)) {
    assert (m_state >= Initial && m_state < Destructed);
    if (m_state == Failed)
      setErrorCode(QRY_IN_ERROR_STATE);
    else
      setErrorCode(QRY_ILLEGAL_STATE);
    DEBUG_CRASH();
    return -1;
  }

  Ndb& ndb = *m_transaction.getNdb();
  NdbImpl * impl = ndb.theImpl;

  const NdbQueryOperationImpl& root = getRoot();
  const NdbQueryOperationDefImpl& rootDef = root.getQueryOperationDef();
  // If the root accesses an index, the request targets the index table.
  const NdbTableImpl* const rootTable = rootDef.getIndex()
    ? rootDef.getIndex()->getIndexTable()
    : &rootDef.getTable();

  Uint32 tTableId = rootTable->m_id;
  Uint32 tSchemaVersion = rootTable->m_version;

  // Make all fragments ready to receive the first result batch.
  for (Uint32 i=0; i<m_rootFragCount; i++)
  {
    m_rootFrags[i].prepareNextReceiveSet();
  }

  if (rootDef.isScanOperation())
  {
    Uint32 scan_flags = 0;  // TODO: Specify with ScanOptions::SO_SCANFLAGS

    bool tupScan = (scan_flags & NdbScanOperation::SF_TupScan);
    bool rangeScan = false;

    bool dummy;
    const int error = isPrunable(dummy);
    if (unlikely(error != 0))
      return error;

    /* Handle IndexScan specifics */
    if ( (int) rootTable->m_indexType ==
         (int) NdbDictionary::Index::OrderedIndex )
    {
      rangeScan = true;
      tupScan = false;
    }
    // Descending order is only meaningful for ordered index scans.
    const Uint32 descending =
      root.getOrdering()==NdbQueryOptions::ScanOrdering_descending ? 1 : 0;
    assert(descending==0 || (int) rootTable->m_indexType ==
           (int) NdbDictionary::Index::OrderedIndex);

    assert (root.getMaxBatchRows() > 0);

    NdbApiSignal tSignal(&ndb);
    tSignal.setSignal(GSN_SCAN_TABREQ, refToBlock(m_scanTransaction->m_tcRef));

    ScanTabReq * const scanTabReq = CAST_PTR(ScanTabReq, tSignal.getDataPtrSend());
    Uint32 reqInfo = 0;

    const Uint64 transId = m_scanTransaction->getTransactionId();

    scanTabReq->apiConnectPtr = m_scanTransaction->theTCConPtr;
    scanTabReq->buddyConPtr = m_scanTransaction->theBuddyConPtr; // 'buddy' refers 'real-transaction'->theTCConPtr
    scanTabReq->spare = 0;  // Unused in later protocoll versions
    scanTabReq->tableId = tTableId;
    scanTabReq->tableSchemaVersion = tSchemaVersion;
    scanTabReq->storedProcId = 0xFFFF;
    scanTabReq->transId1 = (Uint32) transId;
    scanTabReq->transId2 = (Uint32) (transId >> 32);

    Uint32 batchRows = root.getMaxBatchRows();
    Uint32 batchByteSize;
    NdbReceiver::calculate_batch_size(* ndb.theImpl,
                                      getRootFragCount(),
                                      batchRows,
                                      batchByteSize);
    assert(batchRows==root.getMaxBatchRows());
    assert(batchRows<=batchByteSize);

    /**
     * Check if query is a sorted scan-scan.
     * Ordering can then only be guarented by restricting
     * parent batch to contain single rows.
     * (Child scans will have 'normal' batch size).
     */
    if (root.getOrdering() != NdbQueryOptions::ScanOrdering_unordered &&
        getQueryDef().getQueryType() == NdbQueryDef::MultiScanQuery)
    {
      batchRows = 1;
    }
    ScanTabReq::setScanBatch(reqInfo, batchRows);
    scanTabReq->batch_byte_size = batchByteSize;
    scanTabReq->first_batch_size = batchRows;

    ScanTabReq::setViaSPJFlag(reqInfo, 1);
    ScanTabReq::setPassAllConfsFlag(reqInfo, 1);

    Uint32 nodeVersion = impl->getNodeNdbVersion(nodeId);
    if (!ndbd_scan_tabreq_implicit_parallelism(nodeVersion))
    {
      // Implicit parallelism implies support for greater
      // parallelism than storable explicitly in old reqInfo.
      Uint32 fragments = getRootFragCount();
      if (fragments > PARALLEL_MASK)
      {
        setErrorCode(Err_SendFailed /* TODO: TooManyFragments, to too old cluster version */);
        return -1;
      }
      ScanTabReq::setParallelism(reqInfo, fragments);
    }

    ScanTabReq::setRangeScanFlag(reqInfo, rangeScan);
    ScanTabReq::setDescendingFlag(reqInfo, descending);
    ScanTabReq::setTupScanFlag(reqInfo, tupScan);
    ScanTabReq::setNoDiskFlag(reqInfo, !root.diskInUserProjection());
    ScanTabReq::set4WordConf(reqInfo, 1);

    // Assume LockMode LM_ReadCommited, set related lock flags
    ScanTabReq::setLockMode(reqInfo, false);  // not exclusive
    ScanTabReq::setHoldLockFlag(reqInfo, false);
    ScanTabReq::setReadCommittedFlag(reqInfo, true);

//  m_keyInfo = (scan_flags & NdbScanOperation::SF_KeyInfo) ? 1 : 0;

    // If scan is pruned, use optional 'distributionKey' to hold hashvalue
    if (m_prunability == Prune_Yes)
    {
//    printf("Build pruned SCANREQ, w/ hashValue:%d\n", hashValue);
      ScanTabReq::setDistributionKeyFlag(reqInfo, 1);
      scanTabReq->distributionKey= m_pruneHashVal;
      tSignal.setLength(ScanTabReq::StaticLength + 1);
    } else {
      tSignal.setLength(ScanTabReq::StaticLength);
    }
    scanTabReq->requestInfo = reqInfo;

    /**
     * Then send the signal:
     *
     * SCANTABREQ always has 2 mandatory sections and an optional
     * third section
     * Section 0 : List of receiver Ids NDBAPI has allocated
     *             for the scan
     * Section 1 : ATTRINFO section
     * Section 2 : Optional KEYINFO section
     */
    GenericSectionPtr secs[3];
    InitialReceiverIdIterator receiverIdIter(m_rootFrags, m_rootFragCount);
    LinearSectionIterator attrInfoIter(m_attrInfo.addr(), m_attrInfo.getSize());
    LinearSectionIterator keyInfoIter(m_keyInfo.addr(), m_keyInfo.getSize());

    secs[0].sectionIter= &receiverIdIter;
    secs[0].sz= getRootFragCount();

    secs[1].sectionIter= &attrInfoIter;
    secs[1].sz= m_attrInfo.getSize();

    Uint32 numSections= 2;
    if (m_keyInfo.getSize() > 0)
    {
      secs[2].sectionIter= &keyInfoIter;
      secs[2].sz= m_keyInfo.getSize();
      numSections= 3;
    }

    /* Send Fragmented as SCAN_TABREQ can be large */
    const int res = impl->sendFragmentedSignal(&tSignal, nodeId, secs, numSections);
    if (unlikely(res == -1))
    {
      setErrorCode(Err_SendFailed);  // Error: 'Send to NDB failed'
      return FetchResult_sendFail;
    }
    m_tcState = Active;

  } else {  // Lookup query

    NdbApiSignal tSignal(&ndb);
    tSignal.setSignal(GSN_TCKEYREQ, refToBlock(m_transaction.m_tcRef));

    TcKeyReq * const tcKeyReq = CAST_PTR(TcKeyReq, tSignal.getDataPtrSend());

    const Uint64 transId = m_transaction.getTransactionId();
    tcKeyReq->apiConnectPtr   = m_transaction.theTCConPtr;
    tcKeyReq->apiOperationPtr = root.getIdOfReceiver();
    tcKeyReq->tableId = tTableId;
    tcKeyReq->tableSchemaVersion = tSchemaVersion;
    tcKeyReq->transId1 = (Uint32) transId;
    tcKeyReq->transId2 = (Uint32) (transId >> 32);

    Uint32 attrLen = 0;
    tcKeyReq->setAttrinfoLen(attrLen, 0); // Not required for long signals.
    tcKeyReq->attrLen = attrLen;

    Uint32 reqInfo = 0;
    // Interpreted execution only applies to primary-key access at the root.
    Uint32 interpretedFlag= root.hasInterpretedCode() &&
                            rootDef.getType() == NdbQueryOperationDef::PrimaryKeyAccess;

    TcKeyReq::setOperationType(reqInfo, NdbOperation::ReadRequest);
    TcKeyReq::setViaSPJFlag(reqInfo, true);
    TcKeyReq::setKeyLength(reqInfo, 0);            // This is a long signal
    TcKeyReq::setAIInTcKeyReq(reqInfo, 0);         // Not needed
    TcKeyReq::setInterpretedFlag(reqInfo, interpretedFlag);
    TcKeyReq::setStartFlag(reqInfo, m_startIndicator);
    TcKeyReq::setExecuteFlag(reqInfo, lastFlag);
    TcKeyReq::setNoDiskFlag(reqInfo, !root.diskInUserProjection());
    TcKeyReq::setAbortOption(reqInfo, NdbOperation::AO_IgnoreError);

    TcKeyReq::setDirtyFlag(reqInfo, true);
    TcKeyReq::setSimpleFlag(reqInfo, true);
    TcKeyReq::setCommitFlag(reqInfo, m_commitIndicator);
    tcKeyReq->requestInfo = reqInfo;

    tSignal.setLength(TcKeyReq::StaticLength);

/****
    // Unused optional part located after TcKeyReq::StaticLength
    tcKeyReq->scanInfo = 0;
    tcKeyReq->distrGroupHashValue = 0;
    tcKeyReq->distributionKeySize = 0;
    tcKeyReq->storedProcId = 0xFFFF;
***/

/**** TODO ... maybe - from NdbOperation::prepareSendNdbRecord(AbortOption ao)
    Uint8 abortOption= (ao == DefaultAbortOption) ?
      (Uint8) m_abortOption : (Uint8) ao;

    m_abortOption= theSimpleIndicator && theOperationType==ReadRequest ?
      (Uint8) AO_IgnoreError : (Uint8) abortOption;

    TcKeyReq::setAbortOption(reqInfo, m_abortOption);
    TcKeyReq::setCommitFlag(tcKeyReq->requestInfo, theCommitIndicator);
*****/

    // KEYINFO is mandatory for a lookup, ATTRINFO only when non-empty.
    LinearSectionPtr secs[2];
    secs[TcKeyReq::KeyInfoSectionNum].p= m_keyInfo.addr();
    secs[TcKeyReq::KeyInfoSectionNum].sz= m_keyInfo.getSize();
    Uint32 numSections= 1;

    if (m_attrInfo.getSize() > 0)
    {
      secs[TcKeyReq::AttrInfoSectionNum].p= m_attrInfo.addr();
      secs[TcKeyReq::AttrInfoSectionNum].sz= m_attrInfo.getSize();
      numSections= 2;
    }

    const int res = impl->sendSignal(&tSignal, nodeId, secs, numSections);
    if (unlikely(res == -1))
    {
      setErrorCode(Err_SendFailed);  // Error: 'Send to NDB failed'
      return FetchResult_sendFail;
    }
    m_transaction.OpSent();
    // Expected number of signals before this lookup batch is complete.
    m_rootFrags[0].incrOutstandingResults(1 + getNoOfOperations() +
                                          getNoOfLeafOperations());
  } // if

  assert (m_pendingFrags==0);
  m_pendingFrags = m_rootFragCount;

  // Shrink memory footprint by removing structures not required after ::execute()
  m_keyInfo.releaseExtend();
  m_attrInfo.releaseExtend();

  // TODO: Release m_interpretedCode now?

  /* Todo : Consider calling NdbOperation::postExecuteRelease()
   * Ideally it should be called outside TP mutex, so not added
   * here yet
   */

  m_state = Executing;
  return 1;
} // NdbQueryImpl::doSend()
3276 
3277 
3278 /******************************************************************************
int sendFetchMore() - Fetch another scan batch, optionally closing the scan

                Request another batch of rows to be retrieved from the scan.

Return Value:   0 if send succeeded, -1 otherwise.
Parameters:     rootFrags: Root fragments for which to ask for another batch.
Remark:
3286 ******************************************************************************/
int
NdbQueryImpl::sendFetchMore(NdbRootFragment* rootFrags[],
                            Uint32 cnt,
                            bool forceSend)
{
  assert(getQueryDef().isScanQuery());

  // Every fragment we ask more from must have completed its current
  // batch without being the final one.
  for (Uint32 i=0; i<cnt; i++)
  {
    NdbRootFragment* rootFrag = rootFrags[i];
    assert(rootFrag->isFragBatchComplete());
    assert(!rootFrag->finalBatchReceived());
    rootFrag->prepareNextReceiveSet();
  }

  // Build a SCAN_NEXTREQ ('stopScan==0' -> fetch more, not close).
  Ndb& ndb = *getNdbTransaction().getNdb();
  NdbApiSignal tSignal(&ndb);
  tSignal.setSignal(GSN_SCAN_NEXTREQ, refToBlock(m_scanTransaction->m_tcRef));
  ScanNextReq * const scanNextReq =
    CAST_PTR(ScanNextReq, tSignal.getDataPtrSend());

  assert (m_scanTransaction);
  const Uint64 transId = m_scanTransaction->getTransactionId();

  scanNextReq->apiConnectPtr = m_scanTransaction->theTCConPtr;
  scanNextReq->stopScan = 0;
  scanNextReq->transId1 = (Uint32) transId;
  scanNextReq->transId2 = (Uint32) (transId >> 32);
  tSignal.setLength(ScanNextReq::SignalLength);

  // Section 0: one TcPtrI per fragment to fetch more from.
  FetchMoreTcIdIterator receiverIdIter(rootFrags, cnt);

  GenericSectionPtr secs[1];
  secs[ScanNextReq::ReceiverIdsSectionNum].sectionIter = &receiverIdIter;
  secs[ScanNextReq::ReceiverIdsSectionNum].sz = cnt;

  NdbImpl * impl = ndb.theImpl;
  Uint32 nodeId = m_transaction.getConnectedNodeId();
  Uint32 seq    = m_transaction.theNodeSequence;

  /* This part needs to be done under mutex due to synchronization with
   * receiver thread.
   */
  PollGuard poll_guard(* impl);

  if (unlikely(hasReceivedError()))
  {
    // Errors arrived inbetween ::await released mutex, and sendFetchMore grabbed it
    return -1;
  }
  // A changed node sequence means the node failed/reconnected meanwhile.
  if (impl->getNodeSequence(nodeId) != seq ||
      impl->sendSignal(&tSignal, nodeId, secs, 1) != 0)
  {
    setErrorCode(Err_NodeFailCausedAbort);
    return -1;
  }
  impl->do_forceSend(forceSend);

  m_pendingFrags += cnt;
  assert(m_pendingFrags <= getRootFragCount());

  return 0;
} // NdbQueryImpl::sendFetchMore()
3350 
/**
 * Close the scan cursor held by TC:
 *  1) Drain any outstanding results from the current batch fetch.
 *  2) If TC still has an open cursor, send SCAN_NEXTREQ(close) and wait
 *     for the close to be confirmed.
 *
 * @param forceSend Passed through to the poll/send machinery.
 * @return 0 on success, -1 / error code on failure.
 */
int
NdbQueryImpl::closeTcCursor(bool forceSend)
{
  assert (getQueryDef().isScanQuery());

  NdbImpl* const ndb = m_transaction.getNdb()->theImpl;
  const Uint32 timeout  = ndb->get_waitfor_timeout();
  const Uint32 nodeId   = m_transaction.getConnectedNodeId();
  const Uint32 seq      = m_transaction.theNodeSequence;

  /* This part needs to be done under mutex due to synchronization with
   * receiver thread.
   */
  PollGuard poll_guard(*ndb);

  if (unlikely(ndb->getNodeSequence(nodeId) != seq))
  {
    setErrorCode(Err_NodeFailCausedAbort);
    return -1;  // Transporter disconnected and reconnected, no need to close
  }

  /* Wait for outstanding scan results from current batch fetch */
  while (m_pendingFrags > 0)
  {
    const FetchResult result = static_cast<FetchResult>
        (poll_guard.wait_scan(3*timeout, nodeId, forceSend));

    // Node failure or poll failure terminates the fetch (no close needed).
    if (unlikely(ndb->getNodeSequence(nodeId) != seq))
      setFetchTerminated(Err_NodeFailCausedAbort,false);
    else if (unlikely(result != FetchResult_ok))
    {
      if (result == FetchResult_timeOut)
        setFetchTerminated(Err_ReceiveTimedOut,false);
      else
        setFetchTerminated(Err_NodeFailCausedAbort,false);
    }
    if (hasReceivedError())
    {
      break;
    }
  } // while

  assert(m_pendingFrags==0);
  NdbRootFragment::clear(m_rootFrags,m_rootFragCount);
  m_errorReceived = 0;                         // Clear errors caused by previous fetching
  m_error.code = 0;

  if (m_finalBatchFrags < getRootFragCount())  // TC has an open scan cursor.
  {
    /* Send SCAN_NEXTREQ(close) */
    const int error = sendClose(m_transaction.getConnectedNodeId());
    if (unlikely(error))
      return error;

    assert(m_finalBatchFrags+m_pendingFrags==getRootFragCount());

    /* Wait for close to be confirmed: */
    while (m_pendingFrags > 0)
    {
      const FetchResult result = static_cast<FetchResult>
          (poll_guard.wait_scan(3*timeout, nodeId, forceSend));

      if (unlikely(ndb->getNodeSequence(nodeId) != seq))
        setFetchTerminated(Err_NodeFailCausedAbort,false);
      else if (unlikely(result != FetchResult_ok))
      {
        if (result == FetchResult_timeOut)
          setFetchTerminated(Err_ReceiveTimedOut,false);
        else
          setFetchTerminated(Err_NodeFailCausedAbort,false);
      }
      if (hasReceivedError())
      {
        break;
      }
    } // while
  } // if

  return 0;
} //NdbQueryImpl::closeTcCursor
3431 
3432 
3433 /*
3434  * This method is called with the PollGuard mutex held on the transporter.
3435  */
3436 int
sendClose(int nodeId)3437 NdbQueryImpl::sendClose(int nodeId)
3438 {
3439   assert(m_finalBatchFrags < getRootFragCount());
3440   m_pendingFrags = getRootFragCount() - m_finalBatchFrags;
3441 
3442   Ndb& ndb = *m_transaction.getNdb();
3443   NdbApiSignal tSignal(&ndb);
3444   tSignal.setSignal(GSN_SCAN_NEXTREQ, refToBlock(m_scanTransaction->m_tcRef));
3445   ScanNextReq * const scanNextReq = CAST_PTR(ScanNextReq, tSignal.getDataPtrSend());
3446 
3447   assert (m_scanTransaction);
3448   const Uint64 transId = m_scanTransaction->getTransactionId();
3449 
3450   scanNextReq->apiConnectPtr = m_scanTransaction->theTCConPtr;
3451   scanNextReq->stopScan = true;
3452   scanNextReq->transId1 = (Uint32) transId;
3453   scanNextReq->transId2 = (Uint32) (transId >> 32);
3454   tSignal.setLength(ScanNextReq::SignalLength);
3455 
3456   NdbImpl * impl = ndb.theImpl;
3457   return impl->sendSignal(&tSignal, nodeId);
3458 
3459 } // NdbQueryImpl::sendClose()
3460 
3461 
isPrunable(bool & prunable)3462 int NdbQueryImpl::isPrunable(bool& prunable)
3463 {
3464   if (m_prunability == Prune_Unknown)
3465   {
3466     const int error = getRoot().getQueryOperationDef()
3467       .checkPrunable(m_keyInfo, m_shortestBound, prunable, m_pruneHashVal);
3468     if (unlikely(error != 0))
3469     {
3470       prunable = false;
3471       setErrorCode(error);
3472       return -1;
3473     }
3474     m_prunability = prunable ? Prune_Yes : Prune_No;
3475   }
3476   prunable = (m_prunability == Prune_Yes);
3477   return 0;
3478 }
3479 
3480 
3481 /****************
3482  * NdbQueryImpl::OrderedFragSet methods.
3483  ***************/
3484 
// Construct an empty OrderedFragSet; real setup happens in ::prepare().
NdbQueryImpl::OrderedFragSet::OrderedFragSet():
  m_capacity(0),
  m_activeFragCount(0),
  m_fetchMoreFragCount(0),
  m_finalFragReceivedCount(0),
  m_finalFragConsumedCount(0),
  m_ordering(NdbQueryOptions::ScanOrdering_void),
  m_keyRecord(NULL),
  m_resultRecord(NULL),
  m_activeFrags(NULL),
  m_fetchMoreFrags(NULL)
{
}
3498 
NdbQueryImpl::OrderedFragSet::~OrderedFragSet()
{
  // The pointer arrays were carved out of a NdbBulkAllocator in
  // ::prepare() and are not owned here; only reset the pointers.
  m_activeFrags = NULL;
  m_fetchMoreFrags = NULL;
}
3504 
// Forget all active and fetch-more fragments (counts only; the
// underlying arrays are left untouched).
void NdbQueryImpl::OrderedFragSet::clear()
{
  m_activeFragCount = 0;
  m_fetchMoreFragCount = 0;
}
3510 
/**
 * Allocate the fragment pointer arrays and record the ordering and the
 * NdbRecord's used for row comparison in ordered scans.
 *
 * @param allocator Bulk allocator supplying the two pointer arrays.
 * @param ordering Scan ordering; must not be 'ScanOrdering_void'.
 * @param capacity Max number of root fragments tracked.
 * @param keyRecord Key record for comparing rows in ordered index scans.
 * @param resultRecord Result row record.
 */
void
NdbQueryImpl::OrderedFragSet::prepare(NdbBulkAllocator& allocator,
                                      NdbQueryOptions::ScanOrdering ordering,
                                      int capacity,
                                      const NdbRecord* keyRecord,
                                      const NdbRecord* resultRecord)
{
  assert(m_activeFrags==NULL);
  assert(m_capacity==0);
  assert(ordering!=NdbQueryOptions::ScanOrdering_void);

  if (capacity > 0)
  {
    m_capacity = capacity;

    m_activeFrags =
      reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
    bzero(m_activeFrags, capacity * sizeof(NdbRootFragment*));

    m_fetchMoreFrags =
      reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
    bzero(m_fetchMoreFrags, capacity * sizeof(NdbRootFragment*));
  }
  m_ordering = ordering;
  m_keyRecord = keyRecord;
  m_resultRecord = resultRecord;
} // OrderedFragSet::prepare()
3538 
3539 
3540 /**
3541  *  Get current RootFragment which to return results from.
3542  *  Logic relies on that ::reorganize() is called whenever the current
3543  *  RootFragment is advanced to next result. This will eliminate
3544  *  empty RootFragments from the OrderedFragSet object
3545  *
3546  */
3547 NdbRootFragment*
getCurrent() const3548 NdbQueryImpl::OrderedFragSet::getCurrent() const
3549 {
3550   if (m_ordering!=NdbQueryOptions::ScanOrdering_unordered)
3551   {
3552     /**
3553      * Must have tuples for each (non-completed) fragment when doing ordered
3554      * scan.
3555      */
3556     if (unlikely(m_activeFragCount+m_finalFragConsumedCount < m_capacity))
3557     {
3558       return NULL;
3559     }
3560   }
3561 
3562   if (unlikely(m_activeFragCount==0))
3563   {
3564     return NULL;
3565   }
3566   else
3567   {
3568     assert(!m_activeFrags[m_activeFragCount-1]->isEmpty());
3569     return m_activeFrags[m_activeFragCount-1];
3570   }
3571 } // OrderedFragSet::getCurrent()
3572 
3573 /**
3574  *  Keep the FragSet ordered, both with respect to specified ScanOrdering, and
3575  *  such that RootFragments which becomes empty are removed from
3576  *  m_activeFrags[].
3577  *  Thus,  ::getCurrent() should be as lightweight as possible and only has
3578  *  to return the 'next' available from array wo/ doing any housekeeping.
3579  */
3580 void
reorganize()3581 NdbQueryImpl::OrderedFragSet::reorganize()
3582 {
3583   assert(m_activeFragCount > 0);
3584   NdbRootFragment* const frag = m_activeFrags[m_activeFragCount-1];
3585 
3586   // Remove the current fragment if the batch has been emptied.
3587   if (frag->isEmpty())
3588   {
3589     /**
3590      * MT-note: Although ::finalBatchReceived() normally requires mutex,
3591      * its safe to call it here wo/ mutex as:
3592      *
3593      *  - 'not hasRequestedMore()' guaranty that there can't be any
3594      *     receiver thread simultaneously accessing the mutex protected members.
3595      *  -  As this fragment has already been added to (the mutex protected)
3596      *     class OrderedFragSet, we know that the mutex has been
3597      *     previously set for this 'frag'. This would have resolved
3598      *     any cache coherency problems related to mt'ed access to
3599      *     'frag->finalBatchReceived()'.
3600      */
3601     if (!frag->hasRequestedMore() && frag->finalBatchReceived())
3602     {
3603       assert(m_finalFragReceivedCount > m_finalFragConsumedCount);
3604       m_finalFragConsumedCount++;
3605     }
3606 
3607     /**
3608      * Without doublebuffering we can't 'fetchMore' for fragments until
3609      * the current ResultSet has been consumed bu application.
3610      * (Compared to how ::prepareMoreResults() immediately 'fetchMore')
3611      */
3612     else if (!useDoubleBuffers)
3613     {
3614       m_fetchMoreFrags[m_fetchMoreFragCount++] = frag;
3615     }
3616     m_activeFragCount--;
3617   }
3618 
3619   // Reorder fragments if add'ed nonEmpty fragment to a sorted scan.
3620   else if (m_ordering!=NdbQueryOptions::ScanOrdering_unordered)
3621   {
3622     /**
3623      * This is a sorted scan. There are more data to be read from
3624      * m_activeFrags[m_activeFragCount-1]. Move it to its proper place.
3625      *
3626      * Use binary search to find the largest record that is smaller than or
3627      * equal to m_activeFrags[m_activeFragCount-1].
3628      */
3629     int first = 0;
3630     int last = m_activeFragCount-1;
3631     int middle = (first+last)/2;
3632 
3633     while (first<last)
3634     {
3635       assert(middle<m_activeFragCount);
3636       const int cmpRes = compare(*frag, *m_activeFrags[middle]);
3637       if (cmpRes < 0)
3638       {
3639         first = middle + 1;
3640       }
3641       else if (cmpRes == 0)
3642       {
3643         last = first = middle;
3644       }
3645       else
3646       {
3647         last = middle;
3648       }
3649       middle = (first+last)/2;
3650     }
3651 
3652     // Move into correct sorted position
3653     if (middle < m_activeFragCount-1)
3654     {
3655       assert(compare(*frag, *m_activeFrags[middle]) >= 0);
3656       memmove(m_activeFrags+middle+1,
3657               m_activeFrags+middle,
3658               (m_activeFragCount - middle - 1) * sizeof(NdbRootFragment*));
3659       m_activeFrags[middle] = frag;
3660     }
3661     assert(verifySortOrder());
3662   }
3663   assert(m_activeFragCount+m_finalFragConsumedCount    <= m_capacity);
3664   assert(m_fetchMoreFragCount+m_finalFragReceivedCount <= m_capacity);
3665 } // OrderedFragSet::reorganize()
3666 
3667 void
add(NdbRootFragment & frag)3668 NdbQueryImpl::OrderedFragSet::add(NdbRootFragment& frag)
3669 {
3670   assert(m_activeFragCount+m_finalFragConsumedCount < m_capacity);
3671 
3672   m_activeFrags[m_activeFragCount++] = &frag;  // Add avail fragment
3673   reorganize();                                // Move into position
3674 } // OrderedFragSet::add()
3675 
3676 /**
3677  * Scan rootFrags[] for fragments which has received a ResultSet batch.
3678  * Add these to m_applFrags (Require mutex protection)
3679  */
3680 void
prepareMoreResults(NdbRootFragment rootFrags[],Uint32 cnt)3681 NdbQueryImpl::OrderedFragSet::prepareMoreResults(NdbRootFragment rootFrags[], Uint32 cnt)
3682 {
3683   for (Uint32 fragNo = 0; fragNo < cnt; fragNo++)
3684   {
3685     NdbRootFragment& rootFrag = rootFrags[fragNo];
3686     if (rootFrag.isEmpty() &&         // Current ResultSet is empty
3687         rootFrag.hasReceivedMore())   // Another ResultSet is available
3688     {
3689       if (rootFrag.finalBatchReceived())
3690       {
3691         m_finalFragReceivedCount++;
3692       }
3693       /**
3694        * When doublebuffered fetch is active:
3695        * Received fragment is a candidates for immediate prefetch.
3696        */
3697       else if (useDoubleBuffers)
3698       {
3699         m_fetchMoreFrags[m_fetchMoreFragCount++] = &rootFrag;
3700       } // useDoubleBuffers
3701 
3702       rootFrag.grabNextResultSet();   // Get new ResultSet.
3703       add(rootFrag);                  // Make avail. to appl. thread
3704     }
3705   } // for all 'rootFrags[]'
3706 
3707   assert(m_activeFragCount+m_finalFragConsumedCount    <= m_capacity);
3708   assert(m_fetchMoreFragCount+m_finalFragReceivedCount <= m_capacity);
3709 } // OrderedFragSet::prepareMoreResults()
3710 
3711 /**
3712  * Determine if a ::sendFetchMore() should be requested at this point.
3713  */
3714 Uint32
getFetchMore(NdbRootFragment ** & frags)3715 NdbQueryImpl::OrderedFragSet::getFetchMore(NdbRootFragment** &frags)
3716 {
3717   /**
3718    * Decides (pre-)fetch strategy:
3719    *
3720    *  1) No doublebuffered ResultSets: Immediately request prefetch.
3721    *     (This is fetches related to 'isEmpty' fragments)
3722    *  2) If ordered ResultSets; Immediately request prefetch.
3723    *     (Need rows from all fragments to do sort-merge)
3724    *  3) When unordered, reduce #NEXTREQs to TC by avoid prefetch
3725    *     until there are pending request to all datanodes having more
3726    *     ResultSets
3727    */
3728   if (m_fetchMoreFragCount > 0 &&
3729       (!useDoubleBuffers  ||                                         // 1)
3730        m_ordering != NdbQueryOptions::ScanOrdering_unordered ||      // 2)
3731        m_fetchMoreFragCount+m_finalFragReceivedCount >= m_capacity)) // 3)
3732   {
3733     const int cnt = m_fetchMoreFragCount;
3734     frags = m_fetchMoreFrags;
3735     m_fetchMoreFragCount = 0;
3736     return cnt;
3737   }
3738   return 0;
3739 }
3740 
3741 bool
verifySortOrder() const3742 NdbQueryImpl::OrderedFragSet::verifySortOrder() const
3743 {
3744   for (int i = 0; i<m_activeFragCount-1; i++)
3745   {
3746     if (compare(*m_activeFrags[i], *m_activeFrags[i+1]) < 0)
3747     {
3748       assert(false);
3749       return false;
3750     }
3751   }
3752   return true;
3753 }
3754 
3755 /**
3756  * Compare frags such that f1<f2 if f1 is empty but f2 is not.
3757  * - Othewise compare record contents.
3758  * @return negative if frag1<frag2, 0 if frag1 == frag2, otherwise positive.
3759 */
3760 int
compare(const NdbRootFragment & frag1,const NdbRootFragment & frag2) const3761 NdbQueryImpl::OrderedFragSet::compare(const NdbRootFragment& frag1,
3762                                       const NdbRootFragment& frag2) const
3763 {
3764   assert(m_ordering!=NdbQueryOptions::ScanOrdering_unordered);
3765 
3766   /* f1<f2 if f1 is empty but f2 is not.*/
3767   if(frag1.isEmpty())
3768   {
3769     if(!frag2.isEmpty())
3770     {
3771       return -1;
3772     }
3773     else
3774     {
3775       return 0;
3776     }
3777   }
3778 
3779   /* Neither stream is empty so we must compare records.*/
3780   return compare_ndbrecord(&frag1.getResultStream(0).getReceiver(),
3781                            &frag2.getResultStream(0).getReceiver(),
3782                            m_keyRecord,
3783                            m_resultRecord,
3784                            m_ordering
3785                            == NdbQueryOptions::ScanOrdering_descending,
3786                            false);
3787 }
3788 
3789 
3790 
3791 ////////////////////////////////////////////////////
3792 /////////  NdbQueryOperationImpl methods ///////////
3793 ////////////////////////////////////////////////////
3794 
// Construct the runtime (execution-side) representation of one operation
// in the query tree, bind it to its definition 'def', and link it into
// its parent's child list. On allocation failure an error code is set on
// 'queryImpl' and the object is left without children capacity.
NdbQueryOperationImpl::NdbQueryOperationImpl(
           NdbQueryImpl& queryImpl,
           const NdbQueryOperationDefImpl& def):
  m_interface(*this),
  m_magic(MAGIC),
  m_queryImpl(queryImpl),
  m_operationDef(def),
  m_parent(NULL),
  m_children(0),
  m_maxBatchRows(0),   // >0: User specified prefered value, ==0: Use default CFG values
  m_params(),
  m_resultBuffer(NULL),
  m_resultRef(NULL),
  m_isRowNull(true),      // No row fetched yet
  m_ndbRecord(NULL),
  m_read_mask(NULL),
  m_firstRecAttr(NULL),
  m_lastRecAttr(NULL),
  m_ordering(NdbQueryOptions::ScanOrdering_unordered),
  m_interpretedCode(NULL),
  m_diskInUserProjection(false),
  m_parallelism(def.getOpNo() == 0
                ? Parallelism_max : Parallelism_adaptive),
  m_rowSize(0xffffffff),         // 'unknown' until calculated
  m_batchBufferSize(0xffffffff)  // 'unknown' until calculated
{
  if (m_children.expand(def.getNoOfChildOperations()))
  {
    // Memory allocation during Vector::expand() failed.
    queryImpl.setErrorCode(Err_MemoryAlloc);
    return;
  }
  // Fill in operations parent refs, and append it as child of its parent
  const NdbQueryOperationDefImpl* parent = def.getParentOperation();
  if (parent != NULL)
  {
    const Uint32 ix = parent->getOpNo();
    assert (ix < m_queryImpl.getNoOfOperations());
    m_parent = &m_queryImpl.getQueryOperation(ix);
    const int res = m_parent->m_children.push_back(this);
    UNUSED(res);
    /**
      Enough memory should have been allocated when creating
      m_parent->m_children, so res!=0 should never happen.
    */
    assert(res == 0);
  }
  if (def.getType()==NdbQueryOperationDef::OrderedIndexScan)
  {
    const NdbQueryOptions::ScanOrdering defOrdering =
      static_cast<const NdbQueryIndexScanOperationDefImpl&>(def).getOrdering();
    if (defOrdering != NdbQueryOptions::ScanOrdering_void)
    {
      // Use value from definition, if one was set.
      m_ordering = defOrdering;
    }
  }
}
3853 
NdbQueryOperationImpl::~NdbQueryOperationImpl()
{
  /**
   * We expect ::postFetchRelease to have deleted fetch related structures when fetch completed.
   * Either by fetching through last row, or calling ::close() which forcefully terminates fetch
   */
  assert (m_firstRecAttr == NULL);      // RecAttr chain returned to Ndb pool
  assert (m_interpretedCode == NULL);   // Interpreted program already deleted
} //NdbQueryOperationImpl::~NdbQueryOperationImpl()
3863 
3864 /**
3865  * Release what we want need anymore after last available row has been
3866  * returned from datanodes.
3867  */
3868 void
postFetchRelease()3869 NdbQueryOperationImpl::postFetchRelease()
3870 {
3871   Ndb* const ndb = m_queryImpl.getNdbTransaction().getNdb();
3872   NdbRecAttr* recAttr = m_firstRecAttr;
3873   while (recAttr != NULL) {
3874     NdbRecAttr* saveRecAttr = recAttr;
3875     recAttr = recAttr->next();
3876     ndb->releaseRecAttr(saveRecAttr);
3877   }
3878   m_firstRecAttr = NULL;
3879 
3880   // Set API exposed info to indicate NULL-row
3881   m_isRowNull = true;
3882   if (m_resultRef!=NULL) {
3883     *m_resultRef = NULL;
3884   }
3885 
3886   // TODO: Consider if interpretedCode can be deleted imm. after ::doSend
3887   delete m_interpretedCode;
3888   m_interpretedCode = NULL;
3889 } //NdbQueryOperationImpl::postFetchRelease()
3890 
3891 
3892 Uint32
getNoOfParentOperations() const3893 NdbQueryOperationImpl::getNoOfParentOperations() const
3894 {
3895   return (m_parent) ? 1 : 0;
3896 }
3897 
3898 NdbQueryOperationImpl&
getParentOperation(Uint32 i) const3899 NdbQueryOperationImpl::getParentOperation(Uint32 i) const
3900 {
3901   assert(i==0 && m_parent!=NULL);
3902   return *m_parent;
3903 }
3904 NdbQueryOperationImpl*
getParentOperation() const3905 NdbQueryOperationImpl::getParentOperation() const
3906 {
3907   return m_parent;
3908 }
3909 
3910 Uint32
getNoOfChildOperations() const3911 NdbQueryOperationImpl::getNoOfChildOperations() const
3912 {
3913   return m_children.size();
3914 }
3915 
3916 NdbQueryOperationImpl&
getChildOperation(Uint32 i) const3917 NdbQueryOperationImpl::getChildOperation(Uint32 i) const
3918 {
3919   return *m_children[i];
3920 }
3921 
getNoOfDescendantOperations() const3922 Int32 NdbQueryOperationImpl::getNoOfDescendantOperations() const
3923 {
3924   Int32 children = 0;
3925 
3926   for (unsigned i = 0; i < getNoOfChildOperations(); i++)
3927     children += 1 + getChildOperation(i).getNoOfDescendantOperations();
3928 
3929   return children;
3930 }
3931 
3932 Uint32
getNoOfLeafOperations() const3933 NdbQueryOperationImpl::getNoOfLeafOperations() const
3934 {
3935   if (getNoOfChildOperations() == 0)
3936   {
3937     return 1;
3938   }
3939   else
3940   {
3941     Uint32 sum = 0;
3942     for (unsigned i = 0; i < getNoOfChildOperations(); i++)
3943       sum += getChildOperation(i).getNoOfLeafOperations();
3944 
3945     return sum;
3946   }
3947 }
3948 
3949 NdbRecAttr*
getValue(const char * anAttrName,char * resultBuffer)3950 NdbQueryOperationImpl::getValue(
3951                             const char* anAttrName,
3952                             char* resultBuffer)
3953 {
3954   if (unlikely(anAttrName == NULL)) {
3955     getQuery().setErrorCode(QRY_REQ_ARG_IS_NULL);
3956     return NULL;
3957   }
3958   const NdbColumnImpl* const column
3959     = m_operationDef.getTable().getColumn(anAttrName);
3960   if(unlikely(column==NULL)){
3961     getQuery().setErrorCode(Err_UnknownColumn);
3962     return NULL;
3963   } else {
3964     return getValue(*column, resultBuffer);
3965   }
3966 }
3967 
3968 NdbRecAttr*
getValue(Uint32 anAttrId,char * resultBuffer)3969 NdbQueryOperationImpl::getValue(
3970                             Uint32 anAttrId,
3971                             char* resultBuffer)
3972 {
3973   const NdbColumnImpl* const column
3974     = m_operationDef.getTable().getColumn(anAttrId);
3975   if(unlikely(column==NULL)){
3976     getQuery().setErrorCode(Err_UnknownColumn);
3977     return NULL;
3978   } else {
3979     return getValue(*column, resultBuffer);
3980   }
3981 }
3982 
3983 NdbRecAttr*
getValue(const NdbColumnImpl & column,char * resultBuffer)3984 NdbQueryOperationImpl::getValue(
3985                             const NdbColumnImpl& column,
3986                             char* resultBuffer)
3987 {
3988   if (unlikely(getQuery().m_state != NdbQueryImpl::Defined)) {
3989     int state = getQuery().m_state;
3990     assert (state >= NdbQueryImpl::Initial && state < NdbQueryImpl::Destructed);
3991 
3992     if (state == NdbQueryImpl::Failed)
3993       getQuery().setErrorCode(QRY_IN_ERROR_STATE);
3994     else
3995       getQuery().setErrorCode(QRY_ILLEGAL_STATE);
3996     DEBUG_CRASH();
3997     return NULL;
3998   }
3999   Ndb* const ndb = getQuery().getNdbTransaction().getNdb();
4000   NdbRecAttr* const recAttr = ndb->getRecAttr();
4001   if(unlikely(recAttr == NULL)) {
4002     getQuery().setErrorCode(Err_MemoryAlloc);
4003     return NULL;
4004   }
4005   if(unlikely(recAttr->setup(&column, resultBuffer))) {
4006     ndb->releaseRecAttr(recAttr);
4007     getQuery().setErrorCode(Err_MemoryAlloc);
4008     return NULL;
4009   }
4010   // Append to tail of list.
4011   if(m_firstRecAttr == NULL){
4012     m_firstRecAttr = recAttr;
4013   }else{
4014     m_lastRecAttr->next(recAttr);
4015   }
4016   m_lastRecAttr = recAttr;
4017   assert(recAttr->next()==NULL);
4018   return recAttr;
4019 }
4020 
4021 int
setResultRowBuf(const NdbRecord * rec,char * resBuffer,const unsigned char * result_mask)4022 NdbQueryOperationImpl::setResultRowBuf (
4023                        const NdbRecord *rec,
4024                        char* resBuffer,
4025                        const unsigned char* result_mask)
4026 {
4027   if (unlikely(rec==0)) {
4028     getQuery().setErrorCode(QRY_REQ_ARG_IS_NULL);
4029     return -1;
4030   }
4031   if (unlikely(getQuery().m_state != NdbQueryImpl::Defined)) {
4032     int state = getQuery().m_state;
4033     assert (state >= NdbQueryImpl::Initial && state < NdbQueryImpl::Destructed);
4034 
4035     if (state == NdbQueryImpl::Failed)
4036       getQuery().setErrorCode(QRY_IN_ERROR_STATE);
4037     else
4038       getQuery().setErrorCode(QRY_ILLEGAL_STATE);
4039     DEBUG_CRASH();
4040     return -1;
4041   }
4042   if (rec->tableId !=
4043       static_cast<Uint32>(m_operationDef.getTable().getTableId())){
4044     /* The key_record and attribute_record in primary key operation do not
4045        belong to the same table.*/
4046     getQuery().setErrorCode(Err_DifferentTabForKeyRecAndAttrRec);
4047     return -1;
4048   }
4049   if (unlikely(m_ndbRecord != NULL)) {
4050     getQuery().setErrorCode(QRY_RESULT_ROW_ALREADY_DEFINED);
4051     return -1;
4052   }
4053   m_ndbRecord = rec;
4054   m_read_mask = result_mask;
4055   m_resultBuffer = resBuffer;
4056   return 0;
4057 }
4058 
4059 int
setResultRowRef(const NdbRecord * rec,const char * & bufRef,const unsigned char * result_mask)4060 NdbQueryOperationImpl::setResultRowRef (
4061                        const NdbRecord* rec,
4062                        const char* & bufRef,
4063                        const unsigned char* result_mask)
4064 {
4065   m_resultRef = &bufRef;
4066   *m_resultRef = NULL; // No result row yet
4067   return setResultRowBuf(rec, NULL, result_mask);
4068 }
4069 
// Position this operation on the first result row of the current batch.
// Only valid while the query is executing and not yet closed.
// Returns NextResult_gotRow / NextResult_scanComplete / NextResult_error.
NdbQuery::NextResultOutcome
NdbQueryOperationImpl::firstResult()
{
  if (unlikely(getQuery().m_state < NdbQueryImpl::Executing ||
               getQuery().m_state >= NdbQueryImpl::Closed)) {
    int state = getQuery().m_state;
    assert (state >= NdbQueryImpl::Initial && state < NdbQueryImpl::Destructed);
    if (state == NdbQueryImpl::Failed)
      getQuery().setErrorCode(QRY_IN_ERROR_STATE);
    else
      getQuery().setErrorCode(QRY_ILLEGAL_STATE);
    DEBUG_CRASH();
    return NdbQuery::NextResult_error;
  }

  const NdbRootFragment* rootFrag;

#if 0  // TODO ::firstResult() on root operation is unused, incomplete & untested
  if (unlikely(getParentOperation()==NULL))
  {
    // Reset *all* ResultStreams, optionaly order them, and find new current among them
    for( Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++)
    {
      m_resultStreams[i]->firstResult();
    }
    rootFrag = m_queryImpl.m_applFrags.reorganize();
    assert(rootFrag==NULL || rootFrag==m_queryImpl.m_applFrags.getCurrent());
  }
  else
#endif

  {
    // Only supported on non-root operations; root uses nextRootResult().
    assert(getParentOperation()!=NULL);  // TODO, See above
    rootFrag = m_queryImpl.m_applFrags.getCurrent();
  }

  if (rootFrag != NULL)
  {
    NdbResultStream& resultStream = rootFrag->getResultStream(*this);
    if (resultStream.firstResult() != tupleNotFound)
    {
      // Found a first tuple: expose it through RecAttrs / NdbRecord buffer.
      fetchRow(resultStream);
      return NdbQuery::NextResult_gotRow;
    }
  }
  // No row for this operation: expose a NULL-row to the application.
  nullifyResult();
  return NdbQuery::NextResult_scanComplete;
} //NdbQueryOperationImpl::firstResult()
4118 
4119 
4120 NdbQuery::NextResultOutcome
nextResult(bool fetchAllowed,bool forceSend)4121 NdbQueryOperationImpl::nextResult(bool fetchAllowed, bool forceSend)
4122 {
4123   if (unlikely(getQuery().m_state < NdbQueryImpl::Executing ||
4124                getQuery().m_state >= NdbQueryImpl::Closed)) {
4125     int state = getQuery().m_state;
4126     assert (state >= NdbQueryImpl::Initial && state < NdbQueryImpl::Destructed);
4127     if (state == NdbQueryImpl::Failed)
4128       getQuery().setErrorCode(QRY_IN_ERROR_STATE);
4129     else
4130       getQuery().setErrorCode(QRY_ILLEGAL_STATE);
4131     DEBUG_CRASH();
4132     return NdbQuery::NextResult_error;
4133   }
4134 
4135   if (this == &getRoot())
4136   {
4137     return m_queryImpl.nextRootResult(fetchAllowed,forceSend);
4138   }
4139   /**
4140    * 'next' will never be able to return anything for a lookup operation.
4141    *  NOTE: This is a pure optimization shortcut!
4142    */
4143   else if (m_operationDef.isScanOperation())
4144   {
4145     const NdbRootFragment* rootFrag = m_queryImpl.m_applFrags.getCurrent();
4146     if (rootFrag!=NULL)
4147     {
4148       NdbResultStream& resultStream = rootFrag->getResultStream(*this);
4149       if (resultStream.nextResult() != tupleNotFound)
4150       {
4151         fetchRow(resultStream);
4152         return NdbQuery::NextResult_gotRow;
4153       }
4154     }
4155   }
4156   nullifyResult();
4157   return NdbQuery::NextResult_scanComplete;
4158 } //NdbQueryOperationImpl::nextResult()
4159 
4160 
4161 void
fetchRow(NdbResultStream & resultStream)4162 NdbQueryOperationImpl::fetchRow(NdbResultStream& resultStream)
4163 {
4164   const char* buff = resultStream.getCurrentRow();
4165   assert(buff!=NULL || (m_firstRecAttr==NULL && m_ndbRecord==NULL));
4166 
4167   m_isRowNull = false;
4168   if (m_firstRecAttr != NULL)
4169   {
4170     // Retrieve any RecAttr (getValues()) for current row
4171     const int retVal = resultStream.getReceiver().get_AttrValues(m_firstRecAttr);
4172     assert(retVal==0);  ((void)retVal);
4173   }
4174   if (m_ndbRecord != NULL)
4175   {
4176     if (m_resultRef!=NULL)
4177     {
4178       // Set application pointer to point into internal buffer.
4179       *m_resultRef = buff;
4180     }
4181     else
4182     {
4183       assert(m_resultBuffer!=NULL);
4184       // Copy result to buffer supplied by application.
4185       memcpy(m_resultBuffer, buff, m_ndbRecord->m_row_size);
4186     }
4187   }
4188 } // NdbQueryOperationImpl::fetchRow
4189 
4190 
4191 void
nullifyResult()4192 NdbQueryOperationImpl::nullifyResult()
4193 {
4194   if (!m_isRowNull)
4195   {
4196     /* This operation gave no result for the current row.*/
4197     m_isRowNull = true;
4198     if (m_resultRef!=NULL)
4199     {
4200       // Set the pointer supplied by the application to NULL.
4201       *m_resultRef = NULL;
4202     }
4203     /* We should not give any results for the descendants either.*/
4204     for (Uint32 i = 0; i<getNoOfChildOperations(); i++)
4205     {
4206       getChildOperation(i).nullifyResult();
4207    }
4208   }
4209 } // NdbQueryOperationImpl::nullifyResult
4210 
4211 bool
isRowNULL() const4212 NdbQueryOperationImpl::isRowNULL() const
4213 {
4214   return m_isRowNull;
4215 }
4216 
4217 bool
isRowChanged() const4218 NdbQueryOperationImpl::isRowChanged() const
4219 {
4220   // FIXME: Need to be implemented as scan linked with scan is now implemented.
4221   return true;
4222 }
4223 
// Test bit 'bitNo' in a byte-array bitmap (bit 0 = LSB of mask[0]).
static bool isSetInMask(const unsigned char* mask, int bitNo)
{
  const int byteNo = bitNo >> 3;                        // Byte holding the bit
  const unsigned char bit = (unsigned char)(1 << (bitNo & 7));
  return (mask[byteNo] & bit) != 0;
}
4228 
// Serialize this operation's result projection into 'attrInfo' in the
// wire format consumed by the SPJ block: a leading length word, then
// either a READ_ALL / READ_PACKED bitmap section (NdbRecord columns),
// a list of AttributeHeaders (RecAttr columns), and for scan queries a
// trailing CORR_FACTOR64 request. Returns 0 (always succeeds).
int
NdbQueryOperationImpl::serializeProject(Uint32Buffer& attrInfo)
{
  Uint32 startPos = attrInfo.getSize();
  attrInfo.append(0U);  // Temp write first 'length' word, update later

  /**
   * If the columns in the projections are specified as
   * in NdbRecord format, attrId are assumed to be ordered ascending.
   * In this form the projection spec. can be packed as
   * a single bitmap.
   */
  if (m_ndbRecord != NULL) {
    Bitmask<MAXNROFATTRIBUTESINWORDS> readMask;
    Uint32 requestedCols= 0;
    Uint32 maxAttrId= 0;

    for (Uint32 i= 0; i<m_ndbRecord->noOfColumns; i++)
    {
      const NdbRecord::Attr* const col= &m_ndbRecord->columns[i];
      Uint32 attrId= col->attrId;

      // NULL read-mask means 'read all columns of the record'.
      if (m_read_mask == NULL || isSetInMask(m_read_mask, i))
      { if (attrId > maxAttrId)
          maxAttrId= attrId;

        readMask.set(attrId);
        requestedCols++;

        // Remember if any requested column lives on disk; affects
        // how the request is built elsewhere.
        const NdbColumnImpl* const column = getQueryOperationDef().getTable()
          .getColumn(col->column_no);
        if (column->getStorageType() == NDB_STORAGETYPE_DISK)
        {
          m_diskInUserProjection = true;
        }
      }
    }

    // Test for special case, get all columns:
    if (requestedCols == (unsigned)m_operationDef.getTable().getNoOfColumns()) {
      Uint32 ah;
      AttributeHeader::init(&ah, AttributeHeader::READ_ALL, requestedCols);
      attrInfo.append(ah);
    } else if (requestedCols > 0) {
      /* Serialize projection as a bitmap.*/
      const Uint32 wordCount = 1+maxAttrId/32; // Size of mask.
      Uint32* dst = attrInfo.alloc(wordCount+1);
      AttributeHeader::init(dst,
                            AttributeHeader::READ_PACKED, 4*wordCount);
      memcpy(dst+1, &readMask, 4*wordCount);
    }
  } // if (m_ndbRecord...)

  /** Projection is specified in RecAttr format.
   *  This may also be combined with the NdbRecord format.
   */
  const NdbRecAttr* recAttr = m_firstRecAttr;
  /* Serialize projection as a list of Attribute id's.*/
  while (recAttr) {
    Uint32 ah;
    AttributeHeader::init(&ah,
                          recAttr->attrId(),
                          0);
    attrInfo.append(ah);
    if (recAttr->getColumn()->getStorageType() == NDB_STORAGETYPE_DISK)
    {
      m_diskInUserProjection = true;
    }
    recAttr = recAttr->next();
  }

  // Scan queries need the correlation factor to match child rows
  // to their parent rows.
  bool withCorrelation = getRoot().getQueryDef().isScanQuery();
  if (withCorrelation) {
    Uint32 ah;
    AttributeHeader::init(&ah, AttributeHeader::CORR_FACTOR64, 0);
    attrInfo.append(ah);
  }

  // Size of projection in words (excluding the length word itself).
  Uint32 length = attrInfo.getSize() - startPos - 1 ;
  attrInfo.put(startPos, length);
  return 0;
} // NdbQueryOperationImpl::serializeProject
4312 
// Serialize the caller-supplied parameter values for this operation into
// m_params. Returns 0 on success, or an error code (QRY_REQ_ARG_IS_NULL,
// Err_KeyIsNULL, Err_MemoryAlloc, or a serialization error).
int NdbQueryOperationImpl::serializeParams(const NdbQueryParamValue* paramValues)
{
  if (unlikely(paramValues == NULL))
  {
    return QRY_REQ_ARG_IS_NULL;
  }

  const NdbQueryOperationDefImpl& def = getQueryOperationDef();
  for (Uint32 i=0; i<def.getNoOfParameters(); i++)
  {
    const NdbParamOperandImpl& paramDef = def.getParameter(i);
    const NdbQueryParamValue& paramValue = paramValues[paramDef.getParamIx()];

    /**
     *  Add parameter value to serialized data.
     *  Each value has a Uint32 length field (in bytes), followed by
     *  the actual value. Allocation is in Uint32 units with unused bytes
     *  zero padded.
     **/
    const Uint32 oldSize = m_params.getSize();
    m_params.append(0); // Place holder for length.
    bool null;
    Uint32 len;
    const int error =
      paramValue.serializeValue(*paramDef.getColumn(), m_params, len, null);
    if (unlikely(error))
      return error;
    if (unlikely(null))
      return Err_KeyIsNULL;  // NULL is not a usable key/parameter value

    if(unlikely(m_params.isMemoryExhausted())){
      return Err_MemoryAlloc;
    }
    // Back patch length field.
    m_params.put(oldSize, len);
  }
  return 0;
} // NdbQueryOperationImpl::serializeParams
4351 
// Recursively compute the max number of batched rows for this operation
// and its subtree. Scan operations record the result in m_maxBatchRows
// (capped by what their lookup descendants can receive); lookups return
// their own limit upwards so the closest scan ancestor can adapt.
// 'closestScan' is the nearest scan ancestor, or NULL if none.
Uint32
NdbQueryOperationImpl
::calculateBatchedRows(const NdbQueryOperationImpl* closestScan)
{
  const NdbQueryOperationImpl* myClosestScan;
  if (m_operationDef.isScanOperation())
  {
    myClosestScan = this;
  }
  else
  {
    myClosestScan = closestScan;
  }

  Uint32 maxBatchRows = 0;
  if (myClosestScan != NULL)
  {
    // To force usage of SCAN_NEXTREQ even for small scans resultsets
    if (DBUG_EVALUATE_IF("max_4rows_in_spj_batches", true, false))
    {
      m_maxBatchRows = 4;
    }
    else if (DBUG_EVALUATE_IF("max_64rows_in_spj_batches", true, false))
    {
      m_maxBatchRows = 64;
    }
    else if (enforcedBatchSize)
    {
      m_maxBatchRows = enforcedBatchSize;
    }

    const Ndb& ndb = *getQuery().getNdbTransaction().getNdb();

    /**
     * For each batch, a lookup operation must be able to receive as many rows
     * as the closest ancestor scan operation.
     * We must thus make sure that we do not set a batch size for the scan
     * that exceeds what any of its scan descendants can use.
     *
     * Ignore calculated 'batchByteSize'
     * here - Recalculated when building signal after max-batchRows has been
     * determined.
     */
    Uint32 batchByteSize;
    /**
     * myClosestScan->m_maxBatchRows may be zero to indicate that we
     * should use default values, or non-zero if the application had an
     * explicit preference.
     */
    maxBatchRows = myClosestScan->m_maxBatchRows;
    NdbReceiver::calculate_batch_size(* ndb.theImpl,
                                      getRoot().m_parallelism
                                      == Parallelism_max
                                      ? m_queryImpl.getRootFragCount()
                                      : getRoot().m_parallelism,
                                      maxBatchRows,
                                      batchByteSize);
    assert(maxBatchRows > 0);
    // NOTE(review): comparing a row count against a byte size — presumably
    // relies on rows always being >= 1 byte each; confirm intent.
    assert(maxBatchRows <= batchByteSize);
  }

  // Find the largest value that is acceptable to all lookup descendants.
  for (Uint32 i = 0; i < m_children.size(); i++)
  {
    const Uint32 childMaxBatchRows =
      m_children[i]->calculateBatchedRows(myClosestScan);
    maxBatchRows = MIN(maxBatchRows, childMaxBatchRows);
  }

  if (m_operationDef.isScanOperation())
  {
    // Use this value for current op and all lookup descendants.
    m_maxBatchRows = maxBatchRows;
    // Return max(Uint32) to avoid interfering with batch size calculation
    // for parent.
    return 0xffffffff;
  }
  else
  {
    return maxBatchRows;
  }
} // NdbQueryOperationImpl::calculateBatchedRows
4434 
4435 
4436 void
setBatchedRows(Uint32 batchedRows)4437 NdbQueryOperationImpl::setBatchedRows(Uint32 batchedRows)
4438 {
4439   if (!m_operationDef.isScanOperation())
4440   {
4441     /** Lookup operations should handle the same number of rows as
4442      * the closest scan ancestor.
4443      */
4444     m_maxBatchRows = batchedRows;
4445   }
4446 
4447   for (Uint32 i = 0; i < m_children.size(); i++)
4448   {
4449     m_children[i]->setBatchedRows(m_maxBatchRows);
4450   }
4451 }
4452 
4453 int
prepareAttrInfo(Uint32Buffer & attrInfo)4454 NdbQueryOperationImpl::prepareAttrInfo(Uint32Buffer& attrInfo)
4455 {
4456   const NdbQueryOperationDefImpl& def = getQueryOperationDef();
4457 
4458   /**
4459    * Serialize parameters refered by this NdbQueryOperation.
4460    * Params for the complete NdbQuery is collected in a single
4461    * serializedParams chunk. Each operations params are
4462    * proceeded by 'length' for this operation.
4463    */
4464   if (def.getType() == NdbQueryOperationDef::UniqueIndexAccess)
4465   {
4466     // Reserve memory for LookupParameters, fill in contents later when
4467     // 'length' and 'requestInfo' has been calculated.
4468     Uint32 startPos = attrInfo.getSize();
4469     attrInfo.alloc(QN_LookupParameters::NodeSize);
4470     Uint32 requestInfo = 0;
4471 
4472     if (m_params.getSize() > 0)
4473     {
4474       // parameter values has been serialized as part of NdbTransaction::createQuery()
4475       // Only need to append it to rest of the serialized arguments
4476       requestInfo |= DABits::PI_KEY_PARAMS;
4477       attrInfo.append(m_params);
4478     }
4479 
4480     QN_LookupParameters* param = reinterpret_cast<QN_LookupParameters*>(attrInfo.addr(startPos));
4481     if (unlikely(param==NULL))
4482        return Err_MemoryAlloc;
4483 
4484     param->requestInfo = requestInfo;
4485     param->resultData = getIdOfReceiver();
4486     Uint32 length = attrInfo.getSize() - startPos;
4487     if (unlikely(length > 0xFFFF)) {
4488       return QRY_DEFINITION_TOO_LARGE; //Query definition too large.
4489     }
4490     QueryNodeParameters::setOpLen(param->len,
4491                                   QueryNodeParameters::QN_LOOKUP,
4492                                   length);
4493 
4494 #ifdef __TRACE_SERIALIZATION
4495     ndbout << "Serialized params for index node "
4496            << getInternalOpNo()-1 << " : ";
4497     for(Uint32 i = startPos; i < attrInfo.getSize(); i++){
4498       char buf[12];
4499       sprintf(buf, "%.8x", attrInfo.get(i));
4500       ndbout << buf << " ";
4501     }
4502     ndbout << endl;
4503 #endif
4504   } // if (UniqueIndexAccess ...
4505 
4506   // Reserve memory for LookupParameters, fill in contents later when
4507   // 'length' and 'requestInfo' has been calculated.
4508   Uint32 startPos = attrInfo.getSize();
4509   Uint32 requestInfo = 0;
4510   bool isRoot = (def.getOpNo()==0);
4511 
4512   QueryNodeParameters::OpType paramType =
4513        !def.isScanOperation() ? QueryNodeParameters::QN_LOOKUP
4514            : (isRoot) ? QueryNodeParameters::QN_SCAN_FRAG
4515                       : QueryNodeParameters::QN_SCAN_INDEX;
4516 
4517   if (paramType == QueryNodeParameters::QN_SCAN_INDEX)
4518     attrInfo.alloc(QN_ScanIndexParameters::NodeSize);
4519   else if (paramType == QueryNodeParameters::QN_SCAN_FRAG)
4520     attrInfo.alloc(QN_ScanFragParameters::NodeSize);
4521   else
4522     attrInfo.alloc(QN_LookupParameters::NodeSize);
4523 
4524   // SPJ block assume PARAMS to be supplied before ATTR_LIST
4525   if (m_params.getSize() > 0 &&
4526       def.getType() != NdbQueryOperationDef::UniqueIndexAccess)
4527   {
4528     // parameter values has been serialized as part of NdbTransaction::createQuery()
4529     // Only need to append it to rest of the serialized arguments
4530     requestInfo |= DABits::PI_KEY_PARAMS;
4531     attrInfo.append(m_params);
4532   }
4533 
4534   if (hasInterpretedCode())
4535   {
4536     requestInfo |= DABits::PI_ATTR_INTERPRET;
4537     const int error= prepareInterpretedCode(attrInfo);
4538     if (unlikely(error))
4539     {
4540       return error;
4541     }
4542   }
4543 
4544   if (m_ndbRecord==NULL && m_firstRecAttr==NULL)
4545   {
4546     // Leaf operations with empty projections are not supported.
4547     if (getNoOfChildOperations() == 0)
4548     {
4549       return QRY_EMPTY_PROJECTION;
4550     }
4551   }
4552   else
4553   {
4554     requestInfo |= DABits::PI_ATTR_LIST;
4555     const int error = serializeProject(attrInfo);
4556     if (unlikely(error)) {
4557       return error;
4558     }
4559   }
4560 
4561   if (diskInUserProjection())
4562   {
4563     requestInfo |= DABits::PI_DISK_ATTR;
4564   }
4565 
4566   Uint32 length = attrInfo.getSize() - startPos;
4567   if (unlikely(length > 0xFFFF)) {
4568     return QRY_DEFINITION_TOO_LARGE; //Query definition too large.
4569   }
4570 
4571   if (paramType == QueryNodeParameters::QN_SCAN_INDEX)
4572   {
4573     QN_ScanIndexParameters* param = reinterpret_cast<QN_ScanIndexParameters*>(attrInfo.addr(startPos));
4574     if (unlikely(param==NULL))
4575       return Err_MemoryAlloc;
4576 
4577     Ndb& ndb = *m_queryImpl.getNdbTransaction().getNdb();
4578 
4579     Uint32 batchRows = getMaxBatchRows();
4580     Uint32 batchByteSize;
4581     NdbReceiver::calculate_batch_size(* ndb.theImpl,
4582                                       m_queryImpl.getRootFragCount(),
4583                                       batchRows,
4584                                       batchByteSize);
4585     assert(batchRows == getMaxBatchRows());
4586     assert(batchRows <= batchByteSize);
4587     assert(m_parallelism == Parallelism_max ||
4588            m_parallelism == Parallelism_adaptive);
4589     if (m_parallelism == Parallelism_max)
4590     {
4591       requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL;
4592     }
4593     if (def.hasParamInPruneKey())
4594     {
4595       requestInfo |= QN_ScanIndexParameters::SIP_PRUNE_PARAMS;
4596     }
4597     param->requestInfo = requestInfo;
4598     // Check that both values fit in param->batchSize.
4599     assert(getMaxBatchRows() < (1<<QN_ScanIndexParameters::BatchRowBits));
4600     assert(batchByteSize < (1 << (sizeof param->batchSize * 8
4601                                   - QN_ScanIndexParameters::BatchRowBits)));
4602     param->batchSize = (batchByteSize << 11) | getMaxBatchRows();
4603     param->resultData = getIdOfReceiver();
4604     QueryNodeParameters::setOpLen(param->len, paramType, length);
4605   }
4606   else if (paramType == QueryNodeParameters::QN_SCAN_FRAG)
4607   {
4608     QN_ScanFragParameters* param = reinterpret_cast<QN_ScanFragParameters*>(attrInfo.addr(startPos));
4609     if (unlikely(param==NULL))
4610       return Err_MemoryAlloc;
4611 
4612     param->requestInfo = requestInfo;
4613     param->resultData = getIdOfReceiver();
4614     QueryNodeParameters::setOpLen(param->len, paramType, length);
4615   }
4616   else
4617   {
4618     assert(paramType == QueryNodeParameters::QN_LOOKUP);
4619     QN_LookupParameters* param = reinterpret_cast<QN_LookupParameters*>(attrInfo.addr(startPos));
4620     if (unlikely(param==NULL))
4621       return Err_MemoryAlloc;
4622 
4623     param->requestInfo = requestInfo;
4624     param->resultData = getIdOfReceiver();
4625     QueryNodeParameters::setOpLen(param->len, paramType, length);
4626   }
4627 
4628 #ifdef __TRACE_SERIALIZATION
4629   ndbout << "Serialized params for node "
4630          << getInternalOpNo() << " : ";
4631   for(Uint32 i = startPos; i < attrInfo.getSize(); i++){
4632     char buf[12];
4633     sprintf(buf, "%.8x", attrInfo.get(i));
4634     ndbout << buf << " ";
4635   }
4636   ndbout << endl;
4637 #endif
4638 
4639   // Parameter values was appended to AttrInfo, shrink param buffer
4640   // to reduce memory footprint.
4641   m_params.releaseExtend();
4642 
4643   return 0;
4644 } // NdbQueryOperationImpl::prepareAttrInfo
4645 
4646 
4647 int
prepareKeyInfo(Uint32Buffer & keyInfo,const NdbQueryParamValue * actualParam)4648 NdbQueryOperationImpl::prepareKeyInfo(
4649                      Uint32Buffer& keyInfo,
4650                      const NdbQueryParamValue* actualParam)
4651 {
4652   assert(this == &getRoot()); // Should only be called for root operation.
4653 #ifdef TRACE_SERIALIZATION
4654   int startPos = keyInfo.getSize();
4655 #endif
4656 
4657   const NdbQueryOperationDefImpl::IndexBound* bounds = m_operationDef.getBounds();
4658   if (bounds)
4659   {
4660     const int error = prepareIndexKeyInfo(keyInfo, bounds, actualParam);
4661     if (unlikely(error))
4662       return error;
4663   }
4664 
4665   const NdbQueryOperandImpl* const* keys = m_operationDef.getKeyOperands();
4666   if (keys)
4667   {
4668     const int error = prepareLookupKeyInfo(keyInfo, keys, actualParam);
4669     if (unlikely(error))
4670       return error;
4671   }
4672 
4673   if (unlikely(keyInfo.isMemoryExhausted())) {
4674     return Err_MemoryAlloc;
4675   }
4676 
4677 #ifdef TRACE_SERIALIZATION
4678   ndbout << "Serialized KEYINFO for NdbQuery root : ";
4679   for (Uint32 i = startPos; i < keyInfo.getSize(); i++) {
4680     char buf[12];
4681     sprintf(buf, "%.8x", keyInfo.get(i));
4682     ndbout << buf << " ";
4683   }
4684   ndbout << endl;
4685 #endif
4686 
4687   return 0;
4688 } // NdbQueryOperationImpl::prepareKeyInfo
4689 
4690 
4691 /**
4692  * Convert constant operand into sequence of words that may be sent to data
4693  * nodes.
4694  * @param constOp Operand to convert.
4695  * @param buffer Destination buffer.
4696  * @param len Will be set to length in bytes.
4697  * @return 0 if ok, otherwise error code.
4698  */
4699 static int
serializeConstOp(const NdbConstOperandImpl & constOp,Uint32Buffer & buffer,Uint32 & len)4700 serializeConstOp(const NdbConstOperandImpl& constOp,
4701                  Uint32Buffer& buffer,
4702                  Uint32& len)
4703 {
4704   // Check that column->shrink_varchar() not specified, only used by mySQL
4705   // assert (!(column->flags & NdbDictionary::RecMysqldShrinkVarchar));
4706   buffer.skipRestOfWord();
4707   len = constOp.getSizeInBytes();
4708   Uint8 shortLen[2];
4709   switch (constOp.getColumn()->getArrayType()) {
4710     case NdbDictionary::Column::ArrayTypeFixed:
4711       buffer.appendBytes(constOp.getAddr(), len);
4712       break;
4713 
4714     case NdbDictionary::Column::ArrayTypeShortVar:
4715       // Such errors should have been caught in convert2ColumnType().
4716       assert(len <= 0xFF);
4717       shortLen[0] = (unsigned char)len;
4718       buffer.appendBytes(shortLen, 1);
4719       buffer.appendBytes(constOp.getAddr(), len);
4720       len+=1;
4721       break;
4722 
4723     case NdbDictionary::Column::ArrayTypeMediumVar:
4724       // Such errors should have been caught in convert2ColumnType().
4725       assert(len <= 0xFFFF);
4726       shortLen[0] = (unsigned char)(len & 0xFF);
4727       shortLen[1] = (unsigned char)(len >> 8);
4728       buffer.appendBytes(shortLen, 2);
4729       buffer.appendBytes(constOp.getAddr(), len);
4730       len+=2;
4731       break;
4732 
4733     default:
4734       assert(false);
4735   }
4736   if (unlikely(buffer.isMemoryExhausted())) {
4737     return Err_MemoryAlloc;
4738   }
4739   return 0;
4740 } // static serializeConstOp
4741 
4742 static int
appendBound(Uint32Buffer & keyInfo,NdbIndexScanOperation::BoundType type,const NdbQueryOperandImpl * bound,const NdbQueryParamValue * actualParam)4743 appendBound(Uint32Buffer& keyInfo,
4744             NdbIndexScanOperation::BoundType type, const NdbQueryOperandImpl* bound,
4745             const NdbQueryParamValue* actualParam)
4746 {
4747   Uint32 len = 0;
4748 
4749   keyInfo.append(type);
4750   const Uint32 oldSize = keyInfo.getSize();
4751   keyInfo.append(0); // Place holder for AttributeHeader
4752 
4753   switch(bound->getKind()){
4754   case NdbQueryOperandImpl::Const:
4755   {
4756     const NdbConstOperandImpl& constOp =
4757       static_cast<const NdbConstOperandImpl&>(*bound);
4758 
4759     const int error = serializeConstOp(constOp, keyInfo, len);
4760     if (unlikely(error))
4761       return error;
4762 
4763     break;
4764   }
4765   case NdbQueryOperandImpl::Param:
4766   {
4767     const NdbParamOperandImpl* const paramOp
4768       = static_cast<const NdbParamOperandImpl*>(bound);
4769     const int paramNo = paramOp->getParamIx();
4770     assert(actualParam != NULL);
4771 
4772     bool null;
4773     const int error =
4774       actualParam[paramNo].serializeValue(*paramOp->getColumn(), keyInfo,
4775                                           len, null);
4776     if (unlikely(error))
4777       return error;
4778     if (unlikely(null))
4779       return Err_KeyIsNULL;
4780     break;
4781   }
4782   case NdbQueryOperandImpl::Linked:    // Root operation cannot have linked operands.
4783   default:
4784     assert(false);
4785   }
4786 
4787   // Back patch attribute header.
4788   keyInfo.put(oldSize,
4789               AttributeHeader(bound->getColumn()->m_attrId, len).m_value);
4790 
4791   return 0;
4792 } // static appendBound()
4793 
4794 
int
NdbQueryOperationImpl::prepareIndexKeyInfo(
                     Uint32Buffer& keyInfo,
                     const NdbQueryOperationDefImpl::IndexBound* bounds,
                     const NdbQueryParamValue* actualParam)
{
  /**
   * Serialize the index bounds of this (root) index scan into 'keyInfo',
   * resolving parameter operands against 'actualParam'. The first word of
   * the serialized section is back patched with the total length, and
   * m_shortestBound is updated for later use.
   * Returns 0 on success, else an error code.
   */
  int startPos = keyInfo.getSize();
  if (bounds->lowKeys==0 && bounds->highKeys==0)  // No Bounds defined
    return 0;

  // Iterate over max(lowKeys, highKeys) key columns.
  const unsigned key_count =
     (bounds->lowKeys >= bounds->highKeys) ? bounds->lowKeys : bounds->highKeys;

  for (unsigned keyNo = 0; keyNo < key_count; keyNo++)
  {
    NdbIndexScanOperation::BoundType bound_type;

    /* If upper and lower limit is equal, a single BoundEQ is sufficient */
    if (keyNo < bounds->lowKeys  &&
        keyNo < bounds->highKeys &&
        bounds->low[keyNo] == bounds->high[keyNo])
    {
      /* Inclusive if defined, or matching rows can include this value */
      bound_type= NdbIndexScanOperation::BoundEQ;
      int error = appendBound(keyInfo, bound_type, bounds->low[keyNo], actualParam);
      if (unlikely(error))
        return error;

    } else {

      /* If key is part of lower bound */
      if (keyNo < bounds->lowKeys)
      {
        /* Inclusive if defined, or matching rows can include this value.
         * Only the last key column of the bound carries the strict
         * comparison; preceding columns are always inclusive (LE). */
        bound_type= bounds->lowIncl  || keyNo+1 < bounds->lowKeys ?
            NdbIndexScanOperation::BoundLE : NdbIndexScanOperation::BoundLT;

        int error = appendBound(keyInfo, bound_type, bounds->low[keyNo], actualParam);
        if (unlikely(error))
          return error;
      }

      /* If key is part of upper bound */
      if (keyNo < bounds->highKeys)
      {
        /* Inclusive if defined, or matching rows can include this value.
         * Only the last key column of the bound carries the strict
         * comparison; preceding columns are always inclusive (GE). */
        bound_type= bounds->highIncl  || keyNo+1 < bounds->highKeys ?
            NdbIndexScanOperation::BoundGE : NdbIndexScanOperation::BoundGT;

        int error = appendBound(keyInfo, bound_type, bounds->high[keyNo], actualParam);
        if (unlikely(error))
          return error;
      }
    }
  }

  Uint32 length = keyInfo.getSize()-startPos;
  if (unlikely(keyInfo.isMemoryExhausted())) {
    return Err_MemoryAlloc;
  } else if (unlikely(length > 0xFFFF)) {
    return QRY_DEFINITION_TOO_LARGE; // Query definition too large.
  } else if (likely(length > 0)) {
    // Back patch the section length into the upper half of the first word.
    keyInfo.put(startPos, keyInfo.get(startPos) | (length << 16));
  }

  // Remember min(lowKeys, highKeys) as the shortest bound of the query.
  m_queryImpl.m_shortestBound =(bounds->lowKeys <= bounds->highKeys) ? bounds->lowKeys : bounds->highKeys;
  return 0;
} // NdbQueryOperationImpl::prepareIndexKeyInfo
4863 
4864 
4865 int
prepareLookupKeyInfo(Uint32Buffer & keyInfo,const NdbQueryOperandImpl * const keys[],const NdbQueryParamValue * actualParam)4866 NdbQueryOperationImpl::prepareLookupKeyInfo(
4867                      Uint32Buffer& keyInfo,
4868                      const NdbQueryOperandImpl* const keys[],
4869                      const NdbQueryParamValue* actualParam)
4870 {
4871   const int keyCount = m_operationDef.getIndex()!=NULL ?
4872     static_cast<int>(m_operationDef.getIndex()->getNoOfColumns()) :
4873     m_operationDef.getTable().getNoOfPrimaryKeys();
4874 
4875   for (int keyNo = 0; keyNo<keyCount; keyNo++)
4876   {
4877     Uint32 dummy;
4878 
4879     switch(keys[keyNo]->getKind()){
4880     case NdbQueryOperandImpl::Const:
4881     {
4882       const NdbConstOperandImpl* const constOp
4883         = static_cast<const NdbConstOperandImpl*>(keys[keyNo]);
4884       const int error =
4885         serializeConstOp(*constOp, keyInfo, dummy);
4886       if (unlikely(error))
4887         return error;
4888 
4889       break;
4890     }
4891     case NdbQueryOperandImpl::Param:
4892     {
4893       const NdbParamOperandImpl* const paramOp
4894         = static_cast<const NdbParamOperandImpl*>(keys[keyNo]);
4895       int paramNo = paramOp->getParamIx();
4896       assert(actualParam != NULL);
4897 
4898       bool null;
4899       const int error =
4900         actualParam[paramNo].serializeValue(*paramOp->getColumn(), keyInfo,
4901                                             dummy, null);
4902 
4903       if (unlikely(error))
4904         return error;
4905       if (unlikely(null))
4906         return Err_KeyIsNULL;
4907       break;
4908     }
4909     case NdbQueryOperandImpl::Linked:    // Root operation cannot have linked operands.
4910     default:
4911       assert(false);
4912     }
4913   }
4914 
4915   if (unlikely(keyInfo.isMemoryExhausted())) {
4916     return Err_MemoryAlloc;
4917   }
4918 
4919   return 0;
4920 } // NdbQueryOperationImpl::prepareLookupKeyInfo
4921 
4922 
bool
NdbQueryOperationImpl::execTRANSID_AI(const Uint32* ptr, Uint32 len)
{
  /**
   * Handle a TRANSID_AI signal (one result row) for this operation.
   * Returns true iff the root fragment batch thereby became complete and
   * handleBatchComplete() reported it as handled.
   */
  TupleCorrelation tupleCorrelation;
  // Lookup queries have a single root fragment; scans resolve it below.
  NdbRootFragment* rootFrag = m_queryImpl.m_rootFrags;

  if (getQueryDef().isScanQuery())
  {
    // Scan rows carry trailing correlation data identifying their stream.
    const CorrelationData correlData(ptr, len);
    const Uint32 receiverId = correlData.getRootReceiverId();

    /** receiverId holds the Id of the receiver of the corresponding stream
     * of the root operation. We can thus find the correct root fragment
     * number.
     */
    rootFrag =
      NdbRootFragment::receiverIdLookup(m_queryImpl.m_rootFrags,
                                        m_queryImpl.getRootFragCount(),
                                        receiverId);
    if (unlikely(rootFrag == NULL))
    {
      // Unknown receiver id: drop the signal (should not happen).
      assert(false);
      return false;
    }

    // Extract tuple correlation, and exclude it from the row payload length.
    tupleCorrelation = correlData.getTupleCorrelation();
    len -= CorrelationData::wordCount;
  }

  if (traceSignals) {
    ndbout << "NdbQueryOperationImpl::execTRANSID_AI()"
           << ", fragment no: " << rootFrag->getFragNo()
           << ", operation no: " << getQueryOperationDef().getOpNo()
           << endl;
  }

  // Process result values.
  rootFrag->getResultStream(*this).execTRANSID_AI(ptr, len, tupleCorrelation);
  rootFrag->incrOutstandingResults(-1);

  bool ret = false;
  if (rootFrag->isFragBatchComplete())
  {
    ret = m_queryImpl.handleBatchComplete(*rootFrag);
  }

  if (false && traceSignals) {  // Verbose trace, disabled by default.
    ndbout << "NdbQueryOperationImpl::execTRANSID_AI(): returns:" << ret
           << ", *this=" << *this <<  endl;
  }
  return ret;
} //NdbQueryOperationImpl::execTRANSID_AI
4976 
4977 
bool
NdbQueryOperationImpl::execTCKEYREF(const NdbApiSignal* aSignal)
{
  /**
   * Handle a TCKEYREF signal (a failed lookup) for this operation.
   * Returns true iff the root fragment batch thereby became complete and
   * handleBatchComplete() reported it as handled.
   */
  if (traceSignals) {
    ndbout << "NdbQueryOperationImpl::execTCKEYREF()" <<  endl;
  }

  /* The SPJ block does not forward TCKEYREFs for trees with scan roots.*/
  assert(!getQueryDef().isScanQuery());

  const TcKeyRef* ref = CAST_CONSTPTR(TcKeyRef, aSignal->getDataPtr());
  // Drop signals belonging to another (stale) transaction.
  if (!getQuery().m_transaction.checkState_TransId(ref->transId))
  {
#ifdef NDB_NO_DROPPED_SIGNAL
    abort();
#endif
    return false;
  }

  // Suppress 'TupleNotFound' status for child operations.
  if (&getRoot() == this ||
      ref->errorCode != static_cast<Uint32>(Err_TupleNotFound))
  {
    if (aSignal->getLength() == TcKeyRef::SignalLength)
    {
      // Signal may contain additional error data
      getQuery().m_error.details = (char *)UintPtr(ref->errorData);
    }
    getQuery().setFetchTerminated(ref->errorCode,false);
  }

  // Lookup queries always have a single root fragment.
  NdbRootFragment& rootFrag = getQuery().m_rootFrags[0];

  /**
   * Error may be either a 'soft' or a 'hard' error.
   * 'Soft error' are regarded 'informational', and we are
   * allowed to continue execution of the query. A 'hard error'
   * will terminate query, close communication, and further
   * incoming signals to this NdbReceiver will be discarded.
   */
  switch (ref->errorCode)
  {
  case Err_TupleNotFound:  // 'Soft error' : Row not found
  case Err_FalsePredicate: // 'Soft error' : Interpreter_exit_nok
  {
    /**
     * Need to update 'outstanding' count:
     * Compensate for children results not produced.
     * (doSend() assumed all child results to be materialized)
     */
    Uint32 cnt = 1; // self
    cnt += getNoOfDescendantOperations();
    if (getNoOfChildOperations() > 0)
    {
      cnt += getNoOfLeafOperations();
    }
    rootFrag.incrOutstandingResults(- Int32(cnt));
    break;
  }
  default:                             // 'Hard error':
    rootFrag.throwRemainingResults();  // Terminate receive -> complete
  }

  bool ret = false;
  if (rootFrag.isFragBatchComplete())
  {
    ret = m_queryImpl.handleBatchComplete(rootFrag);
  }

  if (traceSignals) {
    ndbout << "NdbQueryOperationImpl::execTCKEYREF(): returns:" << ret
           << ", *this=" << *this <<  endl;
  }
  return ret;
} //NdbQueryOperationImpl::execTCKEYREF
5053 
bool
NdbQueryOperationImpl::execSCAN_TABCONF(Uint32 tcPtrI,
                                        Uint32 rowCount,
                                        Uint32 nodeMask,
                                        NdbReceiver* receiver)
{
  /**
   * Handle a SCAN_TABCONF for the root scan: record the scan continuation
   * handle (tcPtrI), the remaining sub-scan mask and the number of result
   * rows to expect for this batch.
   * Returns true iff the root fragment batch thereby became complete and
   * handleBatchComplete() reported it as handled.
   */
  assert((tcPtrI==RNIL && nodeMask==0) ||
         (tcPtrI!=RNIL && nodeMask!=0));
  assert(checkMagicNumber());
  // For now, only the root operation may be a scan.
  assert(&getRoot() == this);
  assert(m_operationDef.isScanOperation());

  // Map the confirming receiver back to its root fragment.
  NdbRootFragment* rootFrag =
    NdbRootFragment::receiverIdLookup(m_queryImpl.m_rootFrags,
                                      m_queryImpl.getRootFragCount(),
                                      receiver->getId());
  if (unlikely(rootFrag == NULL))
  {
    // Unknown receiver id: drop the signal (should not happen).
    assert(false);
    return false;
  }
  // Prepare for SCAN_NEXTREQ, tcPtrI==RNIL, nodeMask==0 -> EOF
  rootFrag->setConfReceived(tcPtrI);
  rootFrag->setRemainingSubScans(nodeMask);
  rootFrag->incrOutstandingResults(rowCount);

  if(traceSignals){
    ndbout << "NdbQueryOperationImpl::execSCAN_TABCONF"
           << " fragment no: " << rootFrag->getFragNo()
           << " rows " << rowCount
           << " nodeMask: H'" << hex << nodeMask << ")"
           << " tcPtrI " << tcPtrI
           << endl;
  }

  bool ret = false;
  if (rootFrag->isFragBatchComplete())
  {
    /* This fragment is now complete */
    ret = m_queryImpl.handleBatchComplete(*rootFrag);
  }
  if (false && traceSignals) {  // Verbose trace, disabled by default.
    ndbout << "NdbQueryOperationImpl::execSCAN_TABCONF():, returns:" << ret
           << ", tcPtrI=" << tcPtrI << " rowCount=" << rowCount
           << " *this=" << *this << endl;
  }
  return ret;
} //NdbQueryOperationImpl::execSCAN_TABCONF
5103 
5104 int
setOrdering(NdbQueryOptions::ScanOrdering ordering)5105 NdbQueryOperationImpl::setOrdering(NdbQueryOptions::ScanOrdering ordering)
5106 {
5107   if (getQueryOperationDef().getType() != NdbQueryOperationDef::OrderedIndexScan)
5108   {
5109     getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
5110     return -1;
5111   }
5112 
5113   if (m_parallelism != Parallelism_max)
5114   {
5115     getQuery().setErrorCode(QRY_SEQUENTIAL_SCAN_SORTED);
5116     return -1;
5117   }
5118 
5119   if(static_cast<const NdbQueryIndexScanOperationDefImpl&>
5120        (getQueryOperationDef())
5121      .getOrdering() != NdbQueryOptions::ScanOrdering_void)
5122   {
5123     getQuery().setErrorCode(QRY_SCAN_ORDER_ALREADY_SET);
5124     return -1;
5125   }
5126 
5127   m_ordering = ordering;
5128   return 0;
5129 } // NdbQueryOperationImpl::setOrdering()
5130 
setInterpretedCode(const NdbInterpretedCode & code)5131 int NdbQueryOperationImpl::setInterpretedCode(const NdbInterpretedCode& code)
5132 {
5133   if (code.m_instructions_length == 0)
5134   {
5135     return 0;
5136   }
5137 
5138   const NdbTableImpl& table = getQueryOperationDef().getTable();
5139   // Check if operation and interpreter code use the same table
5140   if (unlikely(table.getTableId() != code.getTable()->getTableId()
5141                || table_version_major(table.getObjectVersion()) !=
5142                table_version_major(code.getTable()->getObjectVersion())))
5143   {
5144     getQuery().setErrorCode(Err_InterpretedCodeWrongTab);
5145     return -1;
5146   }
5147 
5148   if (unlikely((code.m_flags & NdbInterpretedCode::Finalised)
5149                == 0))
5150   {
5151     //  NdbInterpretedCode::finalise() not called.
5152     getQuery().setErrorCode(Err_FinaliseNotCalled);
5153     return -1;
5154   }
5155 
5156   // Allocate an interpreted code object if we do not have one already.
5157   if (likely(m_interpretedCode == NULL))
5158   {
5159     m_interpretedCode = new NdbInterpretedCode();
5160 
5161     if (unlikely(m_interpretedCode==NULL))
5162     {
5163       getQuery().setErrorCode(Err_MemoryAlloc);
5164       return -1;
5165     }
5166   }
5167 
5168   /*
5169    * Make a deep copy, such that 'code' can be destroyed when this method
5170    * returns.
5171    */
5172   const int error = m_interpretedCode->copy(code);
5173   if (unlikely(error))
5174   {
5175     getQuery().setErrorCode(error);
5176     return -1;
5177   }
5178   return 0;
5179 } // NdbQueryOperationImpl::setInterpretedCode()
5180 
setParallelism(Uint32 parallelism)5181 int NdbQueryOperationImpl::setParallelism(Uint32 parallelism){
5182   if (!getQueryOperationDef().isScanOperation())
5183   {
5184     getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
5185     return -1;
5186   }
5187   else if (getOrdering() == NdbQueryOptions::ScanOrdering_ascending ||
5188            getOrdering() == NdbQueryOptions::ScanOrdering_descending)
5189   {
5190     getQuery().setErrorCode(QRY_SEQUENTIAL_SCAN_SORTED);
5191     return -1;
5192   }
5193   else if (getQueryOperationDef().getOpNo() > 0)
5194   {
5195     getQuery().setErrorCode(Err_FunctionNotImplemented);
5196     return -1;
5197   }
5198   else if (parallelism < 1 || parallelism > NDB_PARTITION_MASK)
5199   {
5200     getQuery().setErrorCode(Err_ParameterError);
5201     return -1;
5202   }
5203   m_parallelism = parallelism;
5204   return 0;
5205 }
5206 
setMaxParallelism()5207 int NdbQueryOperationImpl::setMaxParallelism(){
5208   if (!getQueryOperationDef().isScanOperation())
5209   {
5210     getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
5211     return -1;
5212   }
5213   m_parallelism = Parallelism_max;
5214   return 0;
5215 }
5216 
setAdaptiveParallelism()5217 int NdbQueryOperationImpl::setAdaptiveParallelism(){
5218   if (!getQueryOperationDef().isScanOperation())
5219   {
5220     getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
5221     return -1;
5222   }
5223   else if (getQueryOperationDef().getOpNo() == 0)
5224   {
5225     getQuery().setErrorCode(Err_FunctionNotImplemented);
5226     return -1;
5227   }
5228   m_parallelism = Parallelism_adaptive;
5229   return 0;
5230 }
5231 
setBatchSize(Uint32 batchSize)5232 int NdbQueryOperationImpl::setBatchSize(Uint32 batchSize){
5233   if (!getQueryOperationDef().isScanOperation())
5234   {
5235     getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
5236     return -1;
5237   }
5238   if (this != &getRoot() &&
5239       batchSize < getQueryOperationDef().getTable().getFragmentCount())
5240   {
5241     /** Each SPJ block instance will scan each fragment, so the batch size
5242      * cannot be smaller than the number of fragments.*/
5243     getQuery().setErrorCode(QRY_BATCH_SIZE_TOO_SMALL);
5244     return -1;
5245   }
5246   m_maxBatchRows = batchSize;
5247   return 0;
5248 }
5249 
5250 bool
hasInterpretedCode() const5251 NdbQueryOperationImpl::hasInterpretedCode() const
5252 {
5253   return (m_interpretedCode && m_interpretedCode->m_instructions_length > 0) ||
5254          (getQueryOperationDef().getInterpretedCode() != NULL);
5255 } // NdbQueryOperationImpl::hasInterpretedCode
5256 
5257 int
prepareInterpretedCode(Uint32Buffer & attrInfo) const5258 NdbQueryOperationImpl::prepareInterpretedCode(Uint32Buffer& attrInfo) const
5259 {
5260   const NdbInterpretedCode* interpretedCode =
5261     (m_interpretedCode && m_interpretedCode->m_instructions_length > 0)
5262      ? m_interpretedCode
5263      : getQueryOperationDef().getInterpretedCode();
5264 
5265   // There should be no subroutines in a filter.
5266   assert(interpretedCode->m_first_sub_instruction_pos==0);
5267   assert(interpretedCode->m_instructions_length > 0);
5268   assert(interpretedCode->m_instructions_length <= 0xffff);
5269 
5270   // Allocate space for program and length field.
5271   Uint32* const buffer =
5272     attrInfo.alloc(1+interpretedCode->m_instructions_length);
5273   if(unlikely(buffer==NULL))
5274   {
5275     return Err_MemoryAlloc;
5276   }
5277 
5278   buffer[0] = interpretedCode->m_instructions_length;
5279   memcpy(buffer+1,
5280          interpretedCode->m_buffer,
5281          interpretedCode->m_instructions_length * sizeof(Uint32));
5282   return 0;
5283 } // NdbQueryOperationImpl::prepareInterpretedCode
5284 
5285 
5286 Uint32
getIdOfReceiver() const5287 NdbQueryOperationImpl::getIdOfReceiver() const {
5288   NdbRootFragment& rootFrag = m_queryImpl.m_rootFrags[0];
5289   return rootFrag.getResultStream(*this).getReceiver().getId();
5290 }
5291 
getRowSize() const5292 Uint32 NdbQueryOperationImpl::getRowSize() const
5293 {
5294   // Check if row size has been computed yet.
5295   if (m_rowSize == 0xffffffff)
5296   {
5297     m_rowSize =
5298       NdbReceiver::ndbrecord_rowsize(m_ndbRecord, false);
5299   }
5300   return m_rowSize;
5301 }
5302 
getBatchBufferSize() const5303 Uint32 NdbQueryOperationImpl::getBatchBufferSize() const
5304 {
5305   // Check if batch buffer size has been computed yet.
5306   if (m_batchBufferSize == 0xffffffff)
5307   {
5308     Uint32 batchRows = getMaxBatchRows();
5309     Uint32 batchByteSize = 0;
5310     Uint32 batchFrags = 1;
5311 
5312     if (m_operationDef.isScanOperation())
5313     {
5314       const Ndb* const ndb = getQuery().getNdbTransaction().getNdb();
5315       NdbReceiver::calculate_batch_size(* ndb->theImpl,
5316                                         getQuery().getRootFragCount(),
5317                                         batchRows,
5318                                         batchByteSize);
5319       assert(batchRows == getMaxBatchRows());
5320 
5321       /**
5322        * When LQH reads a scan batch, the size of the batch is limited
5323        * both to a maximal number of rows and a maximal number of bytes.
5324        * The latter limit is interpreted such that the batch ends when the
5325        * limit has been exceeded. Consequently, the buffer must be able to
5326        * hold max_no_of_bytes plus one extra row. In addition,  when the
5327        * SPJ block executes a (pushed) child scan operation, it scans a
5328        * number of fragments (possibly all) in parallel, and divides the
5329        * row and byte limits by the number of parallel fragments.
5330        * Consequently, a child scan operation may return max_no_of_bytes,
5331        * plus one extra row for each fragment.
5332        */
5333       if (getParentOperation() != NULL)
5334       {
5335         batchFrags = getQuery().getRootFragCount();
5336       }
5337     }
5338 
5339     AttributeMask readMask;
5340     if (m_ndbRecord != NULL)
5341     {
5342       m_ndbRecord->copyMask(readMask.rep.data, m_read_mask);
5343     }
5344 
5345     m_batchBufferSize = NdbReceiver::result_bufsize(
5346                                                   batchRows,
5347                                                   batchByteSize,
5348                                                   batchFrags,
5349                                                   m_ndbRecord,
5350                                                   readMask.rep.data,
5351                                                   m_firstRecAttr,
5352                                                   0, 0);
5353   }
5354   return m_batchBufferSize;
5355 }
5356 
5357 /** For debugging.*/
operator <<(NdbOut & out,const NdbQueryOperationImpl & op)5358 NdbOut& operator<<(NdbOut& out, const NdbQueryOperationImpl& op){
5359   out << "[ this: " << &op
5360       << "  m_magic: " << op.m_magic;
5361   out << " op.operationDef.getOpNo()"
5362       << op.m_operationDef.getOpNo();
5363   if (op.getParentOperation()){
5364     out << "  m_parent: " << op.getParentOperation();
5365   }
5366   for(unsigned int i = 0; i<op.getNoOfChildOperations(); i++){
5367     out << "  m_children[" << i << "]: " << &op.getChildOperation(i);
5368   }
5369   out << "  m_queryImpl: " << &op.m_queryImpl;
5370   out << "  m_operationDef: " << &op.m_operationDef;
5371   out << " m_isRowNull " << op.m_isRowNull;
5372   out << " ]";
5373   return out;
5374 }
5375 
operator <<(NdbOut & out,const NdbResultStream & stream)5376 NdbOut& operator<<(NdbOut& out, const NdbResultStream& stream){
5377   out << " received rows: " << stream.m_resultSets[stream.m_recv].getRowCount();
5378   return out;
5379 }
5380 
5381 
5382 // Compiler settings require explicit instantiation.
5383 template class Vector<NdbQueryOperationImpl*>;
5384