1 /*
2    Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <ndb_global.h>
26 
27 #include "Packer.hpp"
28 #include <TransporterRegistry.hpp>
29 #include <TransporterCallback.hpp>
30 #include <RefConvert.hpp>
31 
/*
 * Upper bound on the number of messages handled by a single call to
 * TransporterRegistry::unpack().  In ordinary builds it is a compile-time
 * constant; when ERROR_INSERT is defined it is a global variable instead
 * (presumably so error-insert code can modify it at run time - confirm).
 */
#ifndef ERROR_INSERT
#define MAX_RECEIVED_SIGNALS 1024
#else
Uint32 MAX_RECEIVED_SIGNALS = 1024;
#endif
37 
38 Uint32
unpack(Uint32 * readPtr,Uint32 sizeOfData,NodeId remoteNodeId,IOState state)39 TransporterRegistry::unpack(Uint32 * readPtr,
40 			    Uint32 sizeOfData,
41 			    NodeId remoteNodeId,
42 			    IOState state) {
43   SignalHeader signalHeader;
44   LinearSectionPtr ptr[3];
45 
46   Uint32 usedData   = 0;
47   Uint32 loop_count = 0;
48 
49   if(state == NoHalt || state == HaltOutput){
50     while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
51            (loop_count < MAX_RECEIVED_SIGNALS)) {
52       Uint32 word1 = readPtr[0];
53       Uint32 word2 = readPtr[1];
54       Uint32 word3 = readPtr[2];
55       loop_count++;
56 
57 #if 0
58       if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
59 	//Do funky stuff
60       }
61 #endif
62 
63       const Uint16 messageLen32    = Protocol6::getMessageLength(word1);
64       const Uint32 messageLenBytes = ((Uint32)messageLen32) << 2;
65 
66       if(messageLenBytes == 0 || messageLenBytes > MAX_RECV_MESSAGE_BYTESIZE){
67         DEBUG("Message Size = " << messageLenBytes);
68 	report_error(remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
69         return usedData;
70       }//if
71 
72       if (sizeOfData < messageLenBytes) {
73 	break;
74       }//if
75 
76       if(Protocol6::getCheckSumIncluded(word1)){
77 	const Uint32 tmpLen = messageLen32 - 1;
78 	const Uint32 checkSumSent     = readPtr[tmpLen];
79 	const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
80 
81 	if(checkSumComputed != checkSumSent){
82 	  report_error(remoteNodeId, TE_INVALID_CHECKSUM);
83           return usedData;
84 	}//if
85       }//if
86 
87 #if 0
88       if(Protocol6::getCompressed(word1)){
89 	//Do funky stuff
90       }//if
91 #endif
92 
93       Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
94 
95       Uint32 sBlockNum = signalHeader.theSendersBlockRef;
96       sBlockNum = numberToRef(sBlockNum, remoteNodeId);
97       signalHeader.theSendersBlockRef = sBlockNum;
98 
99       Uint8 prio = Protocol6::getPrio(word1);
100 
101       Uint32 * signalData = &readPtr[3];
102 
103       if(Protocol6::getSignalIdIncluded(word1) == 0){
104 	signalHeader.theSendersSignalId = ~0;
105       } else {
106 	signalHeader.theSendersSignalId = * signalData;
107 	signalData ++;
108       }//if
109       signalHeader.theSignalId= ~0;
110 
111       Uint32 * sectionPtr = signalData + signalHeader.theLength;
112       Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
113       for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
114 	Uint32 sz = * sectionPtr;
115 	ptr[i].sz = sz;
116 	ptr[i].p = sectionData;
117 
118 	sectionPtr ++;
119 	sectionData += sz;
120       }
121 
122       callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr);
123 
124       readPtr     += messageLen32;
125       sizeOfData  -= messageLenBytes;
126       usedData    += messageLenBytes;
127     }//while
128 
129     return usedData;
130   } else {
131     /** state = HaltIO || state == HaltInput */
132 
133     while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
134            (loop_count < MAX_RECEIVED_SIGNALS)) {
135       Uint32 word1 = readPtr[0];
136       Uint32 word2 = readPtr[1];
137       Uint32 word3 = readPtr[2];
138       loop_count++;
139 
140 #if 0
141       if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
142 	//Do funky stuff
143       }//if
144 #endif
145 
146       const Uint16 messageLen32    = Protocol6::getMessageLength(word1);
147       const Uint32 messageLenBytes = ((Uint32)messageLen32) << 2;
148       if(messageLenBytes == 0 || messageLenBytes > MAX_RECV_MESSAGE_BYTESIZE){
149 	DEBUG("Message Size = " << messageLenBytes);
150 	report_error(remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
151         return usedData;
152       }//if
153 
154       if (sizeOfData < messageLenBytes) {
155 	break;
156       }//if
157 
158       if(Protocol6::getCheckSumIncluded(word1)){
159 	const Uint32 tmpLen = messageLen32 - 1;
160 	const Uint32 checkSumSent     = readPtr[tmpLen];
161 	const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
162 
163 	if(checkSumComputed != checkSumSent){
164 
165 	  //theTransporters[remoteNodeId]->disconnect();
166 	  report_error(remoteNodeId, TE_INVALID_CHECKSUM);
167           return usedData;
168 	}//if
169       }//if
170 
171 #if 0
172       if(Protocol6::getCompressed(word1)){
173 	//Do funky stuff
174       }//if
175 #endif
176 
177       Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
178 
179       Uint32 rBlockNum = signalHeader.theReceiversBlockNumber;
180 
181       if(rBlockNum == 252){
182 	Uint32 sBlockNum = signalHeader.theSendersBlockRef;
183 	sBlockNum = numberToRef(sBlockNum, remoteNodeId);
184 	signalHeader.theSendersBlockRef = sBlockNum;
185 
186 	Uint8 prio = Protocol6::getPrio(word1);
187 
188 	Uint32 * signalData = &readPtr[3];
189 
190 	if(Protocol6::getSignalIdIncluded(word1) == 0){
191 	  signalHeader.theSendersSignalId = ~0;
192 	} else {
193 	  signalHeader.theSendersSignalId = * signalData;
194 	  signalData ++;
195 	}//if
196 
197 	Uint32 * sectionPtr = signalData + signalHeader.theLength;
198 	Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
199 	for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
200 	  Uint32 sz = * sectionPtr;
201 	  ptr[i].sz = sz;
202 	  ptr[i].p = sectionData;
203 
204 	  sectionPtr ++;
205 	  sectionData += sz;
206 	}
207 
208 	callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr);
209       } else {
210 	DEBUG("prepareReceive(...) - Discarding message to block: "
211 	      << rBlockNum << " from Node: " << remoteNodeId);
212       }//if
213 
214       readPtr     += messageLen32;
215       sizeOfData  -= messageLenBytes;
216       usedData    += messageLenBytes;
217     }//while
218 
219 
220     return usedData;
221   }//if
222 }
223 
224 Uint32 *
unpack(Uint32 * readPtr,Uint32 * eodPtr,NodeId remoteNodeId,IOState state)225 TransporterRegistry::unpack(Uint32 * readPtr,
226 			    Uint32 * eodPtr,
227 			    NodeId remoteNodeId,
228 			    IOState state) {
229   SignalHeader signalHeader;
230   LinearSectionPtr ptr[3];
231   Uint32 loop_count = 0;
232   if(state == NoHalt || state == HaltOutput){
233     while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
234       Uint32 word1 = readPtr[0];
235       Uint32 word2 = readPtr[1];
236       Uint32 word3 = readPtr[2];
237       loop_count++;
238 #if 0
239       if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
240 	//Do funky stuff
241       }
242 #endif
243 
244       const Uint16 messageLen32    = Protocol6::getMessageLength(word1);
245 
246       if(messageLen32 == 0 ||
247          messageLen32 > (MAX_RECV_MESSAGE_BYTESIZE >> 2))
248       {
249         DEBUG("Message Size(words) = " << messageLen32);
250 	report_error(remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
251         return readPtr;
252       }//if
253 
254       if(Protocol6::getCheckSumIncluded(word1)){
255 	const Uint32 tmpLen = messageLen32 - 1;
256 	const Uint32 checkSumSent     = readPtr[tmpLen];
257 	const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
258 
259 	if(checkSumComputed != checkSumSent){
260 	  report_error(remoteNodeId, TE_INVALID_CHECKSUM);
261 	  return readPtr;
262 	}//if
263       }//if
264 
265 #if 0
266       if(Protocol6::getCompressed(word1)){
267 	//Do funky stuff
268       }//if
269 #endif
270 
271       Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
272 
273       Uint32 sBlockNum = signalHeader.theSendersBlockRef;
274       sBlockNum = numberToRef(sBlockNum, remoteNodeId);
275       signalHeader.theSendersBlockRef = sBlockNum;
276 
277       Uint8 prio = Protocol6::getPrio(word1);
278 
279       Uint32 * signalData = &readPtr[3];
280 
281       if(Protocol6::getSignalIdIncluded(word1) == 0){
282 	signalHeader.theSendersSignalId = ~0;
283       } else {
284 	signalHeader.theSendersSignalId = * signalData;
285 	signalData ++;
286       }//if
287 
288       Uint32 * sectionPtr = signalData + signalHeader.theLength;
289       Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
290       for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
291 	Uint32 sz = * sectionPtr;
292 	ptr[i].sz = sz;
293 	ptr[i].p = sectionData;
294 
295 	sectionPtr ++;
296 	sectionData += sz;
297       }
298 
299       callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr);
300 
301       readPtr += messageLen32;
302     }//while
303   } else {
304     /** state = HaltIO || state == HaltInput */
305 
306     while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
307       Uint32 word1 = readPtr[0];
308       Uint32 word2 = readPtr[1];
309       Uint32 word3 = readPtr[2];
310       loop_count++;
311 #if 0
312       if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
313 	//Do funky stuff
314       }//if
315 #endif
316 
317       const Uint16 messageLen32    = Protocol6::getMessageLength(word1);
318       if(messageLen32 == 0 ||
319          messageLen32 > (MAX_RECV_MESSAGE_BYTESIZE >> 2))
320       {
321 	DEBUG("Message Size(words) = " << messageLen32);
322 	report_error(remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
323         return readPtr;
324       }//if
325 
326       if(Protocol6::getCheckSumIncluded(word1)){
327 	const Uint32 tmpLen = messageLen32 - 1;
328 	const Uint32 checkSumSent     = readPtr[tmpLen];
329 	const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
330 
331 	if(checkSumComputed != checkSumSent){
332 
333 	  //theTransporters[remoteNodeId]->disconnect();
334 	  report_error(remoteNodeId, TE_INVALID_CHECKSUM);
335 	  return readPtr;
336 	}//if
337       }//if
338 
339 #if 0
340       if(Protocol6::getCompressed(word1)){
341 	//Do funky stuff
342       }//if
343 #endif
344 
345       Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
346 
347       Uint32 rBlockNum = signalHeader.theReceiversBlockNumber;
348 
349       if(rBlockNum == 252){
350 	Uint32 sBlockNum = signalHeader.theSendersBlockRef;
351 	sBlockNum = numberToRef(sBlockNum, remoteNodeId);
352 	signalHeader.theSendersBlockRef = sBlockNum;
353 
354 	Uint8 prio = Protocol6::getPrio(word1);
355 
356 	Uint32 * signalData = &readPtr[3];
357 
358 	if(Protocol6::getSignalIdIncluded(word1) == 0){
359 	  signalHeader.theSendersSignalId = ~0;
360 	} else {
361 	  signalHeader.theSendersSignalId = * signalData;
362 	  signalData ++;
363 	}//if
364 
365 	Uint32 * sectionPtr = signalData + signalHeader.theLength;
366 	Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
367 	for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
368 	  Uint32 sz = * sectionPtr;
369 	  ptr[i].sz = sz;
370 	  ptr[i].p = sectionData;
371 
372 	  sectionPtr ++;
373 	  sectionData += sz;
374 	}
375 
376 	callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr);
377       } else {
378 	DEBUG("prepareReceive(...) - Discarding message to block: "
379 	      << rBlockNum << " from Node: " << remoteNodeId);
380       }//if
381 
382       readPtr += messageLen32;
383     }//while
384   }//if
385   return readPtr;
386 }
387 
Packer(bool signalId,bool checksum)388 Packer::Packer(bool signalId, bool checksum) {
389 
390   checksumUsed    = (checksum ? 1 : 0);
391   signalIdUsed    = (signalId ? 1 : 0);
392 
393   // Set the priority
394 
395   preComputedWord1 = 0;
396   Protocol6::setByteOrder(preComputedWord1, MY_OWN_BYTE_ORDER);
397   Protocol6::setSignalIdIncluded(preComputedWord1, signalIdUsed);
398   Protocol6::setCheckSumIncluded(preComputedWord1, checksumUsed);
399   Protocol6::setCompressed(preComputedWord1, 0);
400 }
401 
402 inline
403 void
import(Uint32 * & insertPtr,const LinearSectionPtr & ptr)404 import(Uint32 * & insertPtr, const LinearSectionPtr & ptr){
405   const Uint32 sz = ptr.sz;
406   memcpy(insertPtr, ptr.p, 4 * sz);
407   insertPtr += sz;
408 }
409 
410 inline
411 void
importGeneric(Uint32 * & insertPtr,const GenericSectionPtr & ptr)412 importGeneric(Uint32 * & insertPtr, const GenericSectionPtr & ptr){
413   /* Use the section iterator to obtain the words in this section */
414   Uint32 remain= ptr.sz;
415 
416   while (remain > 0)
417   {
418     Uint32 len= 0;
419     const Uint32* next= ptr.sectionIter->getNextWords(len);
420 
421     assert(len <= remain);
422     assert(next != NULL);
423 
424     memcpy(insertPtr, next, 4 * len);
425     insertPtr+= len;
426     remain-= len;
427   }
428 
429   /* Check that there were no more words available from the
430    * Signal iterator
431    */
432   assert(ptr.sectionIter->getNextWords(remain) == NULL);
433 }
434 
435 void copy(Uint32 * & insertPtr,
436 	  class SectionSegmentPool &, const SegmentedSectionPtr & ptr);
437 
438 void
pack(Uint32 * insertPtr,Uint32 prio,const SignalHeader * header,const Uint32 * theData,const LinearSectionPtr ptr[3]) const439 Packer::pack(Uint32 * insertPtr,
440 	     Uint32 prio,
441 	     const SignalHeader * header,
442 	     const Uint32 * theData,
443 	     const LinearSectionPtr ptr[3]) const {
444   Uint32 i;
445 
446   Uint32 dataLen32 = header->theLength;
447   Uint32 no_segs = header->m_noOfSections;
448 
449   Uint32 len32 =
450     dataLen32 + no_segs +
451     checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
452 
453 
454   for(i = 0; i<no_segs; i++){
455     len32 += ptr[i].sz;
456   }
457 
458   /**
459    * Do insert of data
460    */
461   Uint32 word1 = preComputedWord1;
462   Uint32 word2 = 0;
463   Uint32 word3 = 0;
464 
465   Protocol6::setPrio(word1, prio);
466   Protocol6::setMessageLength(word1, len32);
467   Protocol6::createProtocol6Header(word1, word2, word3, header);
468 
469   insertPtr[0] = word1;
470   insertPtr[1] = word2;
471   insertPtr[2] = word3;
472 
473   Uint32 * tmpInserPtr = &insertPtr[3];
474 
475   if(signalIdUsed){
476     * tmpInserPtr = header->theSignalId;
477     tmpInserPtr++;
478   }
479 
480   memcpy(tmpInserPtr, theData, 4 * dataLen32);
481 
482   tmpInserPtr += dataLen32;
483   for(i = 0; i<no_segs; i++){
484     tmpInserPtr[i] = ptr[i].sz;
485   }
486 
487   tmpInserPtr += no_segs;
488   for(i = 0; i<no_segs; i++){
489     import(tmpInserPtr, ptr[i]);
490   }
491 
492   if(checksumUsed){
493     * tmpInserPtr = computeChecksum(&insertPtr[0], len32-1);
494   }
495 }
496 
497 void
pack(Uint32 * insertPtr,Uint32 prio,const SignalHeader * header,const Uint32 * theData,class SectionSegmentPool & thePool,const SegmentedSectionPtr ptr[3]) const498 Packer::pack(Uint32 * insertPtr,
499 	     Uint32 prio,
500 	     const SignalHeader * header,
501 	     const Uint32 * theData,
502 	     class SectionSegmentPool & thePool,
503 	     const SegmentedSectionPtr ptr[3]) const {
504   Uint32 i;
505 
506   Uint32 dataLen32 = header->theLength;
507   Uint32 no_segs = header->m_noOfSections;
508 
509   Uint32 len32 =
510     dataLen32 + no_segs +
511     checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
512 
513   for(i = 0; i<no_segs; i++){
514     len32 += ptr[i].sz;
515   }
516 
517   /**
518    * Do insert of data
519    */
520   Uint32 word1 = preComputedWord1;
521   Uint32 word2 = 0;
522   Uint32 word3 = 0;
523 
524   Protocol6::setPrio(word1, prio);
525   Protocol6::setMessageLength(word1, len32);
526   Protocol6::createProtocol6Header(word1, word2, word3, header);
527 
528   insertPtr[0] = word1;
529   insertPtr[1] = word2;
530   insertPtr[2] = word3;
531 
532   Uint32 * tmpInserPtr = &insertPtr[3];
533 
534   if(signalIdUsed){
535     * tmpInserPtr = header->theSignalId;
536     tmpInserPtr++;
537   }
538 
539   memcpy(tmpInserPtr, theData, 4 * dataLen32);
540 
541   tmpInserPtr += dataLen32;
542   for(i = 0; i<no_segs; i++){
543     tmpInserPtr[i] = ptr[i].sz;
544   }
545 
546   tmpInserPtr += no_segs;
547   for(i = 0; i<no_segs; i++){
548     copy(tmpInserPtr, thePool, ptr[i]);
549   }
550 
551   if(checksumUsed){
552     * tmpInserPtr = computeChecksum(&insertPtr[0], len32-1);
553   }
554 }
555 
556 
557 void
pack(Uint32 * insertPtr,Uint32 prio,const SignalHeader * header,const Uint32 * theData,const GenericSectionPtr ptr[3]) const558 Packer::pack(Uint32 * insertPtr,
559 	     Uint32 prio,
560 	     const SignalHeader * header,
561 	     const Uint32 * theData,
562 	     const GenericSectionPtr ptr[3]) const {
563   Uint32 i;
564 
565   Uint32 dataLen32 = header->theLength;
566   Uint32 no_segs = header->m_noOfSections;
567 
568   Uint32 len32 =
569     dataLen32 + no_segs +
570     checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
571 
572 
573   for(i = 0; i<no_segs; i++){
574     len32 += ptr[i].sz;
575   }
576 
577   /**
578    * Do insert of data
579    */
580   Uint32 word1 = preComputedWord1;
581   Uint32 word2 = 0;
582   Uint32 word3 = 0;
583 
584   Protocol6::setPrio(word1, prio);
585   Protocol6::setMessageLength(word1, len32);
586   Protocol6::createProtocol6Header(word1, word2, word3, header);
587 
588   insertPtr[0] = word1;
589   insertPtr[1] = word2;
590   insertPtr[2] = word3;
591 
592   Uint32 * tmpInsertPtr = &insertPtr[3];
593 
594   if(signalIdUsed){
595     * tmpInsertPtr = header->theSignalId;
596     tmpInsertPtr++;
597   }
598 
599   memcpy(tmpInsertPtr, theData, 4 * dataLen32);
600 
601   tmpInsertPtr += dataLen32;
602   for(i = 0; i<no_segs; i++){
603     tmpInsertPtr[i] = ptr[i].sz;
604   }
605 
606   tmpInsertPtr += no_segs;
607   for(i = 0; i<no_segs; i++){
608     importGeneric(tmpInsertPtr, ptr[i]);
609   }
610 
611   if(checksumUsed){
612     * tmpInsertPtr = computeChecksum(&insertPtr[0], len32-1);
613   }
614 }
615 
616 /**
617  * Find longest data size that does not exceed given maximum, and does not
618  * cause individual signals to be split.
619  *
620  * Used by SHM_Transporter, as it is designed to send data in Signal chunks,
621  * not bytes or words.
622  */
623 Uint32
unpack_length_words(const Uint32 * readPtr,Uint32 maxWords)624 TransporterRegistry::unpack_length_words(const Uint32 *readPtr, Uint32 maxWords)
625 {
626   Uint32 wordLength = 0;
627 
628   while (wordLength + 4 + sizeof(Protocol6) <= maxWords)
629   {
630     Uint32 word1 = readPtr[wordLength];
631     Uint16 messageLen32 = Protocol6::getMessageLength(word1);
632     if (wordLength + messageLen32 > maxWords)
633       break;
634     wordLength += messageLen32;
635   }
636   return wordLength;
637 }
638