1 /*
2 Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <ndb_global.h>
26
27 #include "Packer.hpp"
28 #include <TransporterRegistry.hpp>
29 #include <TransporterCallback.hpp>
30 #include <RefConvert.hpp>
31
/*
 * Upper bound on the number of signals processed by one call to
 * TransporterRegistry::unpack() (it is the loop limit in both overloads).
 * With ERROR_INSERT defined it is a global variable instead of a
 * compile-time constant (presumably so error-insert code can change it
 * at run time; confirm against the users of ERROR_INSERT).
 */
#ifdef ERROR_INSERT
Uint32 MAX_RECEIVED_SIGNALS = 1024;
#else
#define MAX_RECEIVED_SIGNALS 1024
#endif
37
38 Uint32
unpack(Uint32 * readPtr,Uint32 sizeOfData,NodeId remoteNodeId,IOState state)39 TransporterRegistry::unpack(Uint32 * readPtr,
40 Uint32 sizeOfData,
41 NodeId remoteNodeId,
42 IOState state) {
43 SignalHeader signalHeader;
44 LinearSectionPtr ptr[3];
45
46 Uint32 usedData = 0;
47 Uint32 loop_count = 0;
48
49 if(state == NoHalt || state == HaltOutput){
50 while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
51 (loop_count < MAX_RECEIVED_SIGNALS)) {
52 Uint32 word1 = readPtr[0];
53 Uint32 word2 = readPtr[1];
54 Uint32 word3 = readPtr[2];
55 loop_count++;
56
57 #if 0
58 if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
59 //Do funky stuff
60 }
61 #endif
62
63 const Uint16 messageLen32 = Protocol6::getMessageLength(word1);
64 const Uint32 messageLenBytes = ((Uint32)messageLen32) << 2;
65
66 if(messageLenBytes == 0 || messageLenBytes > MAX_RECV_MESSAGE_BYTESIZE){
67 DEBUG("Message Size = " << messageLenBytes);
68 report_error(remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
69 return usedData;
70 }//if
71
72 if (sizeOfData < messageLenBytes) {
73 break;
74 }//if
75
76 if(Protocol6::getCheckSumIncluded(word1)){
77 const Uint32 tmpLen = messageLen32 - 1;
78 const Uint32 checkSumSent = readPtr[tmpLen];
79 const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
80
81 if(checkSumComputed != checkSumSent){
82 report_error(remoteNodeId, TE_INVALID_CHECKSUM);
83 return usedData;
84 }//if
85 }//if
86
87 #if 0
88 if(Protocol6::getCompressed(word1)){
89 //Do funky stuff
90 }//if
91 #endif
92
93 Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
94
95 Uint32 sBlockNum = signalHeader.theSendersBlockRef;
96 sBlockNum = numberToRef(sBlockNum, remoteNodeId);
97 signalHeader.theSendersBlockRef = sBlockNum;
98
99 Uint8 prio = Protocol6::getPrio(word1);
100
101 Uint32 * signalData = &readPtr[3];
102
103 if(Protocol6::getSignalIdIncluded(word1) == 0){
104 signalHeader.theSendersSignalId = ~0;
105 } else {
106 signalHeader.theSendersSignalId = * signalData;
107 signalData ++;
108 }//if
109 signalHeader.theSignalId= ~0;
110
111 Uint32 * sectionPtr = signalData + signalHeader.theLength;
112 Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
113 for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
114 Uint32 sz = * sectionPtr;
115 ptr[i].sz = sz;
116 ptr[i].p = sectionData;
117
118 sectionPtr ++;
119 sectionData += sz;
120 }
121
122 callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr);
123
124 readPtr += messageLen32;
125 sizeOfData -= messageLenBytes;
126 usedData += messageLenBytes;
127 }//while
128
129 return usedData;
130 } else {
131 /** state = HaltIO || state == HaltInput */
132
133 while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
134 (loop_count < MAX_RECEIVED_SIGNALS)) {
135 Uint32 word1 = readPtr[0];
136 Uint32 word2 = readPtr[1];
137 Uint32 word3 = readPtr[2];
138 loop_count++;
139
140 #if 0
141 if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
142 //Do funky stuff
143 }//if
144 #endif
145
146 const Uint16 messageLen32 = Protocol6::getMessageLength(word1);
147 const Uint32 messageLenBytes = ((Uint32)messageLen32) << 2;
148 if(messageLenBytes == 0 || messageLenBytes > MAX_RECV_MESSAGE_BYTESIZE){
149 DEBUG("Message Size = " << messageLenBytes);
150 report_error(remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
151 return usedData;
152 }//if
153
154 if (sizeOfData < messageLenBytes) {
155 break;
156 }//if
157
158 if(Protocol6::getCheckSumIncluded(word1)){
159 const Uint32 tmpLen = messageLen32 - 1;
160 const Uint32 checkSumSent = readPtr[tmpLen];
161 const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
162
163 if(checkSumComputed != checkSumSent){
164
165 //theTransporters[remoteNodeId]->disconnect();
166 report_error(remoteNodeId, TE_INVALID_CHECKSUM);
167 return usedData;
168 }//if
169 }//if
170
171 #if 0
172 if(Protocol6::getCompressed(word1)){
173 //Do funky stuff
174 }//if
175 #endif
176
177 Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
178
179 Uint32 rBlockNum = signalHeader.theReceiversBlockNumber;
180
181 if(rBlockNum == 252){
182 Uint32 sBlockNum = signalHeader.theSendersBlockRef;
183 sBlockNum = numberToRef(sBlockNum, remoteNodeId);
184 signalHeader.theSendersBlockRef = sBlockNum;
185
186 Uint8 prio = Protocol6::getPrio(word1);
187
188 Uint32 * signalData = &readPtr[3];
189
190 if(Protocol6::getSignalIdIncluded(word1) == 0){
191 signalHeader.theSendersSignalId = ~0;
192 } else {
193 signalHeader.theSendersSignalId = * signalData;
194 signalData ++;
195 }//if
196
197 Uint32 * sectionPtr = signalData + signalHeader.theLength;
198 Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
199 for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
200 Uint32 sz = * sectionPtr;
201 ptr[i].sz = sz;
202 ptr[i].p = sectionData;
203
204 sectionPtr ++;
205 sectionData += sz;
206 }
207
208 callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr);
209 } else {
210 DEBUG("prepareReceive(...) - Discarding message to block: "
211 << rBlockNum << " from Node: " << remoteNodeId);
212 }//if
213
214 readPtr += messageLen32;
215 sizeOfData -= messageLenBytes;
216 usedData += messageLenBytes;
217 }//while
218
219
220 return usedData;
221 }//if
222 }
223
224 Uint32 *
unpack(Uint32 * readPtr,Uint32 * eodPtr,NodeId remoteNodeId,IOState state)225 TransporterRegistry::unpack(Uint32 * readPtr,
226 Uint32 * eodPtr,
227 NodeId remoteNodeId,
228 IOState state) {
229 SignalHeader signalHeader;
230 LinearSectionPtr ptr[3];
231 Uint32 loop_count = 0;
232 if(state == NoHalt || state == HaltOutput){
233 while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
234 Uint32 word1 = readPtr[0];
235 Uint32 word2 = readPtr[1];
236 Uint32 word3 = readPtr[2];
237 loop_count++;
238 #if 0
239 if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
240 //Do funky stuff
241 }
242 #endif
243
244 const Uint16 messageLen32 = Protocol6::getMessageLength(word1);
245
246 if(messageLen32 == 0 ||
247 messageLen32 > (MAX_RECV_MESSAGE_BYTESIZE >> 2))
248 {
249 DEBUG("Message Size(words) = " << messageLen32);
250 report_error(remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
251 return readPtr;
252 }//if
253
254 if(Protocol6::getCheckSumIncluded(word1)){
255 const Uint32 tmpLen = messageLen32 - 1;
256 const Uint32 checkSumSent = readPtr[tmpLen];
257 const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
258
259 if(checkSumComputed != checkSumSent){
260 report_error(remoteNodeId, TE_INVALID_CHECKSUM);
261 return readPtr;
262 }//if
263 }//if
264
265 #if 0
266 if(Protocol6::getCompressed(word1)){
267 //Do funky stuff
268 }//if
269 #endif
270
271 Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
272
273 Uint32 sBlockNum = signalHeader.theSendersBlockRef;
274 sBlockNum = numberToRef(sBlockNum, remoteNodeId);
275 signalHeader.theSendersBlockRef = sBlockNum;
276
277 Uint8 prio = Protocol6::getPrio(word1);
278
279 Uint32 * signalData = &readPtr[3];
280
281 if(Protocol6::getSignalIdIncluded(word1) == 0){
282 signalHeader.theSendersSignalId = ~0;
283 } else {
284 signalHeader.theSendersSignalId = * signalData;
285 signalData ++;
286 }//if
287
288 Uint32 * sectionPtr = signalData + signalHeader.theLength;
289 Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
290 for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
291 Uint32 sz = * sectionPtr;
292 ptr[i].sz = sz;
293 ptr[i].p = sectionData;
294
295 sectionPtr ++;
296 sectionData += sz;
297 }
298
299 callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr);
300
301 readPtr += messageLen32;
302 }//while
303 } else {
304 /** state = HaltIO || state == HaltInput */
305
306 while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
307 Uint32 word1 = readPtr[0];
308 Uint32 word2 = readPtr[1];
309 Uint32 word3 = readPtr[2];
310 loop_count++;
311 #if 0
312 if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
313 //Do funky stuff
314 }//if
315 #endif
316
317 const Uint16 messageLen32 = Protocol6::getMessageLength(word1);
318 if(messageLen32 == 0 ||
319 messageLen32 > (MAX_RECV_MESSAGE_BYTESIZE >> 2))
320 {
321 DEBUG("Message Size(words) = " << messageLen32);
322 report_error(remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
323 return readPtr;
324 }//if
325
326 if(Protocol6::getCheckSumIncluded(word1)){
327 const Uint32 tmpLen = messageLen32 - 1;
328 const Uint32 checkSumSent = readPtr[tmpLen];
329 const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
330
331 if(checkSumComputed != checkSumSent){
332
333 //theTransporters[remoteNodeId]->disconnect();
334 report_error(remoteNodeId, TE_INVALID_CHECKSUM);
335 return readPtr;
336 }//if
337 }//if
338
339 #if 0
340 if(Protocol6::getCompressed(word1)){
341 //Do funky stuff
342 }//if
343 #endif
344
345 Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
346
347 Uint32 rBlockNum = signalHeader.theReceiversBlockNumber;
348
349 if(rBlockNum == 252){
350 Uint32 sBlockNum = signalHeader.theSendersBlockRef;
351 sBlockNum = numberToRef(sBlockNum, remoteNodeId);
352 signalHeader.theSendersBlockRef = sBlockNum;
353
354 Uint8 prio = Protocol6::getPrio(word1);
355
356 Uint32 * signalData = &readPtr[3];
357
358 if(Protocol6::getSignalIdIncluded(word1) == 0){
359 signalHeader.theSendersSignalId = ~0;
360 } else {
361 signalHeader.theSendersSignalId = * signalData;
362 signalData ++;
363 }//if
364
365 Uint32 * sectionPtr = signalData + signalHeader.theLength;
366 Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
367 for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
368 Uint32 sz = * sectionPtr;
369 ptr[i].sz = sz;
370 ptr[i].p = sectionData;
371
372 sectionPtr ++;
373 sectionData += sz;
374 }
375
376 callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr);
377 } else {
378 DEBUG("prepareReceive(...) - Discarding message to block: "
379 << rBlockNum << " from Node: " << remoteNodeId);
380 }//if
381
382 readPtr += messageLen32;
383 }//while
384 }//if
385 return readPtr;
386 }
387
Packer(bool signalId,bool checksum)388 Packer::Packer(bool signalId, bool checksum) {
389
390 checksumUsed = (checksum ? 1 : 0);
391 signalIdUsed = (signalId ? 1 : 0);
392
393 // Set the priority
394
395 preComputedWord1 = 0;
396 Protocol6::setByteOrder(preComputedWord1, MY_OWN_BYTE_ORDER);
397 Protocol6::setSignalIdIncluded(preComputedWord1, signalIdUsed);
398 Protocol6::setCheckSumIncluded(preComputedWord1, checksumUsed);
399 Protocol6::setCompressed(preComputedWord1, 0);
400 }
401
402 inline
403 void
import(Uint32 * & insertPtr,const LinearSectionPtr & ptr)404 import(Uint32 * & insertPtr, const LinearSectionPtr & ptr){
405 const Uint32 sz = ptr.sz;
406 memcpy(insertPtr, ptr.p, 4 * sz);
407 insertPtr += sz;
408 }
409
410 inline
411 void
importGeneric(Uint32 * & insertPtr,const GenericSectionPtr & ptr)412 importGeneric(Uint32 * & insertPtr, const GenericSectionPtr & ptr){
413 /* Use the section iterator to obtain the words in this section */
414 Uint32 remain= ptr.sz;
415
416 while (remain > 0)
417 {
418 Uint32 len= 0;
419 const Uint32* next= ptr.sectionIter->getNextWords(len);
420
421 assert(len <= remain);
422 assert(next != NULL);
423
424 memcpy(insertPtr, next, 4 * len);
425 insertPtr+= len;
426 remain-= len;
427 }
428
429 /* Check that there were no more words available from the
430 * Signal iterator
431 */
432 assert(ptr.sectionIter->getNextWords(remain) == NULL);
433 }
434
/*
 * Declared here, defined elsewhere. Used by the SegmentedSectionPtr
 * overload of Packer::pack() to write one segmented section at insertPtr;
 * insertPtr is passed by reference and pack() relies on it pointing past
 * the written words afterwards.
 */
void copy(Uint32 * & insertPtr,
          class SectionSegmentPool &, const SegmentedSectionPtr & ptr);
437
438 void
pack(Uint32 * insertPtr,Uint32 prio,const SignalHeader * header,const Uint32 * theData,const LinearSectionPtr ptr[3]) const439 Packer::pack(Uint32 * insertPtr,
440 Uint32 prio,
441 const SignalHeader * header,
442 const Uint32 * theData,
443 const LinearSectionPtr ptr[3]) const {
444 Uint32 i;
445
446 Uint32 dataLen32 = header->theLength;
447 Uint32 no_segs = header->m_noOfSections;
448
449 Uint32 len32 =
450 dataLen32 + no_segs +
451 checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
452
453
454 for(i = 0; i<no_segs; i++){
455 len32 += ptr[i].sz;
456 }
457
458 /**
459 * Do insert of data
460 */
461 Uint32 word1 = preComputedWord1;
462 Uint32 word2 = 0;
463 Uint32 word3 = 0;
464
465 Protocol6::setPrio(word1, prio);
466 Protocol6::setMessageLength(word1, len32);
467 Protocol6::createProtocol6Header(word1, word2, word3, header);
468
469 insertPtr[0] = word1;
470 insertPtr[1] = word2;
471 insertPtr[2] = word3;
472
473 Uint32 * tmpInserPtr = &insertPtr[3];
474
475 if(signalIdUsed){
476 * tmpInserPtr = header->theSignalId;
477 tmpInserPtr++;
478 }
479
480 memcpy(tmpInserPtr, theData, 4 * dataLen32);
481
482 tmpInserPtr += dataLen32;
483 for(i = 0; i<no_segs; i++){
484 tmpInserPtr[i] = ptr[i].sz;
485 }
486
487 tmpInserPtr += no_segs;
488 for(i = 0; i<no_segs; i++){
489 import(tmpInserPtr, ptr[i]);
490 }
491
492 if(checksumUsed){
493 * tmpInserPtr = computeChecksum(&insertPtr[0], len32-1);
494 }
495 }
496
497 void
pack(Uint32 * insertPtr,Uint32 prio,const SignalHeader * header,const Uint32 * theData,class SectionSegmentPool & thePool,const SegmentedSectionPtr ptr[3]) const498 Packer::pack(Uint32 * insertPtr,
499 Uint32 prio,
500 const SignalHeader * header,
501 const Uint32 * theData,
502 class SectionSegmentPool & thePool,
503 const SegmentedSectionPtr ptr[3]) const {
504 Uint32 i;
505
506 Uint32 dataLen32 = header->theLength;
507 Uint32 no_segs = header->m_noOfSections;
508
509 Uint32 len32 =
510 dataLen32 + no_segs +
511 checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
512
513 for(i = 0; i<no_segs; i++){
514 len32 += ptr[i].sz;
515 }
516
517 /**
518 * Do insert of data
519 */
520 Uint32 word1 = preComputedWord1;
521 Uint32 word2 = 0;
522 Uint32 word3 = 0;
523
524 Protocol6::setPrio(word1, prio);
525 Protocol6::setMessageLength(word1, len32);
526 Protocol6::createProtocol6Header(word1, word2, word3, header);
527
528 insertPtr[0] = word1;
529 insertPtr[1] = word2;
530 insertPtr[2] = word3;
531
532 Uint32 * tmpInserPtr = &insertPtr[3];
533
534 if(signalIdUsed){
535 * tmpInserPtr = header->theSignalId;
536 tmpInserPtr++;
537 }
538
539 memcpy(tmpInserPtr, theData, 4 * dataLen32);
540
541 tmpInserPtr += dataLen32;
542 for(i = 0; i<no_segs; i++){
543 tmpInserPtr[i] = ptr[i].sz;
544 }
545
546 tmpInserPtr += no_segs;
547 for(i = 0; i<no_segs; i++){
548 copy(tmpInserPtr, thePool, ptr[i]);
549 }
550
551 if(checksumUsed){
552 * tmpInserPtr = computeChecksum(&insertPtr[0], len32-1);
553 }
554 }
555
556
557 void
pack(Uint32 * insertPtr,Uint32 prio,const SignalHeader * header,const Uint32 * theData,const GenericSectionPtr ptr[3]) const558 Packer::pack(Uint32 * insertPtr,
559 Uint32 prio,
560 const SignalHeader * header,
561 const Uint32 * theData,
562 const GenericSectionPtr ptr[3]) const {
563 Uint32 i;
564
565 Uint32 dataLen32 = header->theLength;
566 Uint32 no_segs = header->m_noOfSections;
567
568 Uint32 len32 =
569 dataLen32 + no_segs +
570 checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
571
572
573 for(i = 0; i<no_segs; i++){
574 len32 += ptr[i].sz;
575 }
576
577 /**
578 * Do insert of data
579 */
580 Uint32 word1 = preComputedWord1;
581 Uint32 word2 = 0;
582 Uint32 word3 = 0;
583
584 Protocol6::setPrio(word1, prio);
585 Protocol6::setMessageLength(word1, len32);
586 Protocol6::createProtocol6Header(word1, word2, word3, header);
587
588 insertPtr[0] = word1;
589 insertPtr[1] = word2;
590 insertPtr[2] = word3;
591
592 Uint32 * tmpInsertPtr = &insertPtr[3];
593
594 if(signalIdUsed){
595 * tmpInsertPtr = header->theSignalId;
596 tmpInsertPtr++;
597 }
598
599 memcpy(tmpInsertPtr, theData, 4 * dataLen32);
600
601 tmpInsertPtr += dataLen32;
602 for(i = 0; i<no_segs; i++){
603 tmpInsertPtr[i] = ptr[i].sz;
604 }
605
606 tmpInsertPtr += no_segs;
607 for(i = 0; i<no_segs; i++){
608 importGeneric(tmpInsertPtr, ptr[i]);
609 }
610
611 if(checksumUsed){
612 * tmpInsertPtr = computeChecksum(&insertPtr[0], len32-1);
613 }
614 }
615
616 /**
617 * Find longest data size that does not exceed given maximum, and does not
618 * cause individual signals to be split.
619 *
620 * Used by SHM_Transporter, as it is designed to send data in Signal chunks,
621 * not bytes or words.
622 */
623 Uint32
unpack_length_words(const Uint32 * readPtr,Uint32 maxWords)624 TransporterRegistry::unpack_length_words(const Uint32 *readPtr, Uint32 maxWords)
625 {
626 Uint32 wordLength = 0;
627
628 while (wordLength + 4 + sizeof(Protocol6) <= maxWords)
629 {
630 Uint32 word1 = readPtr[wordLength];
631 Uint16 messageLen32 = Protocol6::getMessageLength(word1);
632 if (wordLength + messageLen32 > maxWords)
633 break;
634 wordLength += messageLen32;
635 }
636 return wordLength;
637 }
638