1 /*
2    Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "API.hpp"
26 #include <AttributeHeader.hpp>
27 #include <signaldata/TcKeyConf.hpp>
28 #include <signaldata/DictTabInfo.hpp>
29 
30 
/**
 * 'class NdbReceiverBuffer' takes care of buffering multi-row
 * result sets received as the result of scan- or query- operations.
 * Rows are stored in the 'raw' transporter format in the buffer.
 * Later, only the 'current row' is retrieved from the buffer and
 * unpacked into the full NdbRecord row format when navigated to.
 */
38 class NdbReceiverBuffer
39 {
40 public:
41 
42   /**
43    * End of row and key buffer area has an 'eodMagic'
44    * as a debugging aid in detecting buffer overflows.
45    */
46   static const Uint32 eodMagic = 0xacbd1234;
47 
48   explicit NdbReceiverBuffer(Uint32 bufSizeBytes,
49                              Uint32 batchRows);
50 
reset()51   void reset()
52   { m_rows = m_keys = 0; }
53 
getBufSizeWords() const54   Uint32 getBufSizeWords() const
55   { return m_bufSizeWords; }
56 
getMaxRows() const57   Uint32 getMaxRows() const
58   { return m_maxRows; }
59 
getRowCount() const60   Uint32 getRowCount() const
61   { return m_rows; }
62 
getKeyCount() const63   Uint32 getKeyCount() const
64   { return m_keys; }
65 
66   /**
67    * Rows are buffered in the first part of 'm_buffer'. The first
68    * 'm_maxrows+1' Uint32s in the buffer is a row_ix[] containing
69    * buffer indexes, such that:
70    *  - Row 'n' starts at m_buffer[row_ix[n]].
71    *  - Length of row 'n' is 'row_ix[n+1] - row_ix[n].
72    * row_ix[] contains one more item than 'm_maxrows'. The item
73    * past the last row is maintained such that the length of last row
74    * can be calculated.
75    */
allocRow(Uint32 noOfWords)76   Uint32 *allocRow(Uint32 noOfWords)
77   {
78     assert(verifyBuffer());
79     const Uint32 pos = rowIx(m_rows);  //First free
80     rowIx(++m_rows) = pos + noOfWords; //Next free
81 #ifndef NDEBUG
82     m_buffer[pos + noOfWords] = eodMagic;
83     assert(verifyBuffer());
84 #endif
85     return &m_buffer[pos];
86   }
87 
88   /**
89    * Keys are allocated from the end of 'm_buffer', it grows and are
90    * indexed in *reverse order*. key_ix[] is allocated at the very end of
91    * the buffer:
92    *  - Key 'n' starts at m_buffer[key_ix[n]]
93    *  - Length of key 'n' is 'key_ix[n-1] - key_ix[n]'.
94    */
allocKey(Uint32 noOfWords)95   Uint32 *allocKey(Uint32 noOfWords)
96   {
97     assert(verifyBuffer());
98     const Uint32 prev = keyIx(m_keys-1);
99     const Uint32 pos = prev - noOfWords;
100     keyIx(m_keys) = pos;
101     m_keys++;
102 #ifndef NDEBUG
103     m_buffer[pos-1] = eodMagic;
104     assert(verifyBuffer());
105 #endif
106     return &m_buffer[pos];
107   }
108 
getRow(Uint32 row,Uint32 & noOfWords) const109   const Uint32 *getRow(Uint32 row, Uint32& noOfWords) const
110   {
111     assert(verifyBuffer());
112     if (unlikely(row >= m_rows))
113       return NULL;
114 
115     const Uint32 ix = rowIx(row);
116     noOfWords = rowIx(row+1) - ix;
117     assert(noOfWords < m_bufSizeWords);  // Sanity check
118     return m_buffer + ix;
119   }
120 
getKey(Uint32 key,Uint32 & noOfWords) const121   const Uint32 *getKey(Uint32 key, Uint32& noOfWords) const
122   {
123     assert(verifyBuffer());
124     if (unlikely(key >= m_keys))
125       return NULL;
126 
127     const Uint32 ix = keyIx(key);
128     noOfWords = keyIx(key-1) - ix;
129     assert(noOfWords < m_bufSizeWords);  // Sanity check
130     return m_buffer + ix;
131   }
132 
133   /**
134    * We know 'bufSizeWords', the total max size of data to store
135    * in the buffer. In addition there are some overhead required by
136    * the buffer management itself. Calculate total words required to
137    * be allocated for the NdbReceiverBuffer structure.
138    */
calculateSizeInWords(Uint32 bufSizeWords,Uint32 batchRows,Uint32 keySize)139   static Uint32 calculateSizeInWords(Uint32 bufSizeWords,
140                                      Uint32 batchRows,
141                                      Uint32 keySize)
142   {
143     return  bufSizeWords +          // Words to store
144             1 +                     // 'eodMagic' in buffer
145             headerWords +           // Admin overhead
146             ((keySize > 0)          // Row + optional key indexes
147               ? (batchRows+1) * 2   // Row + key indexes
148 	      : (batchRows+1));     // Row index only
149   }
150 
151 private:
152   static const Uint32 headerWords= 4; //4*Uint32's below
153 
154   // No copying / assignment:
155   NdbReceiverBuffer(const NdbReceiverBuffer&);
156   NdbReceiverBuffer& operator=(const NdbReceiverBuffer&);
157 
158   const Uint32 m_maxRows;       // Max capacity in #rows / #keys
159   const Uint32 m_bufSizeWords;  // Size of 'm_buffer'
160 
161   Uint32 m_rows;                // Current #rows in m_buffer
162   Uint32 m_keys;                // Current #keys in m_buffer
163 
164   Uint32 m_buffer[1];           // Variable size buffer area (m_bufSizeWords)
165 
166   /**
167    * Index to row offset is first 'maxrows' items in m_buffer.
168    * We maintain a 'next free row' position for all
169    * 'm_maxrows' in the buffer. Thus, this index array has
170    * to contain 'm_maxrows+1' items, indexed from [0..m_maxrows].
171    * This allows is to calculate data length for all rows
172    * as 'start(row+1) - start(row))
173    */
rowIx(Uint32 row) const174   Uint32  rowIx(Uint32 row) const { return m_buffer[row]; }
rowIx(Uint32 row)175   Uint32& rowIx(Uint32 row)       { return m_buffer[row]; }
176 
177   /**
178    * Index to key offset is last 'maxrows' items in m_buffer.
179    * We maintain a 'previous row start' position for all
180    * 'm_maxrows' in the buffer - even for 'key 0'.
181    * Thus, this index array has to contain 'm_maxrows+1'
182    * items, indexed from [-1..maxrows-1].
183    * This allows is to calculate data length for all keys
184    * as 'start(key-1) - start(key)).
185    */
186 
187   // 'm_bufSizeWords-2' is keyIx(0), that place keyIx(-1) at 'm_bufSizeWords-1'
keyIx(Uint32 key) const188   Uint32  keyIx(Uint32 key) const { return m_buffer[m_bufSizeWords-2-key]; }
keyIx(Uint32 key)189   Uint32& keyIx(Uint32 key)       { return m_buffer[m_bufSizeWords-2-key]; }
190 
verifyBuffer() const191   bool verifyBuffer() const
192   {
193     assert(m_rows <= m_maxRows);
194     assert(m_keys <= m_maxRows);
195     // Check rows startpos and end within buffer
196     assert(rowIx(0) == m_maxRows+1);
197     assert(rowIx(m_rows) <= m_bufSizeWords);
198 
199     // Rest of rows in sequence with non-negative length
200     for (Uint32 row=0; row<m_rows; row++)
201     {
202       assert(rowIx(row) <= rowIx(row+1));
203     }
204     // Overflow protection
205     assert(m_rows == 0 ||
206            m_buffer[rowIx(m_rows)] == eodMagic);
207 
208     if (m_keys > 0)
209     {
210       // Check keys startpos and end before row buffer
211       assert(keyIx(-1) == (m_bufSizeWords - (m_maxRows+1)));
212       assert(keyIx(m_keys-1) >= rowIx(m_rows));
213 
214       // Rest of keys in sequence with non-negative length
215       for (Uint32 key=0; key<m_keys; key++)
216       {
217         assert(keyIx(key) <= keyIx(key-1));
218       }
219 
220       // Overflow protection
221       assert(m_buffer[keyIx(m_keys-1)-1] == eodMagic);
222     }
223     return true;
224   }
225 
226 }; //class NdbReceiverBuffer
227 
NdbReceiverBuffer(Uint32 bufSizeBytes,Uint32 batchRows)228 NdbReceiverBuffer::NdbReceiverBuffer(
229 			     Uint32 bufSizeBytes, // Word aligned size
230                              Uint32 batchRows)
231   : m_maxRows(batchRows),
232     m_bufSizeWords((bufSizeBytes/sizeof(Uint32)) - headerWords),
233     m_rows(0),
234     m_keys(0)
235 {
236   assert((bufSizeBytes/sizeof(Uint32)) > headerWords);
237 
238   /**
239    * Init row and key index arrays. Row indexes maintain
240    * a 'next free row' position which for rows start imm.
241    * after the 'm_maxrows+1' indexes.
242    */
243   rowIx(0) = m_maxRows+1;
244 
245   /**
246    * Key indexes maintain a 'prev key startpos', even for key(0).
247    * Thus, for an empty key_ix[], we set startpos for
248    * (the non-existing) key(-1) which is imm. after the
249    * available key buffer area.
250    *
251    * NOTE: We init key_ix[] even if keyinfo not present
252    * in result set. that case it might later be overwritten
253    * by rows, which is ok as the keyinfo is then never used.
254    */
255   keyIx(-1) = m_bufSizeWords - (m_maxRows+1);
256   assert(verifyBuffer());
257 }
258 
259 
/**
 * 'beforeFirstRow' is the 'current row' position used before there
 * is a valid current row. Beware, unsigned wraparound is assumed
 * such that 'beforeFirstRow+1' -> 0 (first row).
 */
static const Uint32 beforeFirstRow = 0xFFFFFFFF;

// Forward declaration: pad() is used before its definition in this file.
static
const Uint8*
pad(const Uint8* src, Uint32 align, Uint32 bitPos);
270 
NdbReceiver(Ndb * aNdb)271 NdbReceiver::NdbReceiver(Ndb *aNdb) :
272   theMagicNumber(0),
273   m_ndb(aNdb),
274   m_id(NdbObjectIdMap::InvalidId),
275   m_tcPtrI(RNIL),
276   m_type(NDB_UNINITIALIZED),
277   m_owner(NULL),
278   m_ndb_record(NULL),
279   m_row_buffer(NULL),
280   m_recv_buffer(NULL),
281   m_read_range_no(false),
282   m_read_key_info(false),
283   m_firstRecAttr(NULL),
284   m_lastRecAttr(NULL),
285   m_rec_attr_data(NULL),
286   m_rec_attr_len(0),
287   m_current_row(beforeFirstRow),
288   m_expected_result_length(0),
289   m_received_result_length(0)
290 {}
291 
~NdbReceiver()292 NdbReceiver::~NdbReceiver()
293 {
294   DBUG_ENTER("NdbReceiver::~NdbReceiver");
295   if (m_id != NdbObjectIdMap::InvalidId) {
296     m_ndb->theImpl->theNdbObjectIdMap.unmap(m_id, this);
297   }
298   DBUG_VOID_RETURN;
299 }
300 
301 //static
302 NdbReceiverBuffer*
initReceiveBuffer(Uint32 * buffer,Uint32 bufSizeBytes,Uint32 batchRows)303 NdbReceiver::initReceiveBuffer(Uint32 *buffer,      // Uint32 aligned buffer
304                                Uint32 bufSizeBytes, // Size, from ::result_bufsize()
305                                Uint32 batchRows)
306 {
307   assert(((UintPtr)buffer % sizeof(Uint32)) == 0);   //Is Uint32 aligned
308 
309   return new(buffer) NdbReceiverBuffer(bufSizeBytes, batchRows);
310 }
311 
312 int
init(ReceiverType type,void * owner)313 NdbReceiver::init(ReceiverType type, void* owner)
314 {
315   theMagicNumber = getMagicNumber();
316   m_type = type;
317   m_owner = owner;
318   m_ndb_record= NULL;
319   m_row_buffer= NULL;
320   m_recv_buffer= NULL;
321   m_read_range_no= false;
322   m_read_key_info= false;
323   m_firstRecAttr = NULL;
324   m_lastRecAttr = NULL;
325   m_rec_attr_data = NULL;
326   m_rec_attr_len = 0;
327 
328   if (m_id == NdbObjectIdMap::InvalidId)
329   {
330     if (m_ndb)
331     {
332       m_id = m_ndb->theImpl->theNdbObjectIdMap.map(this);
333       if (m_id == NdbObjectIdMap::InvalidId)
334       {
335         setErrorCode(4000);
336         return -1;
337       }
338     }
339   }
340   return 0;
341 }
342 
343 void
do_setup_ndbrecord(const NdbRecord * ndb_record,char * row_buffer,bool read_range_no,bool read_key_info)344 NdbReceiver::do_setup_ndbrecord(const NdbRecord *ndb_record,
345                                 char *row_buffer,
346                                 bool read_range_no, bool read_key_info)
347 {
348   m_ndb_record= ndb_record;
349   m_row_buffer= row_buffer;
350   m_recv_buffer= NULL;
351   m_read_range_no= read_range_no;
352   m_read_key_info= read_key_info;
353 }
354 
355 void
release()356 NdbReceiver::release()
357 {
358   theMagicNumber = 0;
359   NdbRecAttr* tRecAttr = m_firstRecAttr;
360   while (tRecAttr != NULL)
361   {
362     NdbRecAttr* tSaveRecAttr = tRecAttr;
363     tRecAttr = tRecAttr->next();
364     m_ndb->releaseRecAttr(tSaveRecAttr);
365   }
366   m_firstRecAttr = NULL;
367   m_lastRecAttr = NULL;
368   m_rec_attr_data = NULL;
369   m_rec_attr_len = 0;
370   m_ndb_record= NULL;
371   m_row_buffer= NULL;
372   m_recv_buffer= NULL;
373 }
374 
375 NdbRecAttr *
getValue(const NdbColumnImpl * tAttrInfo,char * user_dst_ptr)376 NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr)
377 {
378   NdbRecAttr* tRecAttr = m_ndb->getRecAttr();
379   if(tRecAttr && !tRecAttr->setup(tAttrInfo, user_dst_ptr)){
380     if (m_firstRecAttr == NULL)
381       m_firstRecAttr = tRecAttr;
382     else
383       m_lastRecAttr->next(tRecAttr);
384     m_lastRecAttr = tRecAttr;
385     tRecAttr->next(NULL);
386     return tRecAttr;
387   }
388   if(tRecAttr){
389     m_ndb->releaseRecAttr(tRecAttr);
390   }
391   return 0;
392 }
393 
394 void
getValues(const NdbRecord * rec,char * row_ptr)395 NdbReceiver::getValues(const NdbRecord* rec, char *row_ptr)
396 {
397   assert(m_recv_buffer == NULL);
398   assert(rec != NULL);
399 
400   m_ndb_record= rec;
401   m_row_buffer= row_ptr;
402 }
403 
404 void
prepareSend()405 NdbReceiver::prepareSend()
406 {
407   /* Set pointers etc. to prepare for receiving the first row of the batch. */
408   theMagicNumber = 0x11223344;
409   m_current_row = beforeFirstRow;
410   m_received_result_length = 0;
411   m_expected_result_length = 0;
412 
413   if (m_recv_buffer != NULL)
414   {
415     m_recv_buffer->reset();
416   }
417 }
418 
419 void
prepareReceive(NdbReceiverBuffer * buffer)420 NdbReceiver::prepareReceive(NdbReceiverBuffer *buffer)
421 {
422   m_recv_buffer= buffer;
423   prepareSend();
424 }
425 
426 /*
427   Compute the batch size (rows between each NEXT_TABREQ / SCAN_TABCONF) to
428   use, taking into account limits in the transporter, user preference, etc.
429 
430   It is the responsibility of the batch producer (LQH+TUP) to
431   stay within these 'batch_size' and 'batch_byte_size' limits.:
432 
433   - It should stay strictly within the 'batch_size' (#rows) limit.
434   - It is allowed to overallocate the 'batch_byte_size' (slightly)
435     in order to complete the current row when it hit the limit.
436     (Upto ::packed_rowsize())
437 
438   The client should be prepared to receive, and buffer, upto
439   'batch_size' rows from each fragment.
440   ::ndbrecord_bufsize() might be usefull for calculating the
441   buffersize to allocate for this resultset.
442 */
443 //static
444 void
calculate_batch_size(const NdbImpl & theImpl,Uint32 parallelism,Uint32 & batch_size,Uint32 & batch_byte_size)445 NdbReceiver::calculate_batch_size(const NdbImpl& theImpl,
446                                   Uint32 parallelism,
447                                   Uint32& batch_size,
448                                   Uint32& batch_byte_size)
449 {
450   const NdbApiConfig & cfg = theImpl.get_ndbapi_config_parameters();
451   const Uint32 max_scan_batch_size= cfg.m_scan_batch_size;
452   const Uint32 max_batch_byte_size= cfg.m_batch_byte_size;
453   const Uint32 max_batch_size= cfg.m_batch_size;
454 
455   batch_byte_size= max_batch_byte_size;
456   if (batch_byte_size * parallelism > max_scan_batch_size) {
457     batch_byte_size= max_scan_batch_size / parallelism;
458   }
459 
460   if (batch_size == 0 || batch_size > max_batch_size) {
461     batch_size= max_batch_size;
462   }
463   if (unlikely(batch_size > MAX_PARALLEL_OP_PER_SCAN)) {
464     batch_size= MAX_PARALLEL_OP_PER_SCAN;
465   }
466   if (unlikely(batch_size > batch_byte_size)) {
467     batch_size= batch_byte_size;
468   }
469 
470   return;
471 }
472 
473 void
calculate_batch_size(Uint32 parallelism,Uint32 & batch_size,Uint32 & batch_byte_size) const474 NdbReceiver::calculate_batch_size(Uint32 parallelism,
475                                   Uint32& batch_size,
476                                   Uint32& batch_byte_size) const
477 {
478   calculate_batch_size(* m_ndb->theImpl,
479                        parallelism,
480                        batch_size,
481                        batch_byte_size);
482 }
483 
484 //static
485 Uint32
ndbrecord_rowsize(const NdbRecord * result_record,bool read_range_no)486 NdbReceiver::ndbrecord_rowsize(const NdbRecord *result_record,
487                                bool read_range_no)
488 {
489   // Unpacked NdbRecords are stored in its full unprojected form
490   Uint32 rowsize= (result_record)
491                  ? result_record->m_row_size
492                  : 0;
493 
494   // After unpack, the optional RANGE_NO is stored as an Uint32
495   if (read_range_no)
496     rowsize+= sizeof(Uint32);
497 
498   return (rowsize+3) & 0xfffffffc;
499 }
500 
501 /**
502  * Calculate max size (In Uint32 words) of a 'packed' result row,
503  * including optional 'keyinfo' and 'range_no'.
504  */
505 static
packed_rowsize(const NdbRecord * result_record,const Uint32 * read_mask,const NdbRecAttr * first_rec_attr,Uint32 keySizeWords,bool read_range_no)506 Uint32 packed_rowsize(const NdbRecord *result_record,
507                       const Uint32* read_mask,
508                       const NdbRecAttr *first_rec_attr,
509                       Uint32 keySizeWords,
510                       bool read_range_no)
511 {
512   Uint32 nullCount = 0;
513   Uint32 bitPos = 0;
514   const Uint8 *pos = NULL;
515 
516   if (likely(result_record != NULL))
517   {
518     for (Uint32 i= 0; i<result_record->noOfColumns; i++)
519     {
520       const NdbRecord::Attr *col= &result_record->columns[i];
521       const Uint32 attrId= col->attrId;
522 
523       /* Skip column if result_mask says so and we don't need
524        * to read it
525        */
526       if (BitmaskImpl::get(MAXNROFATTRIBUTESINWORDS, read_mask, attrId))
527       {
528         const Uint32 align = col->orgAttrSize;
529 
530         switch(align){
531         case DictTabInfo::aBit:
532           pos = pad(pos, 0, 0);
533           bitPos += col->bitCount;
534           pos += 4 * (bitPos / 32);
535           bitPos = (bitPos % 32);
536           break;
537         default:
538           pos = pad(pos, align, bitPos);
539           bitPos = 0;
540           pos += col->maxSize;
541           break;
542         }
543 
544         if (col->flags & NdbRecord::IsNullable)
545           nullCount++;
546       }
547     }
548   }
549   Uint32 sizeInWords = (Uint32)(((Uint32*)pad(pos, 0, bitPos) - (Uint32*)NULL));
550 
551   // Add AttributeHeader::READ_PACKED or ::READ_ALL (Uint32) and
552   // variable size bitmask the 'packed' columns and their null bits.
553   if (sizeInWords > 0)
554   {
555     const Uint32 attrCount= result_record->columns[result_record->noOfColumns -1].attrId+1;
556     const Uint32 sigBitmaskWords= ((attrCount+nullCount+31)>>5);
557     sizeInWords += (1+sigBitmaskWords);   //AttrHeader + bitMask
558   }
559 
560   // The optional RANGE_NO is transfered and stored in buffer
561   // as AttributeHeader::RANGE_NO + an Uint32 'range_no'
562   if (read_range_no)
563   {
564     sizeInWords += 2;
565   }
566 
567   // KeyInfo is transfered in a seperate signal,
568   // and is stored in the packed buffer together with 'info' word
569   if (keySizeWords > 0)
570   {
571     sizeInWords+= keySizeWords+1;
572   }
573 
574   /* Add extra needed to transfer RecAttrs requested by getValue() */
575   const NdbRecAttr *ra= first_rec_attr;
576   while (ra != NULL)
577   {
578     // AttrHeader + max column size. Aligned to word boundary
579     sizeInWords+= 1 + ((ra->getColumn()->getSizeInBytes() + 3) / 4);
580     ra= ra->next();
581   }
582 
583   return sizeInWords;
584 }
585 
586 /**
587  * Calculate max size (In Uint32 words) of a buffer containing
588  * 'batch_rows' of packed result rows. Size also include
589  * overhead required by the NdbReceiverBuffer itself.
590  */
591 static
ndbrecord_bufsize(Uint32 batchRows,Uint32 batchBytes,Uint32 fragments,Uint32 rowSizeWords,Uint32 keySizeWords)592 Uint32 ndbrecord_bufsize(Uint32 batchRows,
593                          Uint32 batchBytes,      //Optional upper limit of batch size
594                          Uint32 fragments,       //Frags handled by this receiver (>= 1)
595                          Uint32 rowSizeWords,    //Packed size, from ::packed_rowsize()
596                          Uint32 keySizeWords)    //In words
597 {
598   /**
599    * Size of batch is either limited by max 'batchRows' fetched,
600    * or when the total 'batchBytes' limit is reached. In the
601    * later case we are allowed to over-allocate by allowing
602    * each fragment delivering to this NdbReceiver to complete
603    * the current row / key. If KeyInfo is requested, an additional
604    *'info' word is also added for each key which comes in addition
605    *  to the 'batch_byte' limit
606    */
607   assert(batchRows > 0);
608   Uint32 batchSizeWords = batchRows * rowSizeWords;
609 
610   // Result set size may be limited by 'batch_words' if specified.
611   if (batchBytes > 0)
612   {
613     const Uint32 batchWords = (batchBytes+3) / sizeof(Uint32);
614     // Batch size + over alloc last row + info word if keyinfo20
615     const Uint32 batchSizeLimit = batchWords
616                                 + (rowSizeWords * fragments)
617                                 + ((keySizeWords > 0) ? batchRows : 0);
618 
619     if (batchSizeWords > batchSizeLimit)
620       batchSizeWords = batchSizeLimit;
621   }
622 
623   return NdbReceiverBuffer::calculateSizeInWords(batchSizeWords, batchRows, keySizeWords);
624 }
625 
626 //static
627 Uint32
result_bufsize(Uint32 batchRows,Uint32 batchBytes,Uint32 fragments,const NdbRecord * result_record,const Uint32 * read_mask,const NdbRecAttr * first_rec_attr,Uint32 keySizeWords,bool read_range_no)628 NdbReceiver::result_bufsize(Uint32 batchRows,
629                             Uint32 batchBytes,
630                             Uint32 fragments,
631                             const NdbRecord *result_record,
632                             const Uint32* read_mask,
633                             const NdbRecAttr *first_rec_attr,
634                             Uint32 keySizeWords,
635                             bool read_range_no)
636 {
637   const Uint32 rowSizeWords= packed_rowsize(
638                                        result_record,
639                                        read_mask,
640                                        first_rec_attr,
641                                        keySizeWords,
642                                        read_range_no);
643 
644   const Uint32 bufSizeWords= ndbrecord_bufsize(
645                                           batchRows,
646                                           batchBytes,
647                                           fragments,
648                                           rowSizeWords,
649                                           keySizeWords);
650 
651   // bufsize is in word, return as bytes
652   return bufSizeWords * sizeof(Uint32);
653 }
654 
655 /**
656  * pad
657  * This function determines how much 'padding' should be applied
658  * to the passed in pointer and bitPos to get to the start of a
659  * field with the passed in alignment.
660  * The rules are :
661  *   - First bit field is 32-bit aligned
662  *   - Subsequent bit fields are packed in the next available bits
663  *   - 8 and 16 bit aligned fields are packed in the next available
664  *     word (but not necessarily word aligned.
665  *   - 32, 64 and 128 bit aligned fields are packed in the next
666  *     aligned 32-bit word.
667  * This algorithm is used to unpack a stream of fields packed by the code
668  * in src/kernel/blocks/dbtup/DbtupRoutines::read_packed()
669  */
670 static
671 inline
672 const Uint8*
pad(const Uint8 * src,Uint32 align,Uint32 bitPos)673 pad(const Uint8* src, Uint32 align, Uint32 bitPos)
674 {
675   UintPtr ptr = UintPtr(src);
676   switch(align){
677   case DictTabInfo::aBit:
678   case DictTabInfo::a32Bit:
679   case DictTabInfo::a64Bit:
680   case DictTabInfo::a128Bit:
681     return (Uint8*)(((ptr + 3) & ~(UintPtr)3) + 4 * ((bitPos + 31) >> 5));
682 
683   default:
684 #ifdef VM_TRACE
685     abort();
686 #endif
687     //Fall through:
688 
689   case DictTabInfo::an8Bit:
690   case DictTabInfo::a16Bit:
691     return src + 4 * ((bitPos + 31) >> 5);
692   }
693 }
694 
695 /**
696  * handle_packed_bit
697  * This function copies the bitfield of length len, offset pos from
698  * word-aligned ptr _src to memory starting at the byte ptr dst.
699  */
700 static
701 void
handle_packed_bit(const char * _src,Uint32 pos,Uint32 len,char * _dst)702 handle_packed_bit(const char* _src, Uint32 pos, Uint32 len, char* _dst)
703 {
704   Uint32 * src = (Uint32*)_src;
705   assert((UintPtr(src) & 3) == 0);
706 
707   /* Convert char* to aligned Uint32* and some byte offset */
708   UintPtr uiPtr= UintPtr((Uint32*)_dst);
709   Uint32 dstByteOffset= Uint32(uiPtr) & 3;
710   Uint32* dst= (Uint32*) (uiPtr - dstByteOffset);
711 
712   BitmaskImpl::copyField(dst, dstByteOffset << 3,
713                          src, pos, len);
714 }
715 
716 
/**
 * unpackRecAttr
 * Unpack a packed stream of field values, whose presence and nullness
 * is indicated by a leading bitmap, into a list of NdbRecAttr objects.
 *
 * - recAttr:  In: first NdbRecAttr to receive a value.
 *             Out: the NdbRecAttr following the last one which
 *             received a value.
 * - bmlen:    Length of the leading bitmap, in words.
 * - aDataPtr: Start of the bitmap; the field values follow right
 *             after its 'bmlen' words.
 * - aLength:  Not referenced by this function.
 *
 * Return the number of words read from the input stream, bitmap
 * included. Aborts if the stream does not match the NdbRecAttr list.
 */
//static
Uint32
NdbReceiver::unpackRecAttr(NdbRecAttr** recAttr,
                           Uint32 bmlen,
                           const Uint32* aDataPtr,
                           Uint32 aLength)
{
  NdbRecAttr* currRecAttr = *recAttr;
  const Uint8 *src = (Uint8*)(aDataPtr + bmlen);
  Uint32 bitPos = 0;   // Bits consumed of the current bit field word
  // 'i' is the position in the bitmap, 'attrId' the attribute it refers
  // to. They get out of step as a nullable attribute which is present
  // is followed by an extra null bit, consumed by the '++i' below.
  for (Uint32 i = 0, attrId = 0; i<32*bmlen; i++, attrId++)
  {
    if (BitmaskImpl::get(bmlen, aDataPtr, i))
    {
      const NdbColumnImpl & col =
	NdbColumnImpl::getImpl(* currRecAttr->getColumn());
      // A present attribute has to be the one the next NdbRecAttr wants
      if (unlikely(attrId != (Uint32)col.m_attrId))
        goto err;
      if (col.m_nullable)
      {
	if (BitmaskImpl::get(bmlen, aDataPtr, ++i))
	{
	  // NULL value: there is no data in the stream for it
	  currRecAttr->setNULL();
	  currRecAttr = currRecAttr->next();
	  continue;
	}
      }
      Uint32 align = col.m_orgAttrSize;
      Uint32 attrSize = col.m_attrSize;
      Uint32 array = col.m_arraySize;
      Uint32 len = col.m_length;
      Uint32 sz = attrSize * array;
      Uint32 arrayType = col.m_arrayType;

      switch(align){
      case DictTabInfo::aBit: // Bit
        // Copy 'len' bits directly into the NdbRecAttr, then advance
        // 'src' by the whole words consumed and keep the rest in bitPos
        src = pad(src, 0, 0);
	handle_packed_bit((const char*)src, bitPos, len,
                          currRecAttr->aRef());
	src += 4 * ((bitPos + len) >> 5);
	bitPos = (bitPos + len) & 31;
        goto next;
      default:
        src = pad(src, align, bitPos);
      }
      // Find the actual size of the value: fixed size arrays use the
      // 'sz' calculated above, the var sized ones start with a 1 or 2
      // byte (low byte first) length which is included in 'sz'.
      switch(arrayType){
      case NDB_ARRAYTYPE_FIXED:
        break;
      case NDB_ARRAYTYPE_SHORT_VAR:
        sz = 1 + src[0];
        break;
      case NDB_ARRAYTYPE_MEDIUM_VAR:
	sz = 2 + src[0] + 256 * src[1];
        break;
      default:
        goto err;
      }

      // Any pending bits were skipped by the pad() above
      bitPos = 0;
      currRecAttr->receive_data((Uint32*)src, sz);
      src += sz;
  next:
      currRecAttr = currRecAttr->next();
    }
  }
  * recAttr = currRecAttr;
  // Skip pending bits and word align before calculating words consumed
  return (Uint32)(((Uint32*)pad(src, 0, bitPos)) - aDataPtr);

err:
  abort();
  return 0;
}
795 
796 
797 /* Set NdbRecord field to NULL. */
setRecToNULL(const NdbRecord::Attr * col,char * row)798 static void setRecToNULL(const NdbRecord::Attr *col,
799                          char *row)
800 {
801   assert(col->flags & NdbRecord::IsNullable);
802   row[col->nullbit_byte_offset]|= 1 << col->nullbit_bit_in_byte;
803 }
804 
805 int
get_range_no() const806 NdbReceiver::get_range_no() const
807 {
808   Uint32 range_no;
809   assert(m_ndb_record != NULL);
810   assert(m_row_buffer != NULL);
811 
812   if (unlikely(!m_read_range_no))
813     return -1;
814 
815   memcpy(&range_no,
816          m_row_buffer + m_ndb_record->m_row_size,
817          sizeof(range_no));
818   return (int)range_no;
819 }
820 
821 /**
822  * handle_bitfield_ndbrecord
823  * Packed bitfield handling for NdbRecord - also deals with
824  * mapping the bitfields into MySQLD format if necessary.
825  */
826 ATTRIBUTE_NOINLINE
827 static void
handle_bitfield_ndbrecord(const NdbRecord::Attr * col,const Uint8 * & src,Uint32 & bitPos,char * row)828 handle_bitfield_ndbrecord(const NdbRecord::Attr* col,
829                           const Uint8*& src,
830                           Uint32& bitPos,
831                           char* row)
832 {
833   Uint32 len = col->bitCount;
834   if (col->flags & NdbRecord::IsNullable)
835   {
836     /* Clear nullbit in row */
837     row[col->nullbit_byte_offset] &=
838       ~(1 << col->nullbit_bit_in_byte);
839   }
840 
841   char* dest;
842   Uint64 mysqldSpace;
843 
844   /* For MySqldBitField, we read it as normal into a local on the
845    * stack and then use the put_mysqld_bitfield function to rearrange
846    * and write it to the row
847    */
848   bool isMDBitfield= (col->flags & NdbRecord::IsMysqldBitfield) != 0;
849 
850   if (isMDBitfield)
851   {
852     assert(len <= 64);
853     dest= (char*) &mysqldSpace;
854   }
855   else
856   {
857     dest= row + col->offset;
858   }
859 
860   /* Copy bitfield to memory starting at dest */
861   src = pad(src, 0, 0);
862   handle_packed_bit((const char*)src, bitPos, len, dest);
863   src += 4 * ((bitPos + len) >> 5);
864   bitPos = (bitPos + len) & 31;
865 
866   if (isMDBitfield)
867   {
868     /* Rearrange bitfield from stack to row storage */
869     col->put_mysqld_bitfield(row, dest);
870   }
871 }
872 
873 
874 /**
875  * unpackNdbRecord
876  * Unpack a stream of field values, whose presence and nullness
877  * is indicated by a leading bitmap, into an NdbRecord row.
878  * Return the number of words consumed.
879  */
880 //static
881 Uint32
unpackNdbRecord(const NdbRecord * rec,Uint32 bmlen,const Uint32 * aDataPtr,char * row)882 NdbReceiver::unpackNdbRecord(const NdbRecord *rec,
883                              Uint32 bmlen,
884                              const Uint32* aDataPtr,
885                              char* row)
886 {
887   /* Use bitmap to determine which columns have been sent */
888   /*
889     We save precious registers for the compiler by putting
890     three values in one i_attrId variable:
891     bit_index : Bit 0-15
892     attrId    : Bit 16-31
893     maxAttrId : Bit 32-47
894 
895     We use the same principle to store 3 variables in
896     bitPos_next_index variable:
897     next_index : Bit 0-15
898     bitPos     : Bit 48-52
899     rpm_bmlen  : Bit 32-47
900     0's        : Bit 16-31
901 
902     So we can also get bmSize by shifting 27 instead of 32 which
903     is equivalent to shift right 32 followed by shift left 5 when
904     one knows there are zeroes in the  lower bits.
905 
906     The compiler has to have quite a significant amount of live
907     variables in parallel here, so by the above handling we increase
908     the access time of these registers by 1-2 cycles, but this is
909     better than using the stack that has high chances of cache
910     misses.
911 
912     This routine can easily be executed millions of times per
913     second in one CPU, it's called once for each record retrieved
914     from NDB data nodes in scans.
915   */
916 #define rpn_pack_attrId(bit_index, attrId, maxAttrId) \
917   Uint64((bit_index)) + \
918     (Uint64((attrId)) << 16) + \
919     (Uint64((maxAttrId)) << 32)
920 #define rpn_bit_index(index_attrId) ((index_attrId) & 0xFFFF)
921 #define rpn_attrId(index_attrId) (((index_attrId) >> 16) & 0xFFFF)
922 #define rpn_maxAttrId(index_attrId) ((index_attrId) >> 32)
923 #define rpn_inc_bit_index() Uint64(1)
924 #define rpn_inc_attrId() (Uint64(1) << 16)
925 
926 #define rpn_pack_bitPos_next_index(bitPos, bmlen, next_index) \
927   Uint64((next_index) & 0xFFFF) + \
928     (Uint64((bmlen)) << 32) + \
929     (Uint64((bitPos)) << 48)
930 #define rpn_bmSize(bm_index) (((bm_index) >> 27) & 0xFFFF)
931 #define rpn_bmlen(bm_index) (((bm_index) >> 32) & 0xFFFF)
932 #define rpn_bitPos(bm_index) (((bm_index) >> 48) & 0x1F)
933 #define rpn_next_index(bm_index) ((bm_index) & 0xFFFF)
934 #define rpn_zero_bitPos(bm_index) \
935 { \
936   register Uint64 tmp_bitPos_next_index = bm_index; \
937   tmp_bitPos_next_index <<= 16; \
938   tmp_bitPos_next_index >>= 16; \
939   bm_index = tmp_bitPos_next_index; \
940 }
941 #define rpn_set_bitPos(bm_index, bitPos) \
942 { \
943   register Uint64 tmp_bitPos_next_index = bm_index; \
944   tmp_bitPos_next_index <<= 16; \
945   tmp_bitPos_next_index >>= 16; \
946   tmp_bitPos_next_index += (Uint64(bitPos) << 48); \
947   bm_index = tmp_bitPos_next_index; \
948 }
949 #define rpn_set_next_index(bm_index, val_next_index) \
950 { \
951   register Uint64 tmp_2_bitPos_next_index = Uint64(val_next_index); \
952   register Uint64 tmp_1_bitPos_next_index = bm_index; \
953   tmp_1_bitPos_next_index >>= 16; \
954   tmp_1_bitPos_next_index <<= 16; \
955   tmp_2_bitPos_next_index &= 0xFFFF; \
956   tmp_1_bitPos_next_index += tmp_2_bitPos_next_index; \
957   bm_index = tmp_1_bitPos_next_index; \
958 }
959 
960 /**
961   * Both these macros can be called with an overflow value
962   * in val_next_index, to protect against this we ensure it
963   * doesn't overflow its 16 bits of space and affect other
964   * variables.
965   */
966 
967   assert(bmlen <= 0x07FF);
968   register const Uint8 *src = (Uint8*)(aDataPtr + bmlen);
969   Uint32 noOfCols = rec->noOfColumns;
970   const NdbRecord::Attr* max_col = &rec->columns[noOfCols - 1];
971 
972   const Uint64 maxAttrId = max_col->attrId;
973   assert(maxAttrId <= 0xFFFF);
974 
975   /**
976    * Initialise the 3 fields stored in bitPos_next_index
977    *
978    * bitPos set to 0
979    * next_index set to rec->m_attrId_indexes[0]
980    * bmlen initialised
981    * bmSize is always bmlen / 32
982    */
983   register Uint64 bitPos_next_index =
984     rpn_pack_bitPos_next_index(0, bmlen, rec->m_attrId_indexes[0]);
985 
986   /**
987    * Initialise the 3 fields stored in i_attrId
988    *
989    * bit_index set to 0
990    * attrId set to 0
991    * maxAttrId initialised
992    */
993   for (register Uint64 i_attrId = rpn_pack_attrId(0, 0, maxAttrId) ;
994        (rpn_bit_index(i_attrId) < rpn_bmSize(bitPos_next_index)) &&
995         (rpn_attrId(i_attrId) <= rpn_maxAttrId(i_attrId));
996         i_attrId += (rpn_inc_attrId() + rpn_inc_bit_index()))
997   {
998     const NdbRecord::Attr* col = &rec->columns[rpn_next_index(bitPos_next_index)];
999     if (BitmaskImpl::get(rpn_bmlen(bitPos_next_index),
1000                                    aDataPtr,
1001                                    rpn_bit_index(i_attrId)))
1002     {
1003       /* Found bit in column presence bitmask, get corresponding
1004        * Attr struct from NdbRecord
1005        */
1006       Uint32 align = col->orgAttrSize;
1007 
1008       assert(rpn_attrId(i_attrId) < rec->m_attrId_indexes_length);
1009       assert (rpn_next_index(bitPos_next_index) < rec->noOfColumns);
1010       assert((col->flags & NdbRecord::IsBlob) == 0);
1011 
1012       /* If col is nullable, check for null and set bit */
1013       if (col->flags & NdbRecord::IsNullable)
1014       {
1015         i_attrId += rpn_inc_bit_index();
1016         if (BitmaskImpl::get(rpn_bmlen(bitPos_next_index),
1017                              aDataPtr,
1018                              rpn_bit_index(i_attrId)))
1019         {
1020           setRecToNULL(col, row);
1021           assert(rpn_bitPos(bitPos_next_index) < 32);
1022           rpn_set_next_index(bitPos_next_index,
1023                              rec->m_attrId_indexes[rpn_attrId(i_attrId) + 1]);
1024           continue; /* Next column */
1025         }
1026       }
1027       if (likely(align != DictTabInfo::aBit))
1028       {
1029         src = pad(src, align, rpn_bitPos(bitPos_next_index));
1030         rpn_zero_bitPos(bitPos_next_index);
1031       }
1032       else
1033       {
1034         Uint32 bitPos = rpn_bitPos(bitPos_next_index);
1035         const Uint8 *loc_src = src;
1036         handle_bitfield_ndbrecord(col,
1037                                   loc_src,
1038                                   bitPos,
1039                                   row);
1040         rpn_set_bitPos(bitPos_next_index, bitPos);
1041         src = loc_src;
1042         assert(rpn_bitPos(bitPos_next_index) < 32);
1043         rpn_set_next_index(bitPos_next_index,
1044                            rec->m_attrId_indexes[rpn_attrId(i_attrId) + 1]);
1045         continue; /* Next column */
1046       }
1047 
1048       {
1049         /* Set NULLable attribute to "not NULL". */
1050         if (col->flags & NdbRecord::IsNullable)
1051         {
1052           row[col->nullbit_byte_offset]&= ~(1 << col->nullbit_bit_in_byte);
1053         }
1054 
1055         do
1056         {
1057           Uint32 sz;
1058           char *col_row_ptr = &row[col->offset];
1059           Uint32 flags = col->flags &
1060                          (NdbRecord::IsVar1ByteLen |
1061                           NdbRecord::IsVar2ByteLen);
1062           if (!flags)
1063           {
1064             sz = col->maxSize;
1065             if (likely(sz == 4))
1066             {
1067               col_row_ptr[0] = src[0];
1068               col_row_ptr[1] = src[1];
1069               col_row_ptr[2] = src[2];
1070               col_row_ptr[3] = src[3];
1071               src += sz;
1072               break;
1073             }
1074           }
1075           else if (flags & NdbRecord::IsVar1ByteLen)
1076           {
1077             sz = 1 + src[0];
1078           }
1079           else
1080           {
1081             sz = 2 + src[0] + 256 * src[1];
1082           }
1083           const Uint8 *source = src;
1084           src += sz;
1085           memcpy(col_row_ptr, source, sz);
1086         } while (0);
1087       }
1088     }
1089     rpn_set_next_index(bitPos_next_index,
1090                        rec->m_attrId_indexes[rpn_attrId(i_attrId) + 1]);
1091   }
1092   Uint32 len = (Uint32)(((Uint32*)pad(src,
1093                                       0,
1094                                       rpn_bitPos(bitPos_next_index))) -
1095                                         aDataPtr);
1096   return len;
1097 }
1098 
1099 int
get_keyinfo20(Uint32 & scaninfo,Uint32 & length,const char * & data_ptr) const1100 NdbReceiver::get_keyinfo20(Uint32 & scaninfo, Uint32 & length,
1101                            const char * & data_ptr) const
1102 {
1103   if (unlikely(!m_read_key_info))
1104     return -1;
1105 
1106   Uint32 len;
1107   const Uint32 *p = m_recv_buffer->getKey(m_current_row, len);
1108   if (unlikely(p == NULL))
1109     return -1;
1110 
1111   scaninfo = *p;
1112   data_ptr = reinterpret_cast<const char*>(p+1);
1113   length = len-1;
1114   return 0;
1115 }
1116 
1117 const char*
unpackBuffer(const NdbReceiverBuffer * buffer,Uint32 row)1118 NdbReceiver::unpackBuffer(const NdbReceiverBuffer *buffer, Uint32 row)
1119 {
1120   assert(buffer != NULL);
1121 
1122   Uint32 aLength;
1123   const Uint32 *aDataPtr = buffer->getRow(row, aLength);
1124   if (likely(aDataPtr != NULL))
1125   {
1126     if (unpackRow(aDataPtr, aLength, m_row_buffer) == -1)
1127       return NULL;
1128 
1129     return m_row_buffer;
1130   }
1131 
1132   /* ReceiveBuffer may containt only keyinfo */
1133   const Uint32 *key = buffer->getKey(row, aLength);
1134   if (key != NULL)
1135   {
1136     assert(m_row_buffer != NULL);
1137     return m_row_buffer; // Row is empty, used as non-NULL return
1138   }
1139   return NULL;
1140 }
1141 
1142 int
unpackRow(const Uint32 * aDataPtr,Uint32 aLength,char * row)1143 NdbReceiver::unpackRow(const Uint32* aDataPtr, Uint32 aLength, char* row)
1144 {
1145   /*
1146    * NdbRecord and NdbRecAttr row result handling are merged here
1147    *   First any NdbRecord attributes are extracted
1148    *   Then any NdbRecAttr attributes are extracted
1149    * Scenarios :
1150    *   NdbRecord only PK read result
1151    *   NdbRecAttr only PK read result
1152    *   Mixed PK read results
1153    *   NdbRecord only scan read result
1154    *   NdbRecAttr only scan read result
1155    *   Mixed scan read results
1156    */
1157 
1158   /* If present, NdbRecord data will come first */
1159   if (m_ndb_record != NULL)
1160   {
1161     /* Read words from the incoming signal train.
1162      * The length passed in is enough for one row, either as an individual
1163      * read op, or part of a scan.  When there are no more words, we're at
1164      * the end of the row
1165      */
1166     while (aLength > 0)
1167     {
1168       const AttributeHeader ah(* aDataPtr++);
1169       const Uint32 attrId= ah.getAttributeId();
1170       const Uint32 attrSize= ah.getByteSize();
1171       aLength--;
1172       assert(aLength >= (attrSize/sizeof(Uint32)));
1173 
1174       /* Normal case for all NdbRecord primary key, index key, table scan
1175        * and index scan reads. Extract all requested columns from packed
1176        * format into the row.
1177        */
1178       if (likely(attrId == AttributeHeader::READ_PACKED))
1179       {
1180         assert(row != NULL);
1181         const Uint32 len= unpackNdbRecord(m_ndb_record,
1182                                           attrSize >> 2, // Bitmap length
1183                                           aDataPtr,
1184                                           row);
1185         assert(aLength >= len);
1186         aDataPtr+= len;
1187         aLength-= len;
1188       }
1189 
1190       /* Special case for RANGE_NO, which is received first and is
1191        * stored just after the row. */
1192       else if (attrId == AttributeHeader::RANGE_NO)
1193       {
1194         assert(row != NULL);
1195         assert(m_read_range_no);
1196         assert(attrSize==sizeof(Uint32));
1197         memcpy(row+m_ndb_record->m_row_size, aDataPtr++, sizeof(Uint32));
1198         aLength--;
1199       }
1200 
1201       else
1202       {
1203         /* If we get here then we must have 'extra getValues' - columns
1204          * requested outwith the normal NdbRecord + bitmask mechanism.
1205          * This could be : pseudo columns, columns read via an old-Api
1206          * scan, or just some extra columns added by the user to an
1207          * NdbRecord operation.
1208          */
1209         aDataPtr--;   // Undo read of AttributeHeader
1210         aLength++;
1211         break;
1212       }
1213     } // while (aLength > 0)
1214   } // if (m_ndb_record != NULL)
1215 
1216   /* Handle 'getValues', possible requested after NdbRecord columns. */
1217   if (aLength > 0)
1218   {
1219     /**
1220      * If we get here then there are some attribute values to be
1221      * read into the attached list of NdbRecAttrs.
1222      * This occurs for old-Api primary and unique index keyed operations
1223      * and for NdbRecord primary and unique index keyed operations
1224      * using 'extra GetValues'.
1225      *
1226      * If the values are part of a scan then we save
1227      * the starting point of these RecAttr values.
1228      * When the user calls NdbScanOperation.nextResult(), they will
1229      * be copied into the correct NdbRecAttr objects by calling
1230      * NdbRecord::get_AttrValues.
1231      * If the extra values are not part of a scan, then they are
1232      * put into their NdbRecAttr objects now.
1233      */
1234     const bool isScan= (m_type == NDB_SCANRECEIVER) ||
1235                        (m_type == NDB_QUERY_OPERATION);
1236 
1237     if (isScan)
1238     {
1239       /* Save position for RecAttr values for later retrieval. */
1240       m_rec_attr_data = aDataPtr;
1241       m_rec_attr_len = aLength;
1242       return 0;
1243     }
1244     else
1245     {
1246       /* Put values into RecAttr now */
1247       const int ret = handle_rec_attrs(m_firstRecAttr, aDataPtr, aLength);
1248       if (unlikely(ret != 0))
1249         return -1;
1250 
1251       aDataPtr += aLength;
1252       aLength  = 0;
1253     }
1254   } // if (aLength > 0)
1255 
1256   m_rec_attr_data = NULL;
1257   m_rec_attr_len = 0;
1258   return 0;
1259 }
1260 
1261 //static
1262 int
handle_rec_attrs(NdbRecAttr * rec_attr_list,const Uint32 * aDataPtr,Uint32 aLength)1263 NdbReceiver::handle_rec_attrs(NdbRecAttr* rec_attr_list,
1264                               const Uint32* aDataPtr,
1265                               Uint32 aLength)
1266 {
1267   NdbRecAttr* currRecAttr = rec_attr_list;
1268 
1269   /* If we get here then there are some attribute values to be
1270    * read into the attached list of NdbRecAttrs.
1271    * This occurs for old-Api primary and unique index keyed operations
1272    * and for NdbRecord primary and unique index keyed operations
1273    * using 'extra GetValues'.
1274    */
1275   while (aLength > 0)
1276   {
1277     const AttributeHeader ah(* aDataPtr++);
1278     const Uint32 attrId= ah.getAttributeId();
1279     const Uint32 attrSize= ah.getByteSize();
1280     aLength--;
1281     assert(aLength >= (attrSize/sizeof(Uint32)));
1282 
1283     {
1284       // We've processed the NdbRecord part of the TRANSID_AI, if
1285       // any.  There are signal words left, so they must be
1286       // RecAttr data
1287       //
1288       if (attrId == AttributeHeader::READ_PACKED)
1289       {
1290         const Uint32 len = unpackRecAttr(&currRecAttr,
1291                                          attrSize>>2, aDataPtr, aLength);
1292         assert(aLength >= len);
1293         aDataPtr += len;
1294         aLength -= len;
1295         continue;
1296       }
1297 
1298       if(currRecAttr &&
1299          currRecAttr->attrId() == attrId &&
1300          currRecAttr->receive_data(aDataPtr, attrSize))
1301       {
1302         Uint32 add= (attrSize + 3) >> 2;
1303         aLength -= add;
1304         aDataPtr += add;
1305         currRecAttr = currRecAttr->next();
1306       } else {
1307         /*
1308           This should not happen: we got back an attribute for which we have no
1309           stored NdbRecAttr recording that we requested said attribute (or we got
1310           back attributes in the wrong order).
1311           So dump some info for debugging, and abort.
1312         */
1313         ndbout_c("NdbReceiver::handle_rec_attrs: attrId: %d currRecAttr: %p rec_attr_list: %p "
1314                  "attrSize: %d %d",
1315 	         attrId, currRecAttr, rec_attr_list, attrSize,
1316                  currRecAttr ? currRecAttr->get_size_in_bytes() : 0);
1317         currRecAttr = rec_attr_list;
1318         while(currRecAttr != 0){
1319 	  ndbout_c("%d ", currRecAttr->attrId());
1320 	  currRecAttr = currRecAttr->next();
1321         }
1322         abort();
1323         return -1;
1324       } // if (currRecAttr...)
1325     }
1326   } // while (aLength > 0)
1327 
1328   return 0;
1329 }
1330 
1331 int
get_AttrValues(NdbRecAttr * rec_attr_list) const1332 NdbReceiver::get_AttrValues(NdbRecAttr* rec_attr_list) const
1333 {
1334   return handle_rec_attrs(rec_attr_list,
1335                           m_rec_attr_data,
1336                           m_rec_attr_len);
1337 }
1338 
1339 int
execTRANSID_AI(const Uint32 * aDataPtr,Uint32 aLength)1340 NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
1341 {
1342   const Uint32 exp= m_expected_result_length;
1343   const Uint32 tmp= m_received_result_length + aLength;
1344 
1345   /*
1346    * Store received data unprocessed into receive buffer
1347    * in its packed format.
1348    * It is unpacked into NdbRecord format when
1349    * we navigate to each row.
1350    */
1351   if (m_recv_buffer != NULL)
1352   {
1353     Uint32 *row_recv = m_recv_buffer->allocRow(aLength);
1354     memcpy(row_recv, aDataPtr, aLength*sizeof(Uint32));
1355   }
1356   else
1357   {
1358     if (unpackRow(aDataPtr, aLength, m_row_buffer) == -1)
1359       return -1;
1360   }
1361   m_received_result_length = tmp;
1362   return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0);
1363 }
1364 
1365 int
execKEYINFO20(Uint32 info,const Uint32 * aDataPtr,Uint32 aLength)1366 NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength)
1367 {
1368   assert(m_read_key_info);
1369   assert(m_recv_buffer != NULL);
1370 
1371   Uint32 *keyinfo_ptr = m_recv_buffer->allocKey(aLength+1);
1372 
1373   // Copy in key 'info', followed by 'data'
1374   *keyinfo_ptr= info;
1375   memcpy(keyinfo_ptr+1, aDataPtr, 4*aLength);
1376 
1377   const Uint32 tmp= m_received_result_length + aLength;
1378   m_received_result_length = tmp;
1379 
1380   return (tmp == m_expected_result_length ? 1 : 0);
1381 }
1382 
1383 const char*
getRow(const NdbReceiverBuffer * buffer,Uint32 row)1384 NdbReceiver::getRow(const NdbReceiverBuffer* buffer, Uint32 row)
1385 {
1386   return unpackBuffer(buffer, row);
1387 }
1388 
1389 const char*
getNextRow()1390 NdbReceiver::getNextRow()
1391 {
1392   assert(m_recv_buffer != NULL);
1393   const Uint32 nextRow =  m_current_row+1;
1394   const char *row = unpackBuffer(m_recv_buffer, nextRow);
1395   if (likely(row != NULL))
1396   {
1397     m_current_row = nextRow;
1398   }
1399   return row;
1400 }
1401 
1402 int
execSCANOPCONF(Uint32 tcPtrI,Uint32 len,Uint32 rows)1403 NdbReceiver::execSCANOPCONF(Uint32 tcPtrI, Uint32 len, Uint32 rows)
1404 {
1405   assert(m_recv_buffer != NULL);
1406   assert(m_recv_buffer->getMaxRows() >= rows);
1407   assert(m_recv_buffer->getBufSizeWords() >= len);
1408 
1409   m_tcPtrI = tcPtrI;
1410 
1411   if (unlikely(len == 0))
1412   {
1413     /**
1414      * No TRANSID_AI will be received. (Likely an empty projection requested.)
1415      * To get row count correct, we simulate specified number of
1416      * empty TRANSID_AIs being received.
1417      */
1418     for (Uint32 row=0; row<rows; row++)
1419     {
1420       execTRANSID_AI(NULL,0);
1421     }
1422   }
1423 
1424   const Uint32 tmp = m_received_result_length;
1425   m_expected_result_length = len;
1426   return (tmp == len ? 1 : 0);
1427 }
1428 
1429 void
setErrorCode(int code)1430 NdbReceiver::setErrorCode(int code)
1431 {
1432   theMagicNumber = 0;
1433   if (getType()==NDB_QUERY_OPERATION)
1434   {
1435     NdbQueryOperationImpl* op = (NdbQueryOperationImpl*)getOwner();
1436     op->getQuery().setErrorCode(code);
1437   }
1438   else
1439   {
1440     NdbOperation* const op = (NdbOperation*)getOwner();
1441     assert(op->checkMagicNumber()==0);
1442     op->setErrorCode(code);
1443   }
1444 }
1445