1 /*
2    Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "API.hpp"
26 #include <AttributeHeader.hpp>
27 #include <signaldata/TcKeyConf.hpp>
28 #include <signaldata/DictTabInfo.hpp>
29 
NdbReceiver(Ndb * aNdb)30 NdbReceiver::NdbReceiver(Ndb *aNdb) :
31   theMagicNumber(0),
32   m_ndb(aNdb),
33   m_id(NdbObjectIdMap::InvalidId),
34   m_tcPtrI(RNIL),
35   m_type(NDB_UNINITIALIZED),
36   m_owner(0),
37   m_using_ndb_record(false),
38   theFirstRecAttr(NULL),
39   theCurrentRecAttr(NULL),
40   m_rows(NULL),
41   m_current_row(0xffffffff),
42   m_result_rows(0)
43 {}
44 
~NdbReceiver()45 NdbReceiver::~NdbReceiver()
46 {
47   DBUG_ENTER("NdbReceiver::~NdbReceiver");
48   if (m_id != NdbObjectIdMap::InvalidId) {
49     m_ndb->theImpl->theNdbObjectIdMap.unmap(m_id, this);
50   }
51   delete[] m_rows;
52   DBUG_VOID_RETURN;
53 }
54 
55 int
init(ReceiverType type,bool useRec,void * owner)56 NdbReceiver::init(ReceiverType type, bool useRec, void* owner)
57 {
58   theMagicNumber = 0x11223344;
59   m_type = type;
60   m_using_ndb_record= useRec;
61   m_owner = owner;
62 
63   if (useRec)
64   {
65     m_record.m_ndb_record= NULL;
66     m_record.m_row_recv= NULL;
67     m_record.m_row_buffer= NULL;
68     m_record.m_row_offset= 0;
69     m_record.m_read_range_no= false;
70   }
71   theFirstRecAttr = NULL;
72   theCurrentRecAttr = NULL;
73 
74   if (m_id == NdbObjectIdMap::InvalidId) {
75     if (m_ndb)
76     {
77       m_id = m_ndb->theImpl->theNdbObjectIdMap.map(this);
78       if (m_id == NdbObjectIdMap::InvalidId)
79       {
80         setErrorCode(4000);
81         return -1;
82       }
83     }
84   }
85 
86   return 0;
87 }
88 
89 void
release()90 NdbReceiver::release(){
91   theMagicNumber = 0;
92   NdbRecAttr* tRecAttr = theFirstRecAttr;
93   while (tRecAttr != NULL)
94   {
95     NdbRecAttr* tSaveRecAttr = tRecAttr;
96     tRecAttr = tRecAttr->next();
97     m_ndb->releaseRecAttr(tSaveRecAttr);
98   }
99   m_using_ndb_record= false;
100   theFirstRecAttr = NULL;
101   theCurrentRecAttr = NULL;
102 }
103 
104 NdbRecAttr *
getValue(const NdbColumnImpl * tAttrInfo,char * user_dst_ptr)105 NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
106   NdbRecAttr* tRecAttr = m_ndb->getRecAttr();
107   if(tRecAttr && !tRecAttr->setup(tAttrInfo, user_dst_ptr)){
108     if (theFirstRecAttr == NULL)
109       theFirstRecAttr = tRecAttr;
110     else
111       theCurrentRecAttr->next(tRecAttr);
112     theCurrentRecAttr = tRecAttr;
113     tRecAttr->next(NULL);
114     return tRecAttr;
115   }
116   if(tRecAttr){
117     m_ndb->releaseRecAttr(tRecAttr);
118   }
119   return 0;
120 }
121 
122 void
getValues(const NdbRecord * rec,char * row_ptr)123 NdbReceiver::getValues(const NdbRecord* rec, char *row_ptr)
124 {
125   assert(m_using_ndb_record);
126 
127   m_record.m_ndb_record= rec;
128   m_record.m_row_recv= row_ptr;
129   m_record.m_row_offset= rec->m_row_size;
130 }
131 
132 void
prepareReceive(char * buf)133 NdbReceiver::prepareReceive(char *buf)
134 {
135   /* Set pointers etc. to prepare for receiving the first row of the batch. */
136   assert(theMagicNumber == 0x11223344);
137   m_received_result_length = 0;
138   m_expected_result_length = 0;
139   if (m_using_ndb_record)
140   {
141     m_record.m_row_recv= buf;
142   }
143   theCurrentRecAttr = theFirstRecAttr;
144 }
145 
146 void
prepareRead(char * buf,Uint32 rows)147 NdbReceiver::prepareRead(char *buf, Uint32 rows)
148 {
149   /* Set pointers etc. to prepare for reading the first row of the batch. */
150   assert(theMagicNumber == 0x11223344);
151   m_current_row = 0;
152   m_result_rows = rows;
153   if (m_using_ndb_record)
154   {
155     m_record.m_row_buffer = buf;
156   }
157 }
158 
159  #define KEY_ATTR_ID (~(Uint32)0)
160 
161 /*
162   Compute the batch size (rows between each NEXT_TABREQ / SCAN_TABCONF) to
163   use, taking into account limits in the transporter, user preference, etc.
164 
165   Hm, there are some magic overhead numbers (4 bytes/attr, 32 bytes/row) here,
166   would be nice with some explanation on how these numbers were derived.
167 
168   TODO : Check whether these numbers need to be revised w.r.t. read packed
169 */
170 //static
171 void
calculate_batch_size(const NdbImpl & theImpl,const NdbRecord * record,const NdbRecAttr * first_rec_attr,Uint32 key_size,Uint32 parallelism,Uint32 & batch_size,Uint32 & batch_byte_size,Uint32 & first_batch_size)172 NdbReceiver::calculate_batch_size(const NdbImpl& theImpl,
173                                   const NdbRecord *record,
174                                   const NdbRecAttr *first_rec_attr,
175                                   Uint32 key_size,
176                                   Uint32 parallelism,
177                                   Uint32& batch_size,
178                                   Uint32& batch_byte_size,
179                                   Uint32& first_batch_size)
180 {
181   const NdbApiConfig & cfg = theImpl.get_ndbapi_config_parameters();
182   const Uint32 max_scan_batch_size= cfg.m_scan_batch_size;
183   const Uint32 max_batch_byte_size= cfg.m_batch_byte_size;
184   const Uint32 max_batch_size= cfg.m_batch_size;
185 
186   Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
187   if (record)
188   {
189     tot_size+= record->m_max_transid_ai_bytes;
190   }
191 
192   const NdbRecAttr *rec_attr= first_rec_attr;
193   while (rec_attr != NULL) {
194     Uint32 attr_size= rec_attr->getColumn()->getSizeInBytes();
195     attr_size= ((attr_size + 4 + 3) >> 2) << 2; //Even to word + overhead
196     tot_size+= attr_size;
197     rec_attr= rec_attr->next();
198   }
199 
200   tot_size+= 32; //include signal overhead
201 
202   /**
203    * Now we calculate the batch size by trying to get upto SCAN_BATCH_SIZE
204    * bytes sent for each batch from each node. We do however ensure that
205    * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
206    * batch.
207    */
208   if (batch_size == 0)
209   {
210     batch_byte_size= max_batch_byte_size;
211   }
212   else
213   {
214     batch_byte_size= batch_size * tot_size;
215   }
216 
217   if (batch_byte_size * parallelism > max_scan_batch_size) {
218     batch_byte_size= max_scan_batch_size / parallelism;
219   }
220   batch_size= batch_byte_size / tot_size;
221   if (batch_size == 0) {
222     batch_size= 1;
223   } else {
224     if (batch_size > max_batch_size) {
225       batch_size= max_batch_size;
226     } else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
227       batch_size= MAX_PARALLEL_OP_PER_SCAN;
228     }
229   }
230   first_batch_size= batch_size;
231   return;
232 }
233 
234 void
calculate_batch_size(Uint32 key_size,Uint32 parallelism,Uint32 & batch_size,Uint32 & batch_byte_size,Uint32 & first_batch_size,const NdbRecord * record) const235 NdbReceiver::calculate_batch_size(Uint32 key_size,
236                                   Uint32 parallelism,
237                                   Uint32& batch_size,
238                                   Uint32& batch_byte_size,
239                                   Uint32& first_batch_size,
240                                   const NdbRecord *record) const
241 {
242   calculate_batch_size(* m_ndb->theImpl,
243                        record,
244                        theFirstRecAttr,
245                        key_size, parallelism, batch_size, batch_byte_size,
246                        first_batch_size);
247 }
248 
249 void
do_setup_ndbrecord(const NdbRecord * ndb_record,Uint32 batch_size,Uint32 key_size,Uint32 read_range_no,Uint32 rowsize,char * row_buffer)250 NdbReceiver::do_setup_ndbrecord(const NdbRecord *ndb_record, Uint32 batch_size,
251                                 Uint32 key_size, Uint32 read_range_no,
252                                 Uint32 rowsize, char *row_buffer)
253 {
254   m_using_ndb_record= true;
255   m_record.m_ndb_record= ndb_record;
256   m_record.m_row_recv= row_buffer;
257   m_record.m_row_buffer= row_buffer;
258   m_record.m_row_offset= rowsize;
259   m_record.m_read_range_no= read_range_no;
260 }
261 
262 //static
263 Uint32
ndbrecord_rowsize(const NdbRecord * ndb_record,const NdbRecAttr * first_rec_attr,Uint32 key_size,bool read_range_no)264 NdbReceiver::ndbrecord_rowsize(const NdbRecord *ndb_record,
265                                const NdbRecAttr *first_rec_attr,
266                                Uint32 key_size,
267                                bool read_range_no)
268 {
269   Uint32 rowsize= (ndb_record) ? ndb_record->m_row_size : 0;
270 
271   /* Room for range_no. */
272   if (read_range_no)
273     rowsize+= 4;
274   /*
275     If keyinfo, need room for max. key + 4 bytes of actual key length + 4
276     bytes of scan info (all from KEYINFO20 signal).
277   */
278   if (key_size)
279     rowsize+= 8 + key_size*4;
280   /*
281     Compute extra space needed to buffer getValue() results in NdbRecord
282     scans.
283   */
284   const NdbRecAttr *ra= first_rec_attr;
285   while (ra != NULL)
286   {
287     rowsize+= sizeof(Uint32) + ra->getColumn()->getSizeInBytes();
288     ra= ra->next();
289   }
290   /* Ensure 4-byte alignment. */
291   rowsize= (rowsize+3) & 0xfffffffc;
292   return rowsize;
293 }
294 
295 /**
296  * pad
297  * This function determines how much 'padding' should be applied
298  * to the passed in pointer and bitPos to get to the start of a
299  * field with the passed in alignment.
300  * The rules are :
301  *   - First bit field is 32-bit aligned
302  *   - Subsequent bit fields are packed in the next available bits
303  *   - 8 and 16 bit aligned fields are packed in the next available
304  *     word (but not necessarily word aligned.
305  *   - 32, 64 and 128 bit aligned fields are packed in the next
306  *     aligned 32-bit word.
307  * This algorithm is used to unpack a stream of fields packed by the code
308  * in src/kernel/blocks/dbtup/DbtupRoutines::read_packed()
309  */
310 static
311 inline
312 const Uint8*
pad(const Uint8 * src,Uint32 align,Uint32 bitPos)313 pad(const Uint8* src, Uint32 align, Uint32 bitPos)
314 {
315   UintPtr ptr = UintPtr(src);
316   switch(align){
317   case DictTabInfo::aBit:
318   case DictTabInfo::a32Bit:
319   case DictTabInfo::a64Bit:
320   case DictTabInfo::a128Bit:
321     return (Uint8*)(((ptr + 3) & ~(UintPtr)3) + 4 * ((bitPos + 31) >> 5));
322 charpad:
323   case DictTabInfo::an8Bit:
324   case DictTabInfo::a16Bit:
325     return src + 4 * ((bitPos + 31) >> 5);
326   default:
327 #ifdef VM_TRACE
328     abort();
329 #endif
330     goto charpad;
331   }
332 }
333 
334 /**
335  * handle_packed_bit
336  * This function copies the bitfield of length len, offset pos from
337  * word-aligned ptr _src to memory starting at the byte ptr dst.
338  */
339 static
340 void
handle_packed_bit(const char * _src,Uint32 pos,Uint32 len,char * _dst)341 handle_packed_bit(const char* _src, Uint32 pos, Uint32 len, char* _dst)
342 {
343   Uint32 * src = (Uint32*)_src;
344   assert((UintPtr(src) & 3) == 0);
345 
346   /* Convert char* to aligned Uint32* and some byte offset */
347   UintPtr uiPtr= UintPtr((Uint32*)_dst);
348   Uint32 dstByteOffset= Uint32(uiPtr) & 3;
349   Uint32* dst= (Uint32*) (uiPtr - dstByteOffset);
350 
351   BitmaskImpl::copyField(dst, dstByteOffset << 3,
352                          src, pos, len);
353 }
354 
355 
356 /**
357  * receive_packed_recattr
358  * Receive a packed stream of field values, whose presence and nullness
359  * is indicated by a leading bitmap into a list of NdbRecAttr objects
360  * Return the number of words read from the input stream.
361  */
362 Uint32
receive_packed_recattr(NdbRecAttr ** recAttr,Uint32 bmlen,const Uint32 * aDataPtr,Uint32 aLength)363 NdbReceiver::receive_packed_recattr(NdbRecAttr** recAttr,
364                                     Uint32 bmlen,
365                                     const Uint32* aDataPtr,
366                                     Uint32 aLength)
367 {
368   NdbRecAttr* currRecAttr = *recAttr;
369   const Uint8 *src = (Uint8*)(aDataPtr + bmlen);
370   Uint32 bitPos = 0;
371   for (Uint32 i = 0, attrId = 0; i<32*bmlen; i++, attrId++)
372   {
373     if (BitmaskImpl::get(bmlen, aDataPtr, i))
374     {
375       const NdbColumnImpl & col =
376 	NdbColumnImpl::getImpl(* currRecAttr->getColumn());
377       if (unlikely(attrId != (Uint32)col.m_attrId))
378         goto err;
379       if (col.m_nullable)
380       {
381 	if (BitmaskImpl::get(bmlen, aDataPtr, ++i))
382 	{
383 	  currRecAttr->setNULL();
384 	  currRecAttr = currRecAttr->next();
385 	  continue;
386 	}
387       }
388       Uint32 align = col.m_orgAttrSize;
389       Uint32 attrSize = col.m_attrSize;
390       Uint32 array = col.m_arraySize;
391       Uint32 len = col.m_length;
392       Uint32 sz = attrSize * array;
393       Uint32 arrayType = col.m_arrayType;
394 
395       switch(align){
396       case DictTabInfo::aBit: // Bit
397         src = pad(src, 0, 0);
398 	handle_packed_bit((const char*)src, bitPos, len,
399                           currRecAttr->aRef());
400 	src += 4 * ((bitPos + len) >> 5);
401 	bitPos = (bitPos + len) & 31;
402         goto next;
403       default:
404         src = pad(src, align, bitPos);
405       }
406       switch(arrayType){
407       case NDB_ARRAYTYPE_FIXED:
408         break;
409       case NDB_ARRAYTYPE_SHORT_VAR:
410         sz = 1 + src[0];
411         break;
412       case NDB_ARRAYTYPE_MEDIUM_VAR:
413 	sz = 2 + src[0] + 256 * src[1];
414         break;
415       default:
416         goto err;
417       }
418 
419       bitPos = 0;
420       currRecAttr->receive_data((Uint32*)src, sz);
421       src += sz;
422   next:
423       currRecAttr = currRecAttr->next();
424     }
425   }
426   * recAttr = currRecAttr;
427   return (Uint32)(((Uint32*)pad(src, 0, bitPos)) - aDataPtr);
428 
429 err:
430   abort();
431   return 0;
432 }
433 
434 
435 /* Set NdbRecord field to non-NULL value. */
assignToRec(const NdbRecord::Attr * col,char * row,const Uint8 * src,Uint32 byteSize)436 static void assignToRec(const NdbRecord::Attr *col,
437                         char *row,
438                         const Uint8 *src,
439                         Uint32 byteSize)
440 {
441   /* Set NULLable attribute to "not NULL". */
442   if (col->flags & NdbRecord::IsNullable)
443     row[col->nullbit_byte_offset]&= ~(1 << col->nullbit_bit_in_byte);
444 
445   memcpy(&row[col->offset], src, byteSize);
446 }
447 
448 /* Set NdbRecord field to NULL. */
setRecToNULL(const NdbRecord::Attr * col,char * row)449 static void setRecToNULL(const NdbRecord::Attr *col,
450                          char *row)
451 {
452   assert(col->flags & NdbRecord::IsNullable);
453   row[col->nullbit_byte_offset]|= 1 << col->nullbit_bit_in_byte;
454 }
455 
456 int
get_range_no() const457 NdbReceiver::get_range_no() const
458 {
459   int range_no;
460   assert(m_using_ndb_record);
461   Uint32 idx= m_current_row;
462   if (idx == 0 || !m_record.m_read_range_no)
463     return -1;
464   memcpy(&range_no,
465          m_record.m_row_buffer +
466            (idx-1)*m_record.m_row_offset +
467            m_record.m_ndb_record->m_row_size,
468          4);
469   return range_no;
470 }
471 
472 /**
473  * handle_bitfield_ndbrecord
474  * Packed bitfield handling for NdbRecord - also deals with
475  * mapping the bitfields into MySQLD format if necessary.
476  */
477 static void
handle_bitfield_ndbrecord(const NdbRecord::Attr * col,const Uint8 * & src,Uint32 & bitPos,Uint32 & len,char * row)478 handle_bitfield_ndbrecord(const NdbRecord::Attr* col,
479                           const Uint8*& src,
480                           Uint32& bitPos,
481                           Uint32& len,
482                           char* row)
483 {
484   if (col->flags & NdbRecord::IsNullable)
485   {
486     /* Clear nullbit in row */
487     row[col->nullbit_byte_offset] &=
488       ~(1 << col->nullbit_bit_in_byte);
489   }
490 
491   char* dest;
492   Uint64 mysqldSpace;
493 
494   /* For MySqldBitField, we read it as normal into a local on the
495    * stack and then use the put_mysqld_bitfield function to rearrange
496    * and write it to the row
497    */
498   bool isMDBitfield= (col->flags & NdbRecord::IsMysqldBitfield) != 0;
499 
500   if (isMDBitfield)
501   {
502     assert(len <= 64);
503     dest= (char*) &mysqldSpace;
504   }
505   else
506     dest= row + col->offset;
507 
508   /* Copy bitfield to memory starting at dest */
509   src = pad(src, 0, 0);
510   handle_packed_bit((const char*)src, bitPos, len, dest);
511   src += 4 * ((bitPos + len) >> 5);
512   bitPos = (bitPos + len) & 31;
513 
514   if (isMDBitfield)
515     /* Rearrange bitfield from stack to row storage */
516     col->put_mysqld_bitfield(row, dest);
517 }
518 
519 
520 /**
521  * receive_packed_ndbrecord
522  * Receive a packed stream of field values, whose presence and nullness
523  * is indicated by a leading bitmap, into an NdbRecord row.
524  * Return the number of words consumed from the input stream.
525  */
526 Uint32
receive_packed_ndbrecord(Uint32 bmlen,const Uint32 * aDataPtr,char * row)527 NdbReceiver::receive_packed_ndbrecord(Uint32 bmlen,
528                                       const Uint32* aDataPtr,
529                                       char* row)
530 {
531   const Uint8 *src = (Uint8*)(aDataPtr + bmlen);
532   Uint32 bitPos = 0;
533   const NdbRecord* rec= m_record.m_ndb_record;
534   const Uint32 maxAttrId= rec->columns[rec->noOfColumns -1].attrId;
535   const Uint32 bmSize= bmlen << 5;
536 
537   /* Use bitmap to determine which columns have been sent */
538   for (Uint32 i = 0, attrId = 0;
539        (i < bmSize) && (attrId <= maxAttrId);
540        i++, attrId++)
541   {
542     if (BitmaskImpl::get(bmlen, aDataPtr, i))
543     {
544       /* Found bit in column presence bitmask, get corresponding
545        * Attr struct from NdbRecord
546        */
547       assert(attrId < rec->m_attrId_indexes_length);
548       assert((Uint32) rec->m_attrId_indexes[attrId]
549              < rec->noOfColumns);
550       const NdbRecord::Attr* col= &rec->columns[rec->m_attrId_indexes[attrId]];
551 
552       assert((col->flags & NdbRecord::IsBlob) == 0);
553 
554       /* If col is nullable, check for null and
555        * set bit
556        */
557       if (col->flags & NdbRecord::IsNullable)
558       {
559 	if (BitmaskImpl::get(bmlen, aDataPtr, ++i))
560 	{
561           setRecToNULL(col, m_record.m_row_recv);
562 
563           // Next column...
564 	  continue;
565 	}
566       }
567 
568       Uint32 align = col->orgAttrSize;
569       Uint32 sz = col->maxSize;
570       Uint32 len = col->bitCount;
571       Uint32 arrayType =
572         (col->flags & NdbRecord::IsVar1ByteLen)?
573         NDB_ARRAYTYPE_SHORT_VAR :
574         (
575          (col->flags & NdbRecord::IsVar2ByteLen)?
576          NDB_ARRAYTYPE_MEDIUM_VAR :
577          NDB_ARRAYTYPE_FIXED);
578 
579       switch(align){
580       case DictTabInfo::aBit: // Bit
581         handle_bitfield_ndbrecord(col,
582                                   src,
583                                   bitPos,
584                                   len,
585                                   row);
586         continue; // Next column
587       default:
588         src = pad(src, align, bitPos);
589       }
590       switch(arrayType){
591       case NDB_ARRAYTYPE_FIXED:
592         break;
593       case NDB_ARRAYTYPE_SHORT_VAR:
594         sz = 1 + src[0];
595         break;
596       case NDB_ARRAYTYPE_MEDIUM_VAR:
597 	sz = 2 + src[0] + 256 * src[1];
598         break;
599       default:
600         abort();
601       }
602 
603       bitPos = 0;
604       assignToRec(col,
605                   row,
606                   src,
607                   sz);
608 
609       src += sz;
610     }
611   }
612 
613   return (Uint32)(((Uint32*)pad(src, 0, bitPos)) - aDataPtr);
614 }
615 
616 
617 int
get_keyinfo20(Uint32 & scaninfo,Uint32 & length,const char * & data_ptr) const618 NdbReceiver::get_keyinfo20(Uint32 & scaninfo, Uint32 & length,
619                            const char * & data_ptr) const
620 {
621   assert(m_using_ndb_record);
622   Uint32 idx= m_current_row;
623   if (idx == 0)
624     return -1;                                  // No rows fetched yet
625   const char *p= m_record.m_row_buffer +
626     (idx-1)*m_record.m_row_offset +
627     m_record.m_ndb_record->m_row_size;
628   if (m_record.m_read_range_no)
629     p+= 4;
630   scaninfo= uint4korr(p);
631   p+= 4;
632   length= uint4korr(p);
633   p+= 4;
634   data_ptr= p;
635   return 0;
636 }
637 
638 
639 int
getScanAttrData(const char * & data,Uint32 & size,Uint32 & pos) const640 NdbReceiver::getScanAttrData(const char * & data, Uint32 & size, Uint32 & pos) const
641 {
642   assert(m_using_ndb_record);
643   Uint32 idx= m_current_row;
644   if (idx == 0)
645     return -1;                                  // No rows fetched yet
646   const char *row_end= m_record.m_row_buffer + idx*m_record.m_row_offset;
647 
648   pos+= sizeof(Uint32);
649   memcpy(&size, row_end - pos, sizeof(Uint32));
650   pos+= size;
651   data= row_end - pos;
652 
653   assert (pos <= m_record.m_row_offset);
654   return 0;
655 }
656 
657 int
execTRANSID_AI(const Uint32 * aDataPtr,Uint32 aLength)658 NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
659 {
660   /*
661    * NdbRecord and NdbRecAttr row result handling are merged here
662    *   First any NdbRecord attributes are extracted
663    *   Then any NdbRecAttr attributes are extracted
664    *   NdbRecord scans with extra NdbRecAttr getValue() attrs
665    *   are handled separately in the NdbRecord code
666    * Scenarios :
667    *   NdbRecord only PK read result
668    *   NdbRecAttr only PK read result
669    *   Mixed PK read results
670    *   NdbRecord only scan read result
671    *   NdbRecAttr only scan read result
672    *   Mixed scan read results
673    */
674   Uint32 exp= m_expected_result_length;
675   Uint32 tmp= m_received_result_length + aLength;
676   Uint32 origLength=aLength;
677   NdbRecAttr* currRecAttr = theCurrentRecAttr;
678   Uint32 save_pos= 0;
679 
680   bool ndbrecord_part_done= !m_using_ndb_record;
681   const bool isScan= (m_type == NDB_SCANRECEIVER) ||
682     (m_type == NDB_QUERY_OPERATION);
683 
684   /* Read words from the incoming signal train.
685    * The length passed in is enough for one row, either as an individual
686    * read op, or part of a scan.  When there are no more words, we're at
687    * the end of the row
688    */
689   while (aLength > 0)
690   {
691     AttributeHeader ah(* aDataPtr++);
692     const Uint32 attrId= ah.getAttributeId();
693     Uint32 attrSize= ah.getByteSize();
694     aLength--;
695 
696     if (!ndbrecord_part_done)
697     {
698       /* Special case for RANGE_NO, which is received first and is
699        * stored just after the row. */
700       if (attrId == AttributeHeader::RANGE_NO)
701       {
702         assert(m_record.m_read_range_no);
703         assert(attrSize==4);
704         assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size+attrSize);
705         memcpy(m_record.m_row_recv+m_record.m_ndb_record->m_row_size,
706                aDataPtr++, 4);
707         aLength--;
708         continue; // Next
709       }
710 
711       /* Normal case for all NdbRecord primary key, index key, table scan
712        * and index scan reads.  Extract all requested columns from packed
713        * format into the row.
714        */
715       if (attrId == AttributeHeader::READ_PACKED)
716       {
717         assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size);
718         Uint32 len= receive_packed_ndbrecord(attrSize >> 2, // Bitmap length
719                                              aDataPtr,
720                                              m_record.m_row_recv);
721         aDataPtr+= len;
722         aLength-= len;
723         continue;  // Next
724       }
725 
726       /* If we get here then we must have 'extra getValues' - columns
727        * requested outwith the normal NdbRecord + bitmask mechanism.
728        * This could be : pseudo columns, columns read via an old-Api
729        * scan, or just some extra columns added by the user to an
730        * NdbRecord operation.
731        * If the extra values are part of a scan then they get copied
732        * to a special area after the end of the normal row data.
733        * When the user calls NdbScanOperation.nextResult() they will
734        * be copied into the correct NdbRecAttr objects.
735        * If the extra values are not part of a scan then they are
736        * put into their NdbRecAttr objects now.
737        */
738       if (isScan)
739       {
740         /* For scans, we save the extra information at the end of the
741          * row buffer, in reverse order.  When nextResult() is called,
742          * this data is copied into the correct NdbRecAttr objects.
743          */
744 
745         /* Save this extra getValue */
746         save_pos+= sizeof(Uint32);
747         memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
748                &attrSize, sizeof(Uint32));
749         if (attrSize > 0)
750         {
751           save_pos+= attrSize;
752           assert (save_pos<=m_record.m_row_offset);
753           memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
754                  aDataPtr, attrSize);
755         }
756 
757         Uint32 sizeInWords= (attrSize+3)>>2;
758         aDataPtr+= sizeInWords;
759         aLength-= sizeInWords;
760         continue; // Next
761       }
762       else
763       {
764         /* Not a scan, so extra information is added to RecAttrs in
765          * the 'normal' way.
766          */
767         assert(theCurrentRecAttr != NULL);
768         assert(theCurrentRecAttr->attrId() == attrId);
769         /* Handle extra attributes requested with getValue(). */
770         /* This implies that we've finished with the NdbRecord part
771            of the read, so move onto NdbRecAttr */
772         ndbrecord_part_done=true;
773         // Fall through to RecAttr handling
774       }
775     } // / if (!ndbrecord_part_done)
776 
777     /* If we get here then there are some attribute values to be
778      * read into the attached list of NdbRecAttrs.
779      * This occurs for old-Api primary and unique index keyed operations
780      * and for NdbRecord primary and unique index keyed operations
781      * using 'extra GetValues'.
782      */
783     if (ndbrecord_part_done)
784     {
785       // We've processed the NdbRecord part of the TRANSID_AI, if
786       // any.  There are signal words left, so they must be
787       // RecAttr data
788       //
789       if (attrId == AttributeHeader::READ_PACKED)
790       {
791         assert(!m_using_ndb_record);
792         NdbRecAttr* tmp = currRecAttr;
793         Uint32 len = receive_packed_recattr(&tmp, attrSize>>2, aDataPtr, origLength);
794         aDataPtr += len;
795         aLength -= len;
796         currRecAttr = tmp;
797         continue;
798       }
799       /**
800        * Skip over missing attributes
801        * TODO : How can this happen?
802        */
803       while(currRecAttr && currRecAttr->attrId() != attrId){
804             currRecAttr = currRecAttr->next();
805       }
806 
807       if(currRecAttr && currRecAttr->receive_data(aDataPtr, attrSize))
808       {
809         Uint32 add= (attrSize + 3) >> 2;
810         aLength -= add;
811         aDataPtr += add;
812         currRecAttr = currRecAttr->next();
813       } else {
814         /*
815           This should not happen: we got back an attribute for which we have no
816           stored NdbRecAttr recording that we requested said attribute (or we got
817           back attributes in the wrong order).
818           So dump some info for debugging, and abort.
819         */
820         ndbout_c("this=%p: attrId: %d currRecAttr: %p theCurrentRecAttr: %p "
821                  "attrSize: %d %d", this,
822 	         attrId, currRecAttr, theCurrentRecAttr, attrSize,
823                  currRecAttr ? currRecAttr->get_size_in_bytes() : 0);
824         currRecAttr = theCurrentRecAttr;
825         while(currRecAttr != 0){
826 	  ndbout_c("%d ", currRecAttr->attrId());
827 	  currRecAttr = currRecAttr->next();
828         }
829         abort();
830         return -1;
831       } // if (currRecAttr...)
832     } // /if (ndbrecord_part_done)
833   } // / while (aLength > 0)
834 
835   theCurrentRecAttr = currRecAttr;
836 
837   m_received_result_length = tmp;
838 
839   if (m_using_ndb_record) {
840     /* Move onto next row in scan buffer */
841     m_record.m_row_recv+= m_record.m_row_offset;
842   }
843   return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0);
844 }
845 
846 int
execKEYINFO20(Uint32 info,const Uint32 * aDataPtr,Uint32 aLength)847 NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength)
848 {
849   if (m_using_ndb_record)
850   {
851     /* Copy in the keyinfo after the user row and any range_no value. */
852 
853     char *keyinfo_ptr= m_record.m_row_buffer +
854                        m_current_row++ * m_record.m_row_offset +
855                        m_record.m_ndb_record->m_row_size;
856     if (m_record.m_read_range_no)
857       keyinfo_ptr+= 4;
858 
859     int4store(keyinfo_ptr, info);
860     keyinfo_ptr+= 4;
861     int4store(keyinfo_ptr, aLength);
862     keyinfo_ptr+= 4;
863     memcpy(keyinfo_ptr, aDataPtr, 4*aLength);
864 
865     Uint32 tmp= m_received_result_length + aLength;
866     m_received_result_length = tmp;
867 
868     return (tmp == m_expected_result_length ? 1 : 0);
869   }
870 
871   /* The old method, using NdbRecAttr. */
872   NdbRecAttr* currRecAttr = m_rows[m_current_row++];
873   assert(currRecAttr->attrId() == KEY_ATTR_ID);
874   /*
875     This is actually reading data one word off the end of the received
876     signal (or off the end of the long signal data section 0, for a
877     long signal), due to the aLength+1. This is to ensure the correct length
878     being set for the NdbRecAttr (one extra word for the scanInfo word placed
879     at the end), overwritten immediately below.
880     But it's a bit ugly that we rely on being able to read one word over the
881     end of the signal without crashing...
882   */
883   currRecAttr->receive_data(aDataPtr, 4*(aLength + 1));
884 
885   /**
886    * Save scanInfo in the end of keyinfo
887    */
888   ((Uint32*)currRecAttr->aRef())[aLength] = info;
889 
890   Uint32 tmp = m_received_result_length + aLength;
891   m_received_result_length = tmp;
892 
893   return (tmp == m_expected_result_length ? 1 : 0);
894 }
895 
896 void
setErrorCode(int code)897 NdbReceiver::setErrorCode(int code)
898 {
899   theMagicNumber = 0;
900   if (getType()==NDB_QUERY_OPERATION)
901   {
902     NdbQueryOperationImpl* op = (NdbQueryOperationImpl*)getOwner();
903     op->getQuery().setErrorCode(code);
904   }
905   else
906   {
907     NdbOperation* const op = (NdbOperation*)getOwner();
908     assert(op->checkMagicNumber()==0);
909     op->setErrorCode(code);
910   }
911 }
912