1 /*
2 Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "API.hpp"
26 #include <AttributeHeader.hpp>
27 #include <signaldata/TcKeyConf.hpp>
28 #include <signaldata/DictTabInfo.hpp>
29
NdbReceiver(Ndb * aNdb)30 NdbReceiver::NdbReceiver(Ndb *aNdb) :
31 theMagicNumber(0),
32 m_ndb(aNdb),
33 m_id(NdbObjectIdMap::InvalidId),
34 m_tcPtrI(RNIL),
35 m_type(NDB_UNINITIALIZED),
36 m_owner(0),
37 m_using_ndb_record(false),
38 theFirstRecAttr(NULL),
39 theCurrentRecAttr(NULL),
40 m_rows(NULL),
41 m_current_row(0xffffffff),
42 m_result_rows(0)
43 {}
44
~NdbReceiver()45 NdbReceiver::~NdbReceiver()
46 {
47 DBUG_ENTER("NdbReceiver::~NdbReceiver");
48 if (m_id != NdbObjectIdMap::InvalidId) {
49 m_ndb->theImpl->theNdbObjectIdMap.unmap(m_id, this);
50 }
51 delete[] m_rows;
52 DBUG_VOID_RETURN;
53 }
54
55 int
init(ReceiverType type,bool useRec,void * owner)56 NdbReceiver::init(ReceiverType type, bool useRec, void* owner)
57 {
58 theMagicNumber = 0x11223344;
59 m_type = type;
60 m_using_ndb_record= useRec;
61 m_owner = owner;
62
63 if (useRec)
64 {
65 m_record.m_ndb_record= NULL;
66 m_record.m_row_recv= NULL;
67 m_record.m_row_buffer= NULL;
68 m_record.m_row_offset= 0;
69 m_record.m_read_range_no= false;
70 }
71 theFirstRecAttr = NULL;
72 theCurrentRecAttr = NULL;
73
74 if (m_id == NdbObjectIdMap::InvalidId) {
75 if (m_ndb)
76 {
77 m_id = m_ndb->theImpl->theNdbObjectIdMap.map(this);
78 if (m_id == NdbObjectIdMap::InvalidId)
79 {
80 setErrorCode(4000);
81 return -1;
82 }
83 }
84 }
85
86 return 0;
87 }
88
89 void
release()90 NdbReceiver::release(){
91 theMagicNumber = 0;
92 NdbRecAttr* tRecAttr = theFirstRecAttr;
93 while (tRecAttr != NULL)
94 {
95 NdbRecAttr* tSaveRecAttr = tRecAttr;
96 tRecAttr = tRecAttr->next();
97 m_ndb->releaseRecAttr(tSaveRecAttr);
98 }
99 m_using_ndb_record= false;
100 theFirstRecAttr = NULL;
101 theCurrentRecAttr = NULL;
102 }
103
104 NdbRecAttr *
getValue(const NdbColumnImpl * tAttrInfo,char * user_dst_ptr)105 NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
106 NdbRecAttr* tRecAttr = m_ndb->getRecAttr();
107 if(tRecAttr && !tRecAttr->setup(tAttrInfo, user_dst_ptr)){
108 if (theFirstRecAttr == NULL)
109 theFirstRecAttr = tRecAttr;
110 else
111 theCurrentRecAttr->next(tRecAttr);
112 theCurrentRecAttr = tRecAttr;
113 tRecAttr->next(NULL);
114 return tRecAttr;
115 }
116 if(tRecAttr){
117 m_ndb->releaseRecAttr(tRecAttr);
118 }
119 return 0;
120 }
121
122 void
getValues(const NdbRecord * rec,char * row_ptr)123 NdbReceiver::getValues(const NdbRecord* rec, char *row_ptr)
124 {
125 assert(m_using_ndb_record);
126
127 m_record.m_ndb_record= rec;
128 m_record.m_row_recv= row_ptr;
129 m_record.m_row_offset= rec->m_row_size;
130 }
131
132 void
prepareReceive(char * buf)133 NdbReceiver::prepareReceive(char *buf)
134 {
135 /* Set pointers etc. to prepare for receiving the first row of the batch. */
136 assert(theMagicNumber == 0x11223344);
137 m_received_result_length = 0;
138 m_expected_result_length = 0;
139 if (m_using_ndb_record)
140 {
141 m_record.m_row_recv= buf;
142 }
143 theCurrentRecAttr = theFirstRecAttr;
144 }
145
146 void
prepareRead(char * buf,Uint32 rows)147 NdbReceiver::prepareRead(char *buf, Uint32 rows)
148 {
149 /* Set pointers etc. to prepare for reading the first row of the batch. */
150 assert(theMagicNumber == 0x11223344);
151 m_current_row = 0;
152 m_result_rows = rows;
153 if (m_using_ndb_record)
154 {
155 m_record.m_row_buffer = buf;
156 }
157 }
158
159 #define KEY_ATTR_ID (~(Uint32)0)
160
161 /*
162 Compute the batch size (rows between each NEXT_TABREQ / SCAN_TABCONF) to
163 use, taking into account limits in the transporter, user preference, etc.
164
165 Hm, there are some magic overhead numbers (4 bytes/attr, 32 bytes/row) here,
166 would be nice with some explanation on how these numbers were derived.
167
168 TODO : Check whether these numbers need to be revised w.r.t. read packed
169 */
170 //static
171 void
calculate_batch_size(const NdbImpl & theImpl,const NdbRecord * record,const NdbRecAttr * first_rec_attr,Uint32 key_size,Uint32 parallelism,Uint32 & batch_size,Uint32 & batch_byte_size,Uint32 & first_batch_size)172 NdbReceiver::calculate_batch_size(const NdbImpl& theImpl,
173 const NdbRecord *record,
174 const NdbRecAttr *first_rec_attr,
175 Uint32 key_size,
176 Uint32 parallelism,
177 Uint32& batch_size,
178 Uint32& batch_byte_size,
179 Uint32& first_batch_size)
180 {
181 const NdbApiConfig & cfg = theImpl.get_ndbapi_config_parameters();
182 const Uint32 max_scan_batch_size= cfg.m_scan_batch_size;
183 const Uint32 max_batch_byte_size= cfg.m_batch_byte_size;
184 const Uint32 max_batch_size= cfg.m_batch_size;
185
186 Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
187 if (record)
188 {
189 tot_size+= record->m_max_transid_ai_bytes;
190 }
191
192 const NdbRecAttr *rec_attr= first_rec_attr;
193 while (rec_attr != NULL) {
194 Uint32 attr_size= rec_attr->getColumn()->getSizeInBytes();
195 attr_size= ((attr_size + 4 + 3) >> 2) << 2; //Even to word + overhead
196 tot_size+= attr_size;
197 rec_attr= rec_attr->next();
198 }
199
200 tot_size+= 32; //include signal overhead
201
202 /**
203 * Now we calculate the batch size by trying to get upto SCAN_BATCH_SIZE
204 * bytes sent for each batch from each node. We do however ensure that
205 * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
206 * batch.
207 */
208 if (batch_size == 0)
209 {
210 batch_byte_size= max_batch_byte_size;
211 }
212 else
213 {
214 batch_byte_size= batch_size * tot_size;
215 }
216
217 if (batch_byte_size * parallelism > max_scan_batch_size) {
218 batch_byte_size= max_scan_batch_size / parallelism;
219 }
220 batch_size= batch_byte_size / tot_size;
221 if (batch_size == 0) {
222 batch_size= 1;
223 } else {
224 if (batch_size > max_batch_size) {
225 batch_size= max_batch_size;
226 } else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
227 batch_size= MAX_PARALLEL_OP_PER_SCAN;
228 }
229 }
230 first_batch_size= batch_size;
231 return;
232 }
233
234 void
calculate_batch_size(Uint32 key_size,Uint32 parallelism,Uint32 & batch_size,Uint32 & batch_byte_size,Uint32 & first_batch_size,const NdbRecord * record) const235 NdbReceiver::calculate_batch_size(Uint32 key_size,
236 Uint32 parallelism,
237 Uint32& batch_size,
238 Uint32& batch_byte_size,
239 Uint32& first_batch_size,
240 const NdbRecord *record) const
241 {
242 calculate_batch_size(* m_ndb->theImpl,
243 record,
244 theFirstRecAttr,
245 key_size, parallelism, batch_size, batch_byte_size,
246 first_batch_size);
247 }
248
249 void
do_setup_ndbrecord(const NdbRecord * ndb_record,Uint32 batch_size,Uint32 key_size,Uint32 read_range_no,Uint32 rowsize,char * row_buffer)250 NdbReceiver::do_setup_ndbrecord(const NdbRecord *ndb_record, Uint32 batch_size,
251 Uint32 key_size, Uint32 read_range_no,
252 Uint32 rowsize, char *row_buffer)
253 {
254 m_using_ndb_record= true;
255 m_record.m_ndb_record= ndb_record;
256 m_record.m_row_recv= row_buffer;
257 m_record.m_row_buffer= row_buffer;
258 m_record.m_row_offset= rowsize;
259 m_record.m_read_range_no= read_range_no;
260 }
261
262 //static
263 Uint32
ndbrecord_rowsize(const NdbRecord * ndb_record,const NdbRecAttr * first_rec_attr,Uint32 key_size,bool read_range_no)264 NdbReceiver::ndbrecord_rowsize(const NdbRecord *ndb_record,
265 const NdbRecAttr *first_rec_attr,
266 Uint32 key_size,
267 bool read_range_no)
268 {
269 Uint32 rowsize= (ndb_record) ? ndb_record->m_row_size : 0;
270
271 /* Room for range_no. */
272 if (read_range_no)
273 rowsize+= 4;
274 /*
275 If keyinfo, need room for max. key + 4 bytes of actual key length + 4
276 bytes of scan info (all from KEYINFO20 signal).
277 */
278 if (key_size)
279 rowsize+= 8 + key_size*4;
280 /*
281 Compute extra space needed to buffer getValue() results in NdbRecord
282 scans.
283 */
284 const NdbRecAttr *ra= first_rec_attr;
285 while (ra != NULL)
286 {
287 rowsize+= sizeof(Uint32) + ra->getColumn()->getSizeInBytes();
288 ra= ra->next();
289 }
290 /* Ensure 4-byte alignment. */
291 rowsize= (rowsize+3) & 0xfffffffc;
292 return rowsize;
293 }
294
295 /**
296 * pad
297 * This function determines how much 'padding' should be applied
298 * to the passed in pointer and bitPos to get to the start of a
299 * field with the passed in alignment.
300 * The rules are :
301 * - First bit field is 32-bit aligned
302 * - Subsequent bit fields are packed in the next available bits
303 * - 8 and 16 bit aligned fields are packed in the next available
304 * word (but not necessarily word aligned.
305 * - 32, 64 and 128 bit aligned fields are packed in the next
306 * aligned 32-bit word.
307 * This algorithm is used to unpack a stream of fields packed by the code
308 * in src/kernel/blocks/dbtup/DbtupRoutines::read_packed()
309 */
310 static
311 inline
312 const Uint8*
pad(const Uint8 * src,Uint32 align,Uint32 bitPos)313 pad(const Uint8* src, Uint32 align, Uint32 bitPos)
314 {
315 UintPtr ptr = UintPtr(src);
316 switch(align){
317 case DictTabInfo::aBit:
318 case DictTabInfo::a32Bit:
319 case DictTabInfo::a64Bit:
320 case DictTabInfo::a128Bit:
321 return (Uint8*)(((ptr + 3) & ~(UintPtr)3) + 4 * ((bitPos + 31) >> 5));
322 charpad:
323 case DictTabInfo::an8Bit:
324 case DictTabInfo::a16Bit:
325 return src + 4 * ((bitPos + 31) >> 5);
326 default:
327 #ifdef VM_TRACE
328 abort();
329 #endif
330 goto charpad;
331 }
332 }
333
334 /**
335 * handle_packed_bit
336 * This function copies the bitfield of length len, offset pos from
337 * word-aligned ptr _src to memory starting at the byte ptr dst.
338 */
339 static
340 void
handle_packed_bit(const char * _src,Uint32 pos,Uint32 len,char * _dst)341 handle_packed_bit(const char* _src, Uint32 pos, Uint32 len, char* _dst)
342 {
343 Uint32 * src = (Uint32*)_src;
344 assert((UintPtr(src) & 3) == 0);
345
346 /* Convert char* to aligned Uint32* and some byte offset */
347 UintPtr uiPtr= UintPtr((Uint32*)_dst);
348 Uint32 dstByteOffset= Uint32(uiPtr) & 3;
349 Uint32* dst= (Uint32*) (uiPtr - dstByteOffset);
350
351 BitmaskImpl::copyField(dst, dstByteOffset << 3,
352 src, pos, len);
353 }
354
355
356 /**
357 * receive_packed_recattr
358 * Receive a packed stream of field values, whose presence and nullness
359 * is indicated by a leading bitmap into a list of NdbRecAttr objects
360 * Return the number of words read from the input stream.
361 */
362 Uint32
receive_packed_recattr(NdbRecAttr ** recAttr,Uint32 bmlen,const Uint32 * aDataPtr,Uint32 aLength)363 NdbReceiver::receive_packed_recattr(NdbRecAttr** recAttr,
364 Uint32 bmlen,
365 const Uint32* aDataPtr,
366 Uint32 aLength)
367 {
368 NdbRecAttr* currRecAttr = *recAttr;
369 const Uint8 *src = (Uint8*)(aDataPtr + bmlen);
370 Uint32 bitPos = 0;
371 for (Uint32 i = 0, attrId = 0; i<32*bmlen; i++, attrId++)
372 {
373 if (BitmaskImpl::get(bmlen, aDataPtr, i))
374 {
375 const NdbColumnImpl & col =
376 NdbColumnImpl::getImpl(* currRecAttr->getColumn());
377 if (unlikely(attrId != (Uint32)col.m_attrId))
378 goto err;
379 if (col.m_nullable)
380 {
381 if (BitmaskImpl::get(bmlen, aDataPtr, ++i))
382 {
383 currRecAttr->setNULL();
384 currRecAttr = currRecAttr->next();
385 continue;
386 }
387 }
388 Uint32 align = col.m_orgAttrSize;
389 Uint32 attrSize = col.m_attrSize;
390 Uint32 array = col.m_arraySize;
391 Uint32 len = col.m_length;
392 Uint32 sz = attrSize * array;
393 Uint32 arrayType = col.m_arrayType;
394
395 switch(align){
396 case DictTabInfo::aBit: // Bit
397 src = pad(src, 0, 0);
398 handle_packed_bit((const char*)src, bitPos, len,
399 currRecAttr->aRef());
400 src += 4 * ((bitPos + len) >> 5);
401 bitPos = (bitPos + len) & 31;
402 goto next;
403 default:
404 src = pad(src, align, bitPos);
405 }
406 switch(arrayType){
407 case NDB_ARRAYTYPE_FIXED:
408 break;
409 case NDB_ARRAYTYPE_SHORT_VAR:
410 sz = 1 + src[0];
411 break;
412 case NDB_ARRAYTYPE_MEDIUM_VAR:
413 sz = 2 + src[0] + 256 * src[1];
414 break;
415 default:
416 goto err;
417 }
418
419 bitPos = 0;
420 currRecAttr->receive_data((Uint32*)src, sz);
421 src += sz;
422 next:
423 currRecAttr = currRecAttr->next();
424 }
425 }
426 * recAttr = currRecAttr;
427 return (Uint32)(((Uint32*)pad(src, 0, bitPos)) - aDataPtr);
428
429 err:
430 abort();
431 return 0;
432 }
433
434
435 /* Set NdbRecord field to non-NULL value. */
assignToRec(const NdbRecord::Attr * col,char * row,const Uint8 * src,Uint32 byteSize)436 static void assignToRec(const NdbRecord::Attr *col,
437 char *row,
438 const Uint8 *src,
439 Uint32 byteSize)
440 {
441 /* Set NULLable attribute to "not NULL". */
442 if (col->flags & NdbRecord::IsNullable)
443 row[col->nullbit_byte_offset]&= ~(1 << col->nullbit_bit_in_byte);
444
445 memcpy(&row[col->offset], src, byteSize);
446 }
447
448 /* Set NdbRecord field to NULL. */
setRecToNULL(const NdbRecord::Attr * col,char * row)449 static void setRecToNULL(const NdbRecord::Attr *col,
450 char *row)
451 {
452 assert(col->flags & NdbRecord::IsNullable);
453 row[col->nullbit_byte_offset]|= 1 << col->nullbit_bit_in_byte;
454 }
455
456 int
get_range_no() const457 NdbReceiver::get_range_no() const
458 {
459 int range_no;
460 assert(m_using_ndb_record);
461 Uint32 idx= m_current_row;
462 if (idx == 0 || !m_record.m_read_range_no)
463 return -1;
464 memcpy(&range_no,
465 m_record.m_row_buffer +
466 (idx-1)*m_record.m_row_offset +
467 m_record.m_ndb_record->m_row_size,
468 4);
469 return range_no;
470 }
471
472 /**
473 * handle_bitfield_ndbrecord
474 * Packed bitfield handling for NdbRecord - also deals with
475 * mapping the bitfields into MySQLD format if necessary.
476 */
477 static void
handle_bitfield_ndbrecord(const NdbRecord::Attr * col,const Uint8 * & src,Uint32 & bitPos,Uint32 & len,char * row)478 handle_bitfield_ndbrecord(const NdbRecord::Attr* col,
479 const Uint8*& src,
480 Uint32& bitPos,
481 Uint32& len,
482 char* row)
483 {
484 if (col->flags & NdbRecord::IsNullable)
485 {
486 /* Clear nullbit in row */
487 row[col->nullbit_byte_offset] &=
488 ~(1 << col->nullbit_bit_in_byte);
489 }
490
491 char* dest;
492 Uint64 mysqldSpace;
493
494 /* For MySqldBitField, we read it as normal into a local on the
495 * stack and then use the put_mysqld_bitfield function to rearrange
496 * and write it to the row
497 */
498 bool isMDBitfield= (col->flags & NdbRecord::IsMysqldBitfield) != 0;
499
500 if (isMDBitfield)
501 {
502 assert(len <= 64);
503 dest= (char*) &mysqldSpace;
504 }
505 else
506 dest= row + col->offset;
507
508 /* Copy bitfield to memory starting at dest */
509 src = pad(src, 0, 0);
510 handle_packed_bit((const char*)src, bitPos, len, dest);
511 src += 4 * ((bitPos + len) >> 5);
512 bitPos = (bitPos + len) & 31;
513
514 if (isMDBitfield)
515 /* Rearrange bitfield from stack to row storage */
516 col->put_mysqld_bitfield(row, dest);
517 }
518
519
520 /**
521 * receive_packed_ndbrecord
522 * Receive a packed stream of field values, whose presence and nullness
523 * is indicated by a leading bitmap, into an NdbRecord row.
524 * Return the number of words consumed from the input stream.
525 */
526 Uint32
receive_packed_ndbrecord(Uint32 bmlen,const Uint32 * aDataPtr,char * row)527 NdbReceiver::receive_packed_ndbrecord(Uint32 bmlen,
528 const Uint32* aDataPtr,
529 char* row)
530 {
531 const Uint8 *src = (Uint8*)(aDataPtr + bmlen);
532 Uint32 bitPos = 0;
533 const NdbRecord* rec= m_record.m_ndb_record;
534 const Uint32 maxAttrId= rec->columns[rec->noOfColumns -1].attrId;
535 const Uint32 bmSize= bmlen << 5;
536
537 /* Use bitmap to determine which columns have been sent */
538 for (Uint32 i = 0, attrId = 0;
539 (i < bmSize) && (attrId <= maxAttrId);
540 i++, attrId++)
541 {
542 if (BitmaskImpl::get(bmlen, aDataPtr, i))
543 {
544 /* Found bit in column presence bitmask, get corresponding
545 * Attr struct from NdbRecord
546 */
547 assert(attrId < rec->m_attrId_indexes_length);
548 assert((Uint32) rec->m_attrId_indexes[attrId]
549 < rec->noOfColumns);
550 const NdbRecord::Attr* col= &rec->columns[rec->m_attrId_indexes[attrId]];
551
552 assert((col->flags & NdbRecord::IsBlob) == 0);
553
554 /* If col is nullable, check for null and
555 * set bit
556 */
557 if (col->flags & NdbRecord::IsNullable)
558 {
559 if (BitmaskImpl::get(bmlen, aDataPtr, ++i))
560 {
561 setRecToNULL(col, m_record.m_row_recv);
562
563 // Next column...
564 continue;
565 }
566 }
567
568 Uint32 align = col->orgAttrSize;
569 Uint32 sz = col->maxSize;
570 Uint32 len = col->bitCount;
571 Uint32 arrayType =
572 (col->flags & NdbRecord::IsVar1ByteLen)?
573 NDB_ARRAYTYPE_SHORT_VAR :
574 (
575 (col->flags & NdbRecord::IsVar2ByteLen)?
576 NDB_ARRAYTYPE_MEDIUM_VAR :
577 NDB_ARRAYTYPE_FIXED);
578
579 switch(align){
580 case DictTabInfo::aBit: // Bit
581 handle_bitfield_ndbrecord(col,
582 src,
583 bitPos,
584 len,
585 row);
586 continue; // Next column
587 default:
588 src = pad(src, align, bitPos);
589 }
590 switch(arrayType){
591 case NDB_ARRAYTYPE_FIXED:
592 break;
593 case NDB_ARRAYTYPE_SHORT_VAR:
594 sz = 1 + src[0];
595 break;
596 case NDB_ARRAYTYPE_MEDIUM_VAR:
597 sz = 2 + src[0] + 256 * src[1];
598 break;
599 default:
600 abort();
601 }
602
603 bitPos = 0;
604 assignToRec(col,
605 row,
606 src,
607 sz);
608
609 src += sz;
610 }
611 }
612
613 return (Uint32)(((Uint32*)pad(src, 0, bitPos)) - aDataPtr);
614 }
615
616
617 int
get_keyinfo20(Uint32 & scaninfo,Uint32 & length,const char * & data_ptr) const618 NdbReceiver::get_keyinfo20(Uint32 & scaninfo, Uint32 & length,
619 const char * & data_ptr) const
620 {
621 assert(m_using_ndb_record);
622 Uint32 idx= m_current_row;
623 if (idx == 0)
624 return -1; // No rows fetched yet
625 const char *p= m_record.m_row_buffer +
626 (idx-1)*m_record.m_row_offset +
627 m_record.m_ndb_record->m_row_size;
628 if (m_record.m_read_range_no)
629 p+= 4;
630 scaninfo= uint4korr(p);
631 p+= 4;
632 length= uint4korr(p);
633 p+= 4;
634 data_ptr= p;
635 return 0;
636 }
637
638
639 int
getScanAttrData(const char * & data,Uint32 & size,Uint32 & pos) const640 NdbReceiver::getScanAttrData(const char * & data, Uint32 & size, Uint32 & pos) const
641 {
642 assert(m_using_ndb_record);
643 Uint32 idx= m_current_row;
644 if (idx == 0)
645 return -1; // No rows fetched yet
646 const char *row_end= m_record.m_row_buffer + idx*m_record.m_row_offset;
647
648 pos+= sizeof(Uint32);
649 memcpy(&size, row_end - pos, sizeof(Uint32));
650 pos+= size;
651 data= row_end - pos;
652
653 assert (pos <= m_record.m_row_offset);
654 return 0;
655 }
656
657 int
execTRANSID_AI(const Uint32 * aDataPtr,Uint32 aLength)658 NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
659 {
660 /*
661 * NdbRecord and NdbRecAttr row result handling are merged here
662 * First any NdbRecord attributes are extracted
663 * Then any NdbRecAttr attributes are extracted
664 * NdbRecord scans with extra NdbRecAttr getValue() attrs
665 * are handled separately in the NdbRecord code
666 * Scenarios :
667 * NdbRecord only PK read result
668 * NdbRecAttr only PK read result
669 * Mixed PK read results
670 * NdbRecord only scan read result
671 * NdbRecAttr only scan read result
672 * Mixed scan read results
673 */
674 Uint32 exp= m_expected_result_length;
675 Uint32 tmp= m_received_result_length + aLength;
676 Uint32 origLength=aLength;
677 NdbRecAttr* currRecAttr = theCurrentRecAttr;
678 Uint32 save_pos= 0;
679
680 bool ndbrecord_part_done= !m_using_ndb_record;
681 const bool isScan= (m_type == NDB_SCANRECEIVER) ||
682 (m_type == NDB_QUERY_OPERATION);
683
684 /* Read words from the incoming signal train.
685 * The length passed in is enough for one row, either as an individual
686 * read op, or part of a scan. When there are no more words, we're at
687 * the end of the row
688 */
689 while (aLength > 0)
690 {
691 AttributeHeader ah(* aDataPtr++);
692 const Uint32 attrId= ah.getAttributeId();
693 Uint32 attrSize= ah.getByteSize();
694 aLength--;
695
696 if (!ndbrecord_part_done)
697 {
698 /* Special case for RANGE_NO, which is received first and is
699 * stored just after the row. */
700 if (attrId == AttributeHeader::RANGE_NO)
701 {
702 assert(m_record.m_read_range_no);
703 assert(attrSize==4);
704 assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size+attrSize);
705 memcpy(m_record.m_row_recv+m_record.m_ndb_record->m_row_size,
706 aDataPtr++, 4);
707 aLength--;
708 continue; // Next
709 }
710
711 /* Normal case for all NdbRecord primary key, index key, table scan
712 * and index scan reads. Extract all requested columns from packed
713 * format into the row.
714 */
715 if (attrId == AttributeHeader::READ_PACKED)
716 {
717 assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size);
718 Uint32 len= receive_packed_ndbrecord(attrSize >> 2, // Bitmap length
719 aDataPtr,
720 m_record.m_row_recv);
721 aDataPtr+= len;
722 aLength-= len;
723 continue; // Next
724 }
725
726 /* If we get here then we must have 'extra getValues' - columns
727 * requested outwith the normal NdbRecord + bitmask mechanism.
728 * This could be : pseudo columns, columns read via an old-Api
729 * scan, or just some extra columns added by the user to an
730 * NdbRecord operation.
731 * If the extra values are part of a scan then they get copied
732 * to a special area after the end of the normal row data.
733 * When the user calls NdbScanOperation.nextResult() they will
734 * be copied into the correct NdbRecAttr objects.
735 * If the extra values are not part of a scan then they are
736 * put into their NdbRecAttr objects now.
737 */
738 if (isScan)
739 {
740 /* For scans, we save the extra information at the end of the
741 * row buffer, in reverse order. When nextResult() is called,
742 * this data is copied into the correct NdbRecAttr objects.
743 */
744
745 /* Save this extra getValue */
746 save_pos+= sizeof(Uint32);
747 memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
748 &attrSize, sizeof(Uint32));
749 if (attrSize > 0)
750 {
751 save_pos+= attrSize;
752 assert (save_pos<=m_record.m_row_offset);
753 memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
754 aDataPtr, attrSize);
755 }
756
757 Uint32 sizeInWords= (attrSize+3)>>2;
758 aDataPtr+= sizeInWords;
759 aLength-= sizeInWords;
760 continue; // Next
761 }
762 else
763 {
764 /* Not a scan, so extra information is added to RecAttrs in
765 * the 'normal' way.
766 */
767 assert(theCurrentRecAttr != NULL);
768 assert(theCurrentRecAttr->attrId() == attrId);
769 /* Handle extra attributes requested with getValue(). */
770 /* This implies that we've finished with the NdbRecord part
771 of the read, so move onto NdbRecAttr */
772 ndbrecord_part_done=true;
773 // Fall through to RecAttr handling
774 }
775 } // / if (!ndbrecord_part_done)
776
777 /* If we get here then there are some attribute values to be
778 * read into the attached list of NdbRecAttrs.
779 * This occurs for old-Api primary and unique index keyed operations
780 * and for NdbRecord primary and unique index keyed operations
781 * using 'extra GetValues'.
782 */
783 if (ndbrecord_part_done)
784 {
785 // We've processed the NdbRecord part of the TRANSID_AI, if
786 // any. There are signal words left, so they must be
787 // RecAttr data
788 //
789 if (attrId == AttributeHeader::READ_PACKED)
790 {
791 assert(!m_using_ndb_record);
792 NdbRecAttr* tmp = currRecAttr;
793 Uint32 len = receive_packed_recattr(&tmp, attrSize>>2, aDataPtr, origLength);
794 aDataPtr += len;
795 aLength -= len;
796 currRecAttr = tmp;
797 continue;
798 }
799 /**
800 * Skip over missing attributes
801 * TODO : How can this happen?
802 */
803 while(currRecAttr && currRecAttr->attrId() != attrId){
804 currRecAttr = currRecAttr->next();
805 }
806
807 if(currRecAttr && currRecAttr->receive_data(aDataPtr, attrSize))
808 {
809 Uint32 add= (attrSize + 3) >> 2;
810 aLength -= add;
811 aDataPtr += add;
812 currRecAttr = currRecAttr->next();
813 } else {
814 /*
815 This should not happen: we got back an attribute for which we have no
816 stored NdbRecAttr recording that we requested said attribute (or we got
817 back attributes in the wrong order).
818 So dump some info for debugging, and abort.
819 */
820 ndbout_c("this=%p: attrId: %d currRecAttr: %p theCurrentRecAttr: %p "
821 "attrSize: %d %d", this,
822 attrId, currRecAttr, theCurrentRecAttr, attrSize,
823 currRecAttr ? currRecAttr->get_size_in_bytes() : 0);
824 currRecAttr = theCurrentRecAttr;
825 while(currRecAttr != 0){
826 ndbout_c("%d ", currRecAttr->attrId());
827 currRecAttr = currRecAttr->next();
828 }
829 abort();
830 return -1;
831 } // if (currRecAttr...)
832 } // /if (ndbrecord_part_done)
833 } // / while (aLength > 0)
834
835 theCurrentRecAttr = currRecAttr;
836
837 m_received_result_length = tmp;
838
839 if (m_using_ndb_record) {
840 /* Move onto next row in scan buffer */
841 m_record.m_row_recv+= m_record.m_row_offset;
842 }
843 return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0);
844 }
845
846 int
execKEYINFO20(Uint32 info,const Uint32 * aDataPtr,Uint32 aLength)847 NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength)
848 {
849 if (m_using_ndb_record)
850 {
851 /* Copy in the keyinfo after the user row and any range_no value. */
852
853 char *keyinfo_ptr= m_record.m_row_buffer +
854 m_current_row++ * m_record.m_row_offset +
855 m_record.m_ndb_record->m_row_size;
856 if (m_record.m_read_range_no)
857 keyinfo_ptr+= 4;
858
859 int4store(keyinfo_ptr, info);
860 keyinfo_ptr+= 4;
861 int4store(keyinfo_ptr, aLength);
862 keyinfo_ptr+= 4;
863 memcpy(keyinfo_ptr, aDataPtr, 4*aLength);
864
865 Uint32 tmp= m_received_result_length + aLength;
866 m_received_result_length = tmp;
867
868 return (tmp == m_expected_result_length ? 1 : 0);
869 }
870
871 /* The old method, using NdbRecAttr. */
872 NdbRecAttr* currRecAttr = m_rows[m_current_row++];
873 assert(currRecAttr->attrId() == KEY_ATTR_ID);
874 /*
875 This is actually reading data one word off the end of the received
876 signal (or off the end of the long signal data section 0, for a
877 long signal), due to the aLength+1. This is to ensure the correct length
878 being set for the NdbRecAttr (one extra word for the scanInfo word placed
879 at the end), overwritten immediately below.
880 But it's a bit ugly that we rely on being able to read one word over the
881 end of the signal without crashing...
882 */
883 currRecAttr->receive_data(aDataPtr, 4*(aLength + 1));
884
885 /**
886 * Save scanInfo in the end of keyinfo
887 */
888 ((Uint32*)currRecAttr->aRef())[aLength] = info;
889
890 Uint32 tmp = m_received_result_length + aLength;
891 m_received_result_length = tmp;
892
893 return (tmp == m_expected_result_length ? 1 : 0);
894 }
895
896 void
setErrorCode(int code)897 NdbReceiver::setErrorCode(int code)
898 {
899 theMagicNumber = 0;
900 if (getType()==NDB_QUERY_OPERATION)
901 {
902 NdbQueryOperationImpl* op = (NdbQueryOperationImpl*)getOwner();
903 op->getQuery().setErrorCode(code);
904 }
905 else
906 {
907 NdbOperation* const op = (NdbOperation*)getOwner();
908 assert(op->checkMagicNumber()==0);
909 op->setErrorCode(code);
910 }
911 }
912