1 /*
2 Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "API.hpp"
26 #include <AttributeHeader.hpp>
27 #include <signaldata/TcKeyConf.hpp>
28 #include <signaldata/DictTabInfo.hpp>
29
30
31 /**
32 * 'class NdbReceiveBuffer' takes care of buffering multi-row
33 * result sets received as the result of scan- or query- operations.
34 * Rows are stored in the 'raw' transporter format in the buffer,
35 * and later (only) the 'current row' is retrieved from the buffer,
36 * and unpacked into the full NdbRecord row format when navigated to.
37 */
38 class NdbReceiverBuffer
39 {
40 public:
41
42 /**
43 * End of row and key buffer area has an 'eodMagic'
44 * as a debugging aid in detecting buffer overflows.
45 */
46 static const Uint32 eodMagic = 0xacbd1234;
47
48 explicit NdbReceiverBuffer(Uint32 bufSizeBytes,
49 Uint32 batchRows);
50
reset()51 void reset()
52 { m_rows = m_keys = 0; }
53
getBufSizeWords() const54 Uint32 getBufSizeWords() const
55 { return m_bufSizeWords; }
56
getMaxRows() const57 Uint32 getMaxRows() const
58 { return m_maxRows; }
59
getRowCount() const60 Uint32 getRowCount() const
61 { return m_rows; }
62
getKeyCount() const63 Uint32 getKeyCount() const
64 { return m_keys; }
65
66 /**
67 * Rows are buffered in the first part of 'm_buffer'. The first
68 * 'm_maxrows+1' Uint32s in the buffer is a row_ix[] containing
69 * buffer indexes, such that:
70 * - Row 'n' starts at m_buffer[row_ix[n]].
71 * - Length of row 'n' is 'row_ix[n+1] - row_ix[n].
72 * row_ix[] contains one more item than 'm_maxrows'. The item
73 * past the last row is maintained such that the length of last row
74 * can be calculated.
75 */
allocRow(Uint32 noOfWords)76 Uint32 *allocRow(Uint32 noOfWords)
77 {
78 assert(verifyBuffer());
79 const Uint32 pos = rowIx(m_rows); //First free
80 rowIx(++m_rows) = pos + noOfWords; //Next free
81 #ifndef NDEBUG
82 m_buffer[pos + noOfWords] = eodMagic;
83 assert(verifyBuffer());
84 #endif
85 return &m_buffer[pos];
86 }
87
88 /**
89 * Keys are allocated from the end of 'm_buffer', it grows and are
90 * indexed in *reverse order*. key_ix[] is allocated at the very end of
91 * the buffer:
92 * - Key 'n' starts at m_buffer[key_ix[n]]
93 * - Length of key 'n' is 'key_ix[n-1] - key_ix[n]'.
94 */
allocKey(Uint32 noOfWords)95 Uint32 *allocKey(Uint32 noOfWords)
96 {
97 assert(verifyBuffer());
98 const Uint32 prev = keyIx(m_keys-1);
99 const Uint32 pos = prev - noOfWords;
100 keyIx(m_keys) = pos;
101 m_keys++;
102 #ifndef NDEBUG
103 m_buffer[pos-1] = eodMagic;
104 assert(verifyBuffer());
105 #endif
106 return &m_buffer[pos];
107 }
108
getRow(Uint32 row,Uint32 & noOfWords) const109 const Uint32 *getRow(Uint32 row, Uint32& noOfWords) const
110 {
111 assert(verifyBuffer());
112 if (unlikely(row >= m_rows))
113 return NULL;
114
115 const Uint32 ix = rowIx(row);
116 noOfWords = rowIx(row+1) - ix;
117 assert(noOfWords < m_bufSizeWords); // Sanity check
118 return m_buffer + ix;
119 }
120
getKey(Uint32 key,Uint32 & noOfWords) const121 const Uint32 *getKey(Uint32 key, Uint32& noOfWords) const
122 {
123 assert(verifyBuffer());
124 if (unlikely(key >= m_keys))
125 return NULL;
126
127 const Uint32 ix = keyIx(key);
128 noOfWords = keyIx(key-1) - ix;
129 assert(noOfWords < m_bufSizeWords); // Sanity check
130 return m_buffer + ix;
131 }
132
133 /**
134 * We know 'bufSizeWords', the total max size of data to store
135 * in the buffer. In addition there are some overhead required by
136 * the buffer management itself. Calculate total words required to
137 * be allocated for the NdbReceiverBuffer structure.
138 */
calculateSizeInWords(Uint32 bufSizeWords,Uint32 batchRows,Uint32 keySize)139 static Uint32 calculateSizeInWords(Uint32 bufSizeWords,
140 Uint32 batchRows,
141 Uint32 keySize)
142 {
143 return bufSizeWords + // Words to store
144 1 + // 'eodMagic' in buffer
145 headerWords + // Admin overhead
146 ((keySize > 0) // Row + optional key indexes
147 ? (batchRows+1) * 2 // Row + key indexes
148 : (batchRows+1)); // Row index only
149 }
150
151 private:
152 static const Uint32 headerWords= 4; //4*Uint32's below
153
154 // No copying / assignment:
155 NdbReceiverBuffer(const NdbReceiverBuffer&);
156 NdbReceiverBuffer& operator=(const NdbReceiverBuffer&);
157
158 const Uint32 m_maxRows; // Max capacity in #rows / #keys
159 const Uint32 m_bufSizeWords; // Size of 'm_buffer'
160
161 Uint32 m_rows; // Current #rows in m_buffer
162 Uint32 m_keys; // Current #keys in m_buffer
163
164 Uint32 m_buffer[1]; // Variable size buffer area (m_bufSizeWords)
165
166 /**
167 * Index to row offset is first 'maxrows' items in m_buffer.
168 * We maintain a 'next free row' position for all
169 * 'm_maxrows' in the buffer. Thus, this index array has
170 * to contain 'm_maxrows+1' items, indexed from [0..m_maxrows].
171 * This allows is to calculate data length for all rows
172 * as 'start(row+1) - start(row))
173 */
rowIx(Uint32 row) const174 Uint32 rowIx(Uint32 row) const { return m_buffer[row]; }
rowIx(Uint32 row)175 Uint32& rowIx(Uint32 row) { return m_buffer[row]; }
176
177 /**
178 * Index to key offset is last 'maxrows' items in m_buffer.
179 * We maintain a 'previous row start' position for all
180 * 'm_maxrows' in the buffer - even for 'key 0'.
181 * Thus, this index array has to contain 'm_maxrows+1'
182 * items, indexed from [-1..maxrows-1].
183 * This allows is to calculate data length for all keys
184 * as 'start(key-1) - start(key)).
185 */
186
187 // 'm_bufSizeWords-2' is keyIx(0), that place keyIx(-1) at 'm_bufSizeWords-1'
keyIx(Uint32 key) const188 Uint32 keyIx(Uint32 key) const { return m_buffer[m_bufSizeWords-2-key]; }
keyIx(Uint32 key)189 Uint32& keyIx(Uint32 key) { return m_buffer[m_bufSizeWords-2-key]; }
190
verifyBuffer() const191 bool verifyBuffer() const
192 {
193 assert(m_rows <= m_maxRows);
194 assert(m_keys <= m_maxRows);
195 // Check rows startpos and end within buffer
196 assert(rowIx(0) == m_maxRows+1);
197 assert(rowIx(m_rows) <= m_bufSizeWords);
198
199 // Rest of rows in sequence with non-negative length
200 for (Uint32 row=0; row<m_rows; row++)
201 {
202 assert(rowIx(row) <= rowIx(row+1));
203 }
204 // Overflow protection
205 assert(m_rows == 0 ||
206 m_buffer[rowIx(m_rows)] == eodMagic);
207
208 if (m_keys > 0)
209 {
210 // Check keys startpos and end before row buffer
211 assert(keyIx(-1) == (m_bufSizeWords - (m_maxRows+1)));
212 assert(keyIx(m_keys-1) >= rowIx(m_rows));
213
214 // Rest of keys in sequence with non-negative length
215 for (Uint32 key=0; key<m_keys; key++)
216 {
217 assert(keyIx(key) <= keyIx(key-1));
218 }
219
220 // Overflow protection
221 assert(m_buffer[keyIx(m_keys-1)-1] == eodMagic);
222 }
223 return true;
224 }
225
226 }; //class NdbReceiverBuffer
227
NdbReceiverBuffer(Uint32 bufSizeBytes,Uint32 batchRows)228 NdbReceiverBuffer::NdbReceiverBuffer(
229 Uint32 bufSizeBytes, // Word aligned size
230 Uint32 batchRows)
231 : m_maxRows(batchRows),
232 m_bufSizeWords((bufSizeBytes/sizeof(Uint32)) - headerWords),
233 m_rows(0),
234 m_keys(0)
235 {
236 assert((bufSizeBytes/sizeof(Uint32)) > headerWords);
237
238 /**
239 * Init row and key index arrays. Row indexes maintain
240 * a 'next free row' position which for rows start imm.
241 * after the 'm_maxrows+1' indexes.
242 */
243 rowIx(0) = m_maxRows+1;
244
245 /**
246 * Key indexes maintain a 'prev key startpos', even for key(0).
247 * Thus, for an empty key_ix[], we set startpos for
248 * (the non-existing) key(-1) which is imm. after the
249 * available key buffer area.
250 *
251 * NOTE: We init key_ix[] even if keyinfo not present
252 * in result set. that case it might later be overwritten
253 * by rows, which is ok as the keyinfo is then never used.
254 */
255 keyIx(-1) = m_bufSizeWords - (m_maxRows+1);
256 assert(verifyBuffer());
257 }
258
259
260 /**
261 * 'BEFORE' is used as the initial position before having a
262 * valid 'current' row. Beware, wraparound is assumed such
263 * that ' beforeFirstRow+1' -> 0 (first row)
264 */
265 static const Uint32 beforeFirstRow = 0xFFFFFFFF;
266
267 static
268 const Uint8*
269 pad(const Uint8* src, Uint32 align, Uint32 bitPos);
270
NdbReceiver(Ndb * aNdb)271 NdbReceiver::NdbReceiver(Ndb *aNdb) :
272 theMagicNumber(0),
273 m_ndb(aNdb),
274 m_id(NdbObjectIdMap::InvalidId),
275 m_tcPtrI(RNIL),
276 m_type(NDB_UNINITIALIZED),
277 m_owner(NULL),
278 m_ndb_record(NULL),
279 m_row_buffer(NULL),
280 m_recv_buffer(NULL),
281 m_read_range_no(false),
282 m_read_key_info(false),
283 m_firstRecAttr(NULL),
284 m_lastRecAttr(NULL),
285 m_rec_attr_data(NULL),
286 m_rec_attr_len(0),
287 m_current_row(beforeFirstRow),
288 m_expected_result_length(0),
289 m_received_result_length(0)
290 {}
291
~NdbReceiver()292 NdbReceiver::~NdbReceiver()
293 {
294 DBUG_ENTER("NdbReceiver::~NdbReceiver");
295 if (m_id != NdbObjectIdMap::InvalidId) {
296 m_ndb->theImpl->theNdbObjectIdMap.unmap(m_id, this);
297 }
298 DBUG_VOID_RETURN;
299 }
300
301 //static
302 NdbReceiverBuffer*
initReceiveBuffer(Uint32 * buffer,Uint32 bufSizeBytes,Uint32 batchRows)303 NdbReceiver::initReceiveBuffer(Uint32 *buffer, // Uint32 aligned buffer
304 Uint32 bufSizeBytes, // Size, from ::result_bufsize()
305 Uint32 batchRows)
306 {
307 assert(((UintPtr)buffer % sizeof(Uint32)) == 0); //Is Uint32 aligned
308
309 return new(buffer) NdbReceiverBuffer(bufSizeBytes, batchRows);
310 }
311
312 int
init(ReceiverType type,void * owner)313 NdbReceiver::init(ReceiverType type, void* owner)
314 {
315 theMagicNumber = getMagicNumber();
316 m_type = type;
317 m_owner = owner;
318 m_ndb_record= NULL;
319 m_row_buffer= NULL;
320 m_recv_buffer= NULL;
321 m_read_range_no= false;
322 m_read_key_info= false;
323 m_firstRecAttr = NULL;
324 m_lastRecAttr = NULL;
325 m_rec_attr_data = NULL;
326 m_rec_attr_len = 0;
327
328 if (m_id == NdbObjectIdMap::InvalidId)
329 {
330 if (m_ndb)
331 {
332 m_id = m_ndb->theImpl->theNdbObjectIdMap.map(this);
333 if (m_id == NdbObjectIdMap::InvalidId)
334 {
335 setErrorCode(4000);
336 return -1;
337 }
338 }
339 }
340 return 0;
341 }
342
343 void
do_setup_ndbrecord(const NdbRecord * ndb_record,char * row_buffer,bool read_range_no,bool read_key_info)344 NdbReceiver::do_setup_ndbrecord(const NdbRecord *ndb_record,
345 char *row_buffer,
346 bool read_range_no, bool read_key_info)
347 {
348 m_ndb_record= ndb_record;
349 m_row_buffer= row_buffer;
350 m_recv_buffer= NULL;
351 m_read_range_no= read_range_no;
352 m_read_key_info= read_key_info;
353 }
354
355 void
release()356 NdbReceiver::release()
357 {
358 theMagicNumber = 0;
359 NdbRecAttr* tRecAttr = m_firstRecAttr;
360 while (tRecAttr != NULL)
361 {
362 NdbRecAttr* tSaveRecAttr = tRecAttr;
363 tRecAttr = tRecAttr->next();
364 m_ndb->releaseRecAttr(tSaveRecAttr);
365 }
366 m_firstRecAttr = NULL;
367 m_lastRecAttr = NULL;
368 m_rec_attr_data = NULL;
369 m_rec_attr_len = 0;
370 m_ndb_record= NULL;
371 m_row_buffer= NULL;
372 m_recv_buffer= NULL;
373 }
374
375 NdbRecAttr *
getValue(const NdbColumnImpl * tAttrInfo,char * user_dst_ptr)376 NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr)
377 {
378 NdbRecAttr* tRecAttr = m_ndb->getRecAttr();
379 if(tRecAttr && !tRecAttr->setup(tAttrInfo, user_dst_ptr)){
380 if (m_firstRecAttr == NULL)
381 m_firstRecAttr = tRecAttr;
382 else
383 m_lastRecAttr->next(tRecAttr);
384 m_lastRecAttr = tRecAttr;
385 tRecAttr->next(NULL);
386 return tRecAttr;
387 }
388 if(tRecAttr){
389 m_ndb->releaseRecAttr(tRecAttr);
390 }
391 return 0;
392 }
393
394 void
getValues(const NdbRecord * rec,char * row_ptr)395 NdbReceiver::getValues(const NdbRecord* rec, char *row_ptr)
396 {
397 assert(m_recv_buffer == NULL);
398 assert(rec != NULL);
399
400 m_ndb_record= rec;
401 m_row_buffer= row_ptr;
402 }
403
404 void
prepareSend()405 NdbReceiver::prepareSend()
406 {
407 /* Set pointers etc. to prepare for receiving the first row of the batch. */
408 theMagicNumber = 0x11223344;
409 m_current_row = beforeFirstRow;
410 m_received_result_length = 0;
411 m_expected_result_length = 0;
412
413 if (m_recv_buffer != NULL)
414 {
415 m_recv_buffer->reset();
416 }
417 }
418
419 void
prepareReceive(NdbReceiverBuffer * buffer)420 NdbReceiver::prepareReceive(NdbReceiverBuffer *buffer)
421 {
422 m_recv_buffer= buffer;
423 prepareSend();
424 }
425
426 /*
427 Compute the batch size (rows between each NEXT_TABREQ / SCAN_TABCONF) to
428 use, taking into account limits in the transporter, user preference, etc.
429
430 It is the responsibility of the batch producer (LQH+TUP) to
431 stay within these 'batch_size' and 'batch_byte_size' limits.:
432
433 - It should stay strictly within the 'batch_size' (#rows) limit.
434 - It is allowed to overallocate the 'batch_byte_size' (slightly)
435 in order to complete the current row when it hit the limit.
436 (Upto ::packed_rowsize())
437
438 The client should be prepared to receive, and buffer, upto
439 'batch_size' rows from each fragment.
440 ::ndbrecord_bufsize() might be usefull for calculating the
441 buffersize to allocate for this resultset.
442 */
443 //static
444 void
calculate_batch_size(const NdbImpl & theImpl,Uint32 parallelism,Uint32 & batch_size,Uint32 & batch_byte_size)445 NdbReceiver::calculate_batch_size(const NdbImpl& theImpl,
446 Uint32 parallelism,
447 Uint32& batch_size,
448 Uint32& batch_byte_size)
449 {
450 const NdbApiConfig & cfg = theImpl.get_ndbapi_config_parameters();
451 const Uint32 max_scan_batch_size= cfg.m_scan_batch_size;
452 const Uint32 max_batch_byte_size= cfg.m_batch_byte_size;
453 const Uint32 max_batch_size= cfg.m_batch_size;
454
455 batch_byte_size= max_batch_byte_size;
456 if (batch_byte_size * parallelism > max_scan_batch_size) {
457 batch_byte_size= max_scan_batch_size / parallelism;
458 }
459
460 if (batch_size == 0 || batch_size > max_batch_size) {
461 batch_size= max_batch_size;
462 }
463 if (unlikely(batch_size > MAX_PARALLEL_OP_PER_SCAN)) {
464 batch_size= MAX_PARALLEL_OP_PER_SCAN;
465 }
466 if (unlikely(batch_size > batch_byte_size)) {
467 batch_size= batch_byte_size;
468 }
469
470 return;
471 }
472
473 void
calculate_batch_size(Uint32 parallelism,Uint32 & batch_size,Uint32 & batch_byte_size) const474 NdbReceiver::calculate_batch_size(Uint32 parallelism,
475 Uint32& batch_size,
476 Uint32& batch_byte_size) const
477 {
478 calculate_batch_size(* m_ndb->theImpl,
479 parallelism,
480 batch_size,
481 batch_byte_size);
482 }
483
484 //static
485 Uint32
ndbrecord_rowsize(const NdbRecord * result_record,bool read_range_no)486 NdbReceiver::ndbrecord_rowsize(const NdbRecord *result_record,
487 bool read_range_no)
488 {
489 // Unpacked NdbRecords are stored in its full unprojected form
490 Uint32 rowsize= (result_record)
491 ? result_record->m_row_size
492 : 0;
493
494 // After unpack, the optional RANGE_NO is stored as an Uint32
495 if (read_range_no)
496 rowsize+= sizeof(Uint32);
497
498 return (rowsize+3) & 0xfffffffc;
499 }
500
501 /**
502 * Calculate max size (In Uint32 words) of a 'packed' result row,
503 * including optional 'keyinfo' and 'range_no'.
504 */
505 static
packed_rowsize(const NdbRecord * result_record,const Uint32 * read_mask,const NdbRecAttr * first_rec_attr,Uint32 keySizeWords,bool read_range_no)506 Uint32 packed_rowsize(const NdbRecord *result_record,
507 const Uint32* read_mask,
508 const NdbRecAttr *first_rec_attr,
509 Uint32 keySizeWords,
510 bool read_range_no)
511 {
512 Uint32 nullCount = 0;
513 Uint32 bitPos = 0;
514 const Uint8 *pos = NULL;
515
516 if (likely(result_record != NULL))
517 {
518 for (Uint32 i= 0; i<result_record->noOfColumns; i++)
519 {
520 const NdbRecord::Attr *col= &result_record->columns[i];
521 const Uint32 attrId= col->attrId;
522
523 /* Skip column if result_mask says so and we don't need
524 * to read it
525 */
526 if (BitmaskImpl::get(MAXNROFATTRIBUTESINWORDS, read_mask, attrId))
527 {
528 const Uint32 align = col->orgAttrSize;
529
530 switch(align){
531 case DictTabInfo::aBit:
532 pos = pad(pos, 0, 0);
533 bitPos += col->bitCount;
534 pos += 4 * (bitPos / 32);
535 bitPos = (bitPos % 32);
536 break;
537 default:
538 pos = pad(pos, align, bitPos);
539 bitPos = 0;
540 pos += col->maxSize;
541 break;
542 }
543
544 if (col->flags & NdbRecord::IsNullable)
545 nullCount++;
546 }
547 }
548 }
549 Uint32 sizeInWords = (Uint32)(((Uint32*)pad(pos, 0, bitPos) - (Uint32*)NULL));
550
551 // Add AttributeHeader::READ_PACKED or ::READ_ALL (Uint32) and
552 // variable size bitmask the 'packed' columns and their null bits.
553 if (sizeInWords > 0)
554 {
555 const Uint32 attrCount= result_record->columns[result_record->noOfColumns -1].attrId+1;
556 const Uint32 sigBitmaskWords= ((attrCount+nullCount+31)>>5);
557 sizeInWords += (1+sigBitmaskWords); //AttrHeader + bitMask
558 }
559
560 // The optional RANGE_NO is transfered and stored in buffer
561 // as AttributeHeader::RANGE_NO + an Uint32 'range_no'
562 if (read_range_no)
563 {
564 sizeInWords += 2;
565 }
566
567 // KeyInfo is transfered in a seperate signal,
568 // and is stored in the packed buffer together with 'info' word
569 if (keySizeWords > 0)
570 {
571 sizeInWords+= keySizeWords+1;
572 }
573
574 /* Add extra needed to transfer RecAttrs requested by getValue() */
575 const NdbRecAttr *ra= first_rec_attr;
576 while (ra != NULL)
577 {
578 // AttrHeader + max column size. Aligned to word boundary
579 sizeInWords+= 1 + ((ra->getColumn()->getSizeInBytes() + 3) / 4);
580 ra= ra->next();
581 }
582
583 return sizeInWords;
584 }
585
586 /**
587 * Calculate max size (In Uint32 words) of a buffer containing
588 * 'batch_rows' of packed result rows. Size also include
589 * overhead required by the NdbReceiverBuffer itself.
590 */
591 static
ndbrecord_bufsize(Uint32 batchRows,Uint32 batchBytes,Uint32 fragments,Uint32 rowSizeWords,Uint32 keySizeWords)592 Uint32 ndbrecord_bufsize(Uint32 batchRows,
593 Uint32 batchBytes, //Optional upper limit of batch size
594 Uint32 fragments, //Frags handled by this receiver (>= 1)
595 Uint32 rowSizeWords, //Packed size, from ::packed_rowsize()
596 Uint32 keySizeWords) //In words
597 {
598 /**
599 * Size of batch is either limited by max 'batchRows' fetched,
600 * or when the total 'batchBytes' limit is reached. In the
601 * later case we are allowed to over-allocate by allowing
602 * each fragment delivering to this NdbReceiver to complete
603 * the current row / key. If KeyInfo is requested, an additional
604 *'info' word is also added for each key which comes in addition
605 * to the 'batch_byte' limit
606 */
607 assert(batchRows > 0);
608 Uint32 batchSizeWords = batchRows * rowSizeWords;
609
610 // Result set size may be limited by 'batch_words' if specified.
611 if (batchBytes > 0)
612 {
613 const Uint32 batchWords = (batchBytes+3) / sizeof(Uint32);
614 // Batch size + over alloc last row + info word if keyinfo20
615 const Uint32 batchSizeLimit = batchWords
616 + (rowSizeWords * fragments)
617 + ((keySizeWords > 0) ? batchRows : 0);
618
619 if (batchSizeWords > batchSizeLimit)
620 batchSizeWords = batchSizeLimit;
621 }
622
623 return NdbReceiverBuffer::calculateSizeInWords(batchSizeWords, batchRows, keySizeWords);
624 }
625
626 //static
627 Uint32
result_bufsize(Uint32 batchRows,Uint32 batchBytes,Uint32 fragments,const NdbRecord * result_record,const Uint32 * read_mask,const NdbRecAttr * first_rec_attr,Uint32 keySizeWords,bool read_range_no)628 NdbReceiver::result_bufsize(Uint32 batchRows,
629 Uint32 batchBytes,
630 Uint32 fragments,
631 const NdbRecord *result_record,
632 const Uint32* read_mask,
633 const NdbRecAttr *first_rec_attr,
634 Uint32 keySizeWords,
635 bool read_range_no)
636 {
637 const Uint32 rowSizeWords= packed_rowsize(
638 result_record,
639 read_mask,
640 first_rec_attr,
641 keySizeWords,
642 read_range_no);
643
644 const Uint32 bufSizeWords= ndbrecord_bufsize(
645 batchRows,
646 batchBytes,
647 fragments,
648 rowSizeWords,
649 keySizeWords);
650
651 // bufsize is in word, return as bytes
652 return bufSizeWords * sizeof(Uint32);
653 }
654
655 /**
656 * pad
657 * This function determines how much 'padding' should be applied
658 * to the passed in pointer and bitPos to get to the start of a
659 * field with the passed in alignment.
660 * The rules are :
661 * - First bit field is 32-bit aligned
662 * - Subsequent bit fields are packed in the next available bits
663 * - 8 and 16 bit aligned fields are packed in the next available
664 * word (but not necessarily word aligned.
665 * - 32, 64 and 128 bit aligned fields are packed in the next
666 * aligned 32-bit word.
667 * This algorithm is used to unpack a stream of fields packed by the code
668 * in src/kernel/blocks/dbtup/DbtupRoutines::read_packed()
669 */
670 static
671 inline
672 const Uint8*
pad(const Uint8 * src,Uint32 align,Uint32 bitPos)673 pad(const Uint8* src, Uint32 align, Uint32 bitPos)
674 {
675 UintPtr ptr = UintPtr(src);
676 switch(align){
677 case DictTabInfo::aBit:
678 case DictTabInfo::a32Bit:
679 case DictTabInfo::a64Bit:
680 case DictTabInfo::a128Bit:
681 return (Uint8*)(((ptr + 3) & ~(UintPtr)3) + 4 * ((bitPos + 31) >> 5));
682
683 default:
684 #ifdef VM_TRACE
685 abort();
686 #endif
687 //Fall through:
688
689 case DictTabInfo::an8Bit:
690 case DictTabInfo::a16Bit:
691 return src + 4 * ((bitPos + 31) >> 5);
692 }
693 }
694
695 /**
696 * handle_packed_bit
697 * This function copies the bitfield of length len, offset pos from
698 * word-aligned ptr _src to memory starting at the byte ptr dst.
699 */
700 static
701 void
handle_packed_bit(const char * _src,Uint32 pos,Uint32 len,char * _dst)702 handle_packed_bit(const char* _src, Uint32 pos, Uint32 len, char* _dst)
703 {
704 Uint32 * src = (Uint32*)_src;
705 assert((UintPtr(src) & 3) == 0);
706
707 /* Convert char* to aligned Uint32* and some byte offset */
708 UintPtr uiPtr= UintPtr((Uint32*)_dst);
709 Uint32 dstByteOffset= Uint32(uiPtr) & 3;
710 Uint32* dst= (Uint32*) (uiPtr - dstByteOffset);
711
712 BitmaskImpl::copyField(dst, dstByteOffset << 3,
713 src, pos, len);
714 }
715
716
717 /**
718 * unpackRecAttr
719 * Unpack a packed stream of field values, whose presence and nullness
720 * is indicated by a leading bitmap into a list of NdbRecAttr objects
721 * Return the number of words read from the input stream.
722 */
723 //static
724 Uint32
unpackRecAttr(NdbRecAttr ** recAttr,Uint32 bmlen,const Uint32 * aDataPtr,Uint32 aLength)725 NdbReceiver::unpackRecAttr(NdbRecAttr** recAttr,
726 Uint32 bmlen,
727 const Uint32* aDataPtr,
728 Uint32 aLength)
729 {
730 NdbRecAttr* currRecAttr = *recAttr;
731 const Uint8 *src = (Uint8*)(aDataPtr + bmlen);
732 Uint32 bitPos = 0;
733 for (Uint32 i = 0, attrId = 0; i<32*bmlen; i++, attrId++)
734 {
735 if (BitmaskImpl::get(bmlen, aDataPtr, i))
736 {
737 const NdbColumnImpl & col =
738 NdbColumnImpl::getImpl(* currRecAttr->getColumn());
739 if (unlikely(attrId != (Uint32)col.m_attrId))
740 goto err;
741 if (col.m_nullable)
742 {
743 if (BitmaskImpl::get(bmlen, aDataPtr, ++i))
744 {
745 currRecAttr->setNULL();
746 currRecAttr = currRecAttr->next();
747 continue;
748 }
749 }
750 Uint32 align = col.m_orgAttrSize;
751 Uint32 attrSize = col.m_attrSize;
752 Uint32 array = col.m_arraySize;
753 Uint32 len = col.m_length;
754 Uint32 sz = attrSize * array;
755 Uint32 arrayType = col.m_arrayType;
756
757 switch(align){
758 case DictTabInfo::aBit: // Bit
759 src = pad(src, 0, 0);
760 handle_packed_bit((const char*)src, bitPos, len,
761 currRecAttr->aRef());
762 src += 4 * ((bitPos + len) >> 5);
763 bitPos = (bitPos + len) & 31;
764 goto next;
765 default:
766 src = pad(src, align, bitPos);
767 }
768 switch(arrayType){
769 case NDB_ARRAYTYPE_FIXED:
770 break;
771 case NDB_ARRAYTYPE_SHORT_VAR:
772 sz = 1 + src[0];
773 break;
774 case NDB_ARRAYTYPE_MEDIUM_VAR:
775 sz = 2 + src[0] + 256 * src[1];
776 break;
777 default:
778 goto err;
779 }
780
781 bitPos = 0;
782 currRecAttr->receive_data((Uint32*)src, sz);
783 src += sz;
784 next:
785 currRecAttr = currRecAttr->next();
786 }
787 }
788 * recAttr = currRecAttr;
789 return (Uint32)(((Uint32*)pad(src, 0, bitPos)) - aDataPtr);
790
791 err:
792 abort();
793 return 0;
794 }
795
796
797 /* Set NdbRecord field to NULL. */
setRecToNULL(const NdbRecord::Attr * col,char * row)798 static void setRecToNULL(const NdbRecord::Attr *col,
799 char *row)
800 {
801 assert(col->flags & NdbRecord::IsNullable);
802 row[col->nullbit_byte_offset]|= 1 << col->nullbit_bit_in_byte;
803 }
804
805 int
get_range_no() const806 NdbReceiver::get_range_no() const
807 {
808 Uint32 range_no;
809 assert(m_ndb_record != NULL);
810 assert(m_row_buffer != NULL);
811
812 if (unlikely(!m_read_range_no))
813 return -1;
814
815 memcpy(&range_no,
816 m_row_buffer + m_ndb_record->m_row_size,
817 sizeof(range_no));
818 return (int)range_no;
819 }
820
821 /**
822 * handle_bitfield_ndbrecord
823 * Packed bitfield handling for NdbRecord - also deals with
824 * mapping the bitfields into MySQLD format if necessary.
825 */
826 ATTRIBUTE_NOINLINE
827 static void
handle_bitfield_ndbrecord(const NdbRecord::Attr * col,const Uint8 * & src,Uint32 & bitPos,char * row)828 handle_bitfield_ndbrecord(const NdbRecord::Attr* col,
829 const Uint8*& src,
830 Uint32& bitPos,
831 char* row)
832 {
833 Uint32 len = col->bitCount;
834 if (col->flags & NdbRecord::IsNullable)
835 {
836 /* Clear nullbit in row */
837 row[col->nullbit_byte_offset] &=
838 ~(1 << col->nullbit_bit_in_byte);
839 }
840
841 char* dest;
842 Uint64 mysqldSpace;
843
844 /* For MySqldBitField, we read it as normal into a local on the
845 * stack and then use the put_mysqld_bitfield function to rearrange
846 * and write it to the row
847 */
848 bool isMDBitfield= (col->flags & NdbRecord::IsMysqldBitfield) != 0;
849
850 if (isMDBitfield)
851 {
852 assert(len <= 64);
853 dest= (char*) &mysqldSpace;
854 }
855 else
856 {
857 dest= row + col->offset;
858 }
859
860 /* Copy bitfield to memory starting at dest */
861 src = pad(src, 0, 0);
862 handle_packed_bit((const char*)src, bitPos, len, dest);
863 src += 4 * ((bitPos + len) >> 5);
864 bitPos = (bitPos + len) & 31;
865
866 if (isMDBitfield)
867 {
868 /* Rearrange bitfield from stack to row storage */
869 col->put_mysqld_bitfield(row, dest);
870 }
871 }
872
873
874 /**
875 * unpackNdbRecord
876 * Unpack a stream of field values, whose presence and nullness
877 * is indicated by a leading bitmap, into an NdbRecord row.
878 * Return the number of words consumed.
879 */
880 //static
881 Uint32
unpackNdbRecord(const NdbRecord * rec,Uint32 bmlen,const Uint32 * aDataPtr,char * row)882 NdbReceiver::unpackNdbRecord(const NdbRecord *rec,
883 Uint32 bmlen,
884 const Uint32* aDataPtr,
885 char* row)
886 {
887 /* Use bitmap to determine which columns have been sent */
888 /*
889 We save precious registers for the compiler by putting
890 three values in one i_attrId variable:
891 bit_index : Bit 0-15
892 attrId : Bit 16-31
893 maxAttrId : Bit 32-47
894
895 We use the same principle to store 3 variables in
896 bitPos_next_index variable:
897 next_index : Bit 0-15
898 bitPos : Bit 48-52
899 rpm_bmlen : Bit 32-47
900 0's : Bit 16-31
901
902 So we can also get bmSize by shifting 27 instead of 32 which
903 is equivalent to shift right 32 followed by shift left 5 when
904 one knows there are zeroes in the lower bits.
905
906 The compiler has to have quite a significant amount of live
907 variables in parallel here, so by the above handling we increase
908 the access time of these registers by 1-2 cycles, but this is
909 better than using the stack that has high chances of cache
910 misses.
911
912 This routine can easily be executed millions of times per
913 second in one CPU, it's called once for each record retrieved
914 from NDB data nodes in scans.
915 */
916 #define rpn_pack_attrId(bit_index, attrId, maxAttrId) \
917 Uint64((bit_index)) + \
918 (Uint64((attrId)) << 16) + \
919 (Uint64((maxAttrId)) << 32)
920 #define rpn_bit_index(index_attrId) ((index_attrId) & 0xFFFF)
921 #define rpn_attrId(index_attrId) (((index_attrId) >> 16) & 0xFFFF)
922 #define rpn_maxAttrId(index_attrId) ((index_attrId) >> 32)
923 #define rpn_inc_bit_index() Uint64(1)
924 #define rpn_inc_attrId() (Uint64(1) << 16)
925
926 #define rpn_pack_bitPos_next_index(bitPos, bmlen, next_index) \
927 Uint64((next_index) & 0xFFFF) + \
928 (Uint64((bmlen)) << 32) + \
929 (Uint64((bitPos)) << 48)
930 #define rpn_bmSize(bm_index) (((bm_index) >> 27) & 0xFFFF)
931 #define rpn_bmlen(bm_index) (((bm_index) >> 32) & 0xFFFF)
932 #define rpn_bitPos(bm_index) (((bm_index) >> 48) & 0x1F)
933 #define rpn_next_index(bm_index) ((bm_index) & 0xFFFF)
934 #define rpn_zero_bitPos(bm_index) \
935 { \
936 register Uint64 tmp_bitPos_next_index = bm_index; \
937 tmp_bitPos_next_index <<= 16; \
938 tmp_bitPos_next_index >>= 16; \
939 bm_index = tmp_bitPos_next_index; \
940 }
941 #define rpn_set_bitPos(bm_index, bitPos) \
942 { \
943 register Uint64 tmp_bitPos_next_index = bm_index; \
944 tmp_bitPos_next_index <<= 16; \
945 tmp_bitPos_next_index >>= 16; \
946 tmp_bitPos_next_index += (Uint64(bitPos) << 48); \
947 bm_index = tmp_bitPos_next_index; \
948 }
949 #define rpn_set_next_index(bm_index, val_next_index) \
950 { \
951 register Uint64 tmp_2_bitPos_next_index = Uint64(val_next_index); \
952 register Uint64 tmp_1_bitPos_next_index = bm_index; \
953 tmp_1_bitPos_next_index >>= 16; \
954 tmp_1_bitPos_next_index <<= 16; \
955 tmp_2_bitPos_next_index &= 0xFFFF; \
956 tmp_1_bitPos_next_index += tmp_2_bitPos_next_index; \
957 bm_index = tmp_1_bitPos_next_index; \
958 }
959
960 /**
961 * Both these macros can be called with an overflow value
962 * in val_next_index, to protect against this we ensure it
963 * doesn't overflow its 16 bits of space and affect other
964 * variables.
965 */
966
967 assert(bmlen <= 0x07FF);
968 register const Uint8 *src = (Uint8*)(aDataPtr + bmlen);
969 Uint32 noOfCols = rec->noOfColumns;
970 const NdbRecord::Attr* max_col = &rec->columns[noOfCols - 1];
971
972 const Uint64 maxAttrId = max_col->attrId;
973 assert(maxAttrId <= 0xFFFF);
974
975 /**
976 * Initialise the 3 fields stored in bitPos_next_index
977 *
978 * bitPos set to 0
979 * next_index set to rec->m_attrId_indexes[0]
980 * bmlen initialised
981 * bmSize is always bmlen / 32
982 */
983 register Uint64 bitPos_next_index =
984 rpn_pack_bitPos_next_index(0, bmlen, rec->m_attrId_indexes[0]);
985
986 /**
987 * Initialise the 3 fields stored in i_attrId
988 *
989 * bit_index set to 0
990 * attrId set to 0
991 * maxAttrId initialised
992 */
993 for (register Uint64 i_attrId = rpn_pack_attrId(0, 0, maxAttrId) ;
994 (rpn_bit_index(i_attrId) < rpn_bmSize(bitPos_next_index)) &&
995 (rpn_attrId(i_attrId) <= rpn_maxAttrId(i_attrId));
996 i_attrId += (rpn_inc_attrId() + rpn_inc_bit_index()))
997 {
998 const NdbRecord::Attr* col = &rec->columns[rpn_next_index(bitPos_next_index)];
999 if (BitmaskImpl::get(rpn_bmlen(bitPos_next_index),
1000 aDataPtr,
1001 rpn_bit_index(i_attrId)))
1002 {
1003 /* Found bit in column presence bitmask, get corresponding
1004 * Attr struct from NdbRecord
1005 */
1006 Uint32 align = col->orgAttrSize;
1007
1008 assert(rpn_attrId(i_attrId) < rec->m_attrId_indexes_length);
1009 assert (rpn_next_index(bitPos_next_index) < rec->noOfColumns);
1010 assert((col->flags & NdbRecord::IsBlob) == 0);
1011
1012 /* If col is nullable, check for null and set bit */
1013 if (col->flags & NdbRecord::IsNullable)
1014 {
1015 i_attrId += rpn_inc_bit_index();
1016 if (BitmaskImpl::get(rpn_bmlen(bitPos_next_index),
1017 aDataPtr,
1018 rpn_bit_index(i_attrId)))
1019 {
1020 setRecToNULL(col, row);
1021 assert(rpn_bitPos(bitPos_next_index) < 32);
1022 rpn_set_next_index(bitPos_next_index,
1023 rec->m_attrId_indexes[rpn_attrId(i_attrId) + 1]);
1024 continue; /* Next column */
1025 }
1026 }
1027 if (likely(align != DictTabInfo::aBit))
1028 {
1029 src = pad(src, align, rpn_bitPos(bitPos_next_index));
1030 rpn_zero_bitPos(bitPos_next_index);
1031 }
1032 else
1033 {
1034 Uint32 bitPos = rpn_bitPos(bitPos_next_index);
1035 const Uint8 *loc_src = src;
1036 handle_bitfield_ndbrecord(col,
1037 loc_src,
1038 bitPos,
1039 row);
1040 rpn_set_bitPos(bitPos_next_index, bitPos);
1041 src = loc_src;
1042 assert(rpn_bitPos(bitPos_next_index) < 32);
1043 rpn_set_next_index(bitPos_next_index,
1044 rec->m_attrId_indexes[rpn_attrId(i_attrId) + 1]);
1045 continue; /* Next column */
1046 }
1047
1048 {
1049 /* Set NULLable attribute to "not NULL". */
1050 if (col->flags & NdbRecord::IsNullable)
1051 {
1052 row[col->nullbit_byte_offset]&= ~(1 << col->nullbit_bit_in_byte);
1053 }
1054
1055 do
1056 {
1057 Uint32 sz;
1058 char *col_row_ptr = &row[col->offset];
1059 Uint32 flags = col->flags &
1060 (NdbRecord::IsVar1ByteLen |
1061 NdbRecord::IsVar2ByteLen);
1062 if (!flags)
1063 {
1064 sz = col->maxSize;
1065 if (likely(sz == 4))
1066 {
1067 col_row_ptr[0] = src[0];
1068 col_row_ptr[1] = src[1];
1069 col_row_ptr[2] = src[2];
1070 col_row_ptr[3] = src[3];
1071 src += sz;
1072 break;
1073 }
1074 }
1075 else if (flags & NdbRecord::IsVar1ByteLen)
1076 {
1077 sz = 1 + src[0];
1078 }
1079 else
1080 {
1081 sz = 2 + src[0] + 256 * src[1];
1082 }
1083 const Uint8 *source = src;
1084 src += sz;
1085 memcpy(col_row_ptr, source, sz);
1086 } while (0);
1087 }
1088 }
1089 rpn_set_next_index(bitPos_next_index,
1090 rec->m_attrId_indexes[rpn_attrId(i_attrId) + 1]);
1091 }
1092 Uint32 len = (Uint32)(((Uint32*)pad(src,
1093 0,
1094 rpn_bitPos(bitPos_next_index))) -
1095 aDataPtr);
1096 return len;
1097 }
1098
1099 int
get_keyinfo20(Uint32 & scaninfo,Uint32 & length,const char * & data_ptr) const1100 NdbReceiver::get_keyinfo20(Uint32 & scaninfo, Uint32 & length,
1101 const char * & data_ptr) const
1102 {
1103 if (unlikely(!m_read_key_info))
1104 return -1;
1105
1106 Uint32 len;
1107 const Uint32 *p = m_recv_buffer->getKey(m_current_row, len);
1108 if (unlikely(p == NULL))
1109 return -1;
1110
1111 scaninfo = *p;
1112 data_ptr = reinterpret_cast<const char*>(p+1);
1113 length = len-1;
1114 return 0;
1115 }
1116
1117 const char*
unpackBuffer(const NdbReceiverBuffer * buffer,Uint32 row)1118 NdbReceiver::unpackBuffer(const NdbReceiverBuffer *buffer, Uint32 row)
1119 {
1120 assert(buffer != NULL);
1121
1122 Uint32 aLength;
1123 const Uint32 *aDataPtr = buffer->getRow(row, aLength);
1124 if (likely(aDataPtr != NULL))
1125 {
1126 if (unpackRow(aDataPtr, aLength, m_row_buffer) == -1)
1127 return NULL;
1128
1129 return m_row_buffer;
1130 }
1131
1132 /* ReceiveBuffer may containt only keyinfo */
1133 const Uint32 *key = buffer->getKey(row, aLength);
1134 if (key != NULL)
1135 {
1136 assert(m_row_buffer != NULL);
1137 return m_row_buffer; // Row is empty, used as non-NULL return
1138 }
1139 return NULL;
1140 }
1141
1142 int
unpackRow(const Uint32 * aDataPtr,Uint32 aLength,char * row)1143 NdbReceiver::unpackRow(const Uint32* aDataPtr, Uint32 aLength, char* row)
1144 {
1145 /*
1146 * NdbRecord and NdbRecAttr row result handling are merged here
1147 * First any NdbRecord attributes are extracted
1148 * Then any NdbRecAttr attributes are extracted
1149 * Scenarios :
1150 * NdbRecord only PK read result
1151 * NdbRecAttr only PK read result
1152 * Mixed PK read results
1153 * NdbRecord only scan read result
1154 * NdbRecAttr only scan read result
1155 * Mixed scan read results
1156 */
1157
1158 /* If present, NdbRecord data will come first */
1159 if (m_ndb_record != NULL)
1160 {
1161 /* Read words from the incoming signal train.
1162 * The length passed in is enough for one row, either as an individual
1163 * read op, or part of a scan. When there are no more words, we're at
1164 * the end of the row
1165 */
1166 while (aLength > 0)
1167 {
1168 const AttributeHeader ah(* aDataPtr++);
1169 const Uint32 attrId= ah.getAttributeId();
1170 const Uint32 attrSize= ah.getByteSize();
1171 aLength--;
1172 assert(aLength >= (attrSize/sizeof(Uint32)));
1173
1174 /* Normal case for all NdbRecord primary key, index key, table scan
1175 * and index scan reads. Extract all requested columns from packed
1176 * format into the row.
1177 */
1178 if (likely(attrId == AttributeHeader::READ_PACKED))
1179 {
1180 assert(row != NULL);
1181 const Uint32 len= unpackNdbRecord(m_ndb_record,
1182 attrSize >> 2, // Bitmap length
1183 aDataPtr,
1184 row);
1185 assert(aLength >= len);
1186 aDataPtr+= len;
1187 aLength-= len;
1188 }
1189
1190 /* Special case for RANGE_NO, which is received first and is
1191 * stored just after the row. */
1192 else if (attrId == AttributeHeader::RANGE_NO)
1193 {
1194 assert(row != NULL);
1195 assert(m_read_range_no);
1196 assert(attrSize==sizeof(Uint32));
1197 memcpy(row+m_ndb_record->m_row_size, aDataPtr++, sizeof(Uint32));
1198 aLength--;
1199 }
1200
1201 else
1202 {
1203 /* If we get here then we must have 'extra getValues' - columns
1204 * requested outwith the normal NdbRecord + bitmask mechanism.
1205 * This could be : pseudo columns, columns read via an old-Api
1206 * scan, or just some extra columns added by the user to an
1207 * NdbRecord operation.
1208 */
1209 aDataPtr--; // Undo read of AttributeHeader
1210 aLength++;
1211 break;
1212 }
1213 } // while (aLength > 0)
1214 } // if (m_ndb_record != NULL)
1215
1216 /* Handle 'getValues', possible requested after NdbRecord columns. */
1217 if (aLength > 0)
1218 {
1219 /**
1220 * If we get here then there are some attribute values to be
1221 * read into the attached list of NdbRecAttrs.
1222 * This occurs for old-Api primary and unique index keyed operations
1223 * and for NdbRecord primary and unique index keyed operations
1224 * using 'extra GetValues'.
1225 *
1226 * If the values are part of a scan then we save
1227 * the starting point of these RecAttr values.
1228 * When the user calls NdbScanOperation.nextResult(), they will
1229 * be copied into the correct NdbRecAttr objects by calling
1230 * NdbRecord::get_AttrValues.
1231 * If the extra values are not part of a scan, then they are
1232 * put into their NdbRecAttr objects now.
1233 */
1234 const bool isScan= (m_type == NDB_SCANRECEIVER) ||
1235 (m_type == NDB_QUERY_OPERATION);
1236
1237 if (isScan)
1238 {
1239 /* Save position for RecAttr values for later retrieval. */
1240 m_rec_attr_data = aDataPtr;
1241 m_rec_attr_len = aLength;
1242 return 0;
1243 }
1244 else
1245 {
1246 /* Put values into RecAttr now */
1247 const int ret = handle_rec_attrs(m_firstRecAttr, aDataPtr, aLength);
1248 if (unlikely(ret != 0))
1249 return -1;
1250
1251 aDataPtr += aLength;
1252 aLength = 0;
1253 }
1254 } // if (aLength > 0)
1255
1256 m_rec_attr_data = NULL;
1257 m_rec_attr_len = 0;
1258 return 0;
1259 }
1260
1261 //static
1262 int
handle_rec_attrs(NdbRecAttr * rec_attr_list,const Uint32 * aDataPtr,Uint32 aLength)1263 NdbReceiver::handle_rec_attrs(NdbRecAttr* rec_attr_list,
1264 const Uint32* aDataPtr,
1265 Uint32 aLength)
1266 {
1267 NdbRecAttr* currRecAttr = rec_attr_list;
1268
1269 /* If we get here then there are some attribute values to be
1270 * read into the attached list of NdbRecAttrs.
1271 * This occurs for old-Api primary and unique index keyed operations
1272 * and for NdbRecord primary and unique index keyed operations
1273 * using 'extra GetValues'.
1274 */
1275 while (aLength > 0)
1276 {
1277 const AttributeHeader ah(* aDataPtr++);
1278 const Uint32 attrId= ah.getAttributeId();
1279 const Uint32 attrSize= ah.getByteSize();
1280 aLength--;
1281 assert(aLength >= (attrSize/sizeof(Uint32)));
1282
1283 {
1284 // We've processed the NdbRecord part of the TRANSID_AI, if
1285 // any. There are signal words left, so they must be
1286 // RecAttr data
1287 //
1288 if (attrId == AttributeHeader::READ_PACKED)
1289 {
1290 const Uint32 len = unpackRecAttr(&currRecAttr,
1291 attrSize>>2, aDataPtr, aLength);
1292 assert(aLength >= len);
1293 aDataPtr += len;
1294 aLength -= len;
1295 continue;
1296 }
1297
1298 if(currRecAttr &&
1299 currRecAttr->attrId() == attrId &&
1300 currRecAttr->receive_data(aDataPtr, attrSize))
1301 {
1302 Uint32 add= (attrSize + 3) >> 2;
1303 aLength -= add;
1304 aDataPtr += add;
1305 currRecAttr = currRecAttr->next();
1306 } else {
1307 /*
1308 This should not happen: we got back an attribute for which we have no
1309 stored NdbRecAttr recording that we requested said attribute (or we got
1310 back attributes in the wrong order).
1311 So dump some info for debugging, and abort.
1312 */
1313 ndbout_c("NdbReceiver::handle_rec_attrs: attrId: %d currRecAttr: %p rec_attr_list: %p "
1314 "attrSize: %d %d",
1315 attrId, currRecAttr, rec_attr_list, attrSize,
1316 currRecAttr ? currRecAttr->get_size_in_bytes() : 0);
1317 currRecAttr = rec_attr_list;
1318 while(currRecAttr != 0){
1319 ndbout_c("%d ", currRecAttr->attrId());
1320 currRecAttr = currRecAttr->next();
1321 }
1322 abort();
1323 return -1;
1324 } // if (currRecAttr...)
1325 }
1326 } // while (aLength > 0)
1327
1328 return 0;
1329 }
1330
1331 int
get_AttrValues(NdbRecAttr * rec_attr_list) const1332 NdbReceiver::get_AttrValues(NdbRecAttr* rec_attr_list) const
1333 {
1334 return handle_rec_attrs(rec_attr_list,
1335 m_rec_attr_data,
1336 m_rec_attr_len);
1337 }
1338
1339 int
execTRANSID_AI(const Uint32 * aDataPtr,Uint32 aLength)1340 NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
1341 {
1342 const Uint32 exp= m_expected_result_length;
1343 const Uint32 tmp= m_received_result_length + aLength;
1344
1345 /*
1346 * Store received data unprocessed into receive buffer
1347 * in its packed format.
1348 * It is unpacked into NdbRecord format when
1349 * we navigate to each row.
1350 */
1351 if (m_recv_buffer != NULL)
1352 {
1353 Uint32 *row_recv = m_recv_buffer->allocRow(aLength);
1354 memcpy(row_recv, aDataPtr, aLength*sizeof(Uint32));
1355 }
1356 else
1357 {
1358 if (unpackRow(aDataPtr, aLength, m_row_buffer) == -1)
1359 return -1;
1360 }
1361 m_received_result_length = tmp;
1362 return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0);
1363 }
1364
1365 int
execKEYINFO20(Uint32 info,const Uint32 * aDataPtr,Uint32 aLength)1366 NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength)
1367 {
1368 assert(m_read_key_info);
1369 assert(m_recv_buffer != NULL);
1370
1371 Uint32 *keyinfo_ptr = m_recv_buffer->allocKey(aLength+1);
1372
1373 // Copy in key 'info', followed by 'data'
1374 *keyinfo_ptr= info;
1375 memcpy(keyinfo_ptr+1, aDataPtr, 4*aLength);
1376
1377 const Uint32 tmp= m_received_result_length + aLength;
1378 m_received_result_length = tmp;
1379
1380 return (tmp == m_expected_result_length ? 1 : 0);
1381 }
1382
1383 const char*
getRow(const NdbReceiverBuffer * buffer,Uint32 row)1384 NdbReceiver::getRow(const NdbReceiverBuffer* buffer, Uint32 row)
1385 {
1386 return unpackBuffer(buffer, row);
1387 }
1388
1389 const char*
getNextRow()1390 NdbReceiver::getNextRow()
1391 {
1392 assert(m_recv_buffer != NULL);
1393 const Uint32 nextRow = m_current_row+1;
1394 const char *row = unpackBuffer(m_recv_buffer, nextRow);
1395 if (likely(row != NULL))
1396 {
1397 m_current_row = nextRow;
1398 }
1399 return row;
1400 }
1401
1402 int
execSCANOPCONF(Uint32 tcPtrI,Uint32 len,Uint32 rows)1403 NdbReceiver::execSCANOPCONF(Uint32 tcPtrI, Uint32 len, Uint32 rows)
1404 {
1405 assert(m_recv_buffer != NULL);
1406 assert(m_recv_buffer->getMaxRows() >= rows);
1407 assert(m_recv_buffer->getBufSizeWords() >= len);
1408
1409 m_tcPtrI = tcPtrI;
1410
1411 if (unlikely(len == 0))
1412 {
1413 /**
1414 * No TRANSID_AI will be received. (Likely an empty projection requested.)
1415 * To get row count correct, we simulate specified number of
1416 * empty TRANSID_AIs being received.
1417 */
1418 for (Uint32 row=0; row<rows; row++)
1419 {
1420 execTRANSID_AI(NULL,0);
1421 }
1422 }
1423
1424 const Uint32 tmp = m_received_result_length;
1425 m_expected_result_length = len;
1426 return (tmp == len ? 1 : 0);
1427 }
1428
1429 void
setErrorCode(int code)1430 NdbReceiver::setErrorCode(int code)
1431 {
1432 theMagicNumber = 0;
1433 if (getType()==NDB_QUERY_OPERATION)
1434 {
1435 NdbQueryOperationImpl* op = (NdbQueryOperationImpl*)getOwner();
1436 op->getQuery().setErrorCode(code);
1437 }
1438 else
1439 {
1440 NdbOperation* const op = (NdbOperation*)getOwner();
1441 assert(op->checkMagicNumber()==0);
1442 op->setErrorCode(code);
1443 }
1444 }
1445