1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <ndb_global.h>
26 
27 #include "Packer.hpp"
28 #include <TransporterRegistry.hpp>
29 #include <TransporterCallback.hpp>
30 #include <RefConvert.hpp>
31 #include "BaseString.hpp"
32 #include "EventLogger.hpp"
33 #include "BlockNumbers.h"
34 
// Upper bound on the number of signals unpacked by one call to
// TransporterRegistry::unpack().
// In ERROR_INSERT builds this is a variable instead of a macro
// (presumably so that test code can change the limit at run time -
// TODO confirm against the error insert code).
#ifdef ERROR_INSERT
Uint32 MAX_RECEIVED_SIGNALS = 1024;
#else
#define MAX_RECEIVED_SIGNALS 1024
#endif
40 
41 extern EventLogger* g_eventLogger;
42 
43 void
dump_and_report_bad_message(const char file[],unsigned line,TransporterReceiveHandle & recvHandle,Uint32 * readPtr,size_t sizeInWords,NodeId remoteNodeId,IOState state,TransporterError errorCode)44 TransporterRegistry::dump_and_report_bad_message(const char file[], unsigned line,
45                           TransporterReceiveHandle & recvHandle,
46                           Uint32 * readPtr,
47                           size_t sizeInWords,
48                           NodeId remoteNodeId,
49                           IOState state,
50                           TransporterError errorCode)
51 {
52   report_error(remoteNodeId, errorCode);
53 
54   char msg[MAX_LOG_MESSAGE_SIZE];
55   const size_t sz = sizeof(msg);
56   Uint32 nextMsgOffset = Protocol6::getMessageLength(*readPtr);
57   if (sizeInWords >= nextMsgOffset)
58   {
59     nextMsgOffset = 0;
60   }
61 
62   {
63     size_t offs = 0;
64     ssize_t nb;
65     nb = BaseString::snprintf(msg + offs, sz - offs, "%s: %u: ", file, line);
66     if (nb < 0) goto log_it;
67     offs += nb;
68 
69     // Get error message for errorCode
70     LogLevel::EventCategory cat;
71     Uint32 threshold;
72     Logger::LoggerLevel severity;
73     EventLogger::EventTextFunction textF;
74     EventLoggerBase::event_lookup(NDB_LE_TransporterError,
75                                   cat, threshold, severity, textF);
76     Uint32 TE_words[3] = {0, remoteNodeId, errorCode};
77     g_eventLogger->getText(msg + offs, sz - offs, textF, TE_words, 3);
78     nb = strlen(msg + offs);
79     if (nb < 0) goto log_it;
80     offs += nb;
81 
82     const bool bad_data = recvHandle.m_bad_data_transporters.get(remoteNodeId);
83     nb = BaseString::snprintf(msg + offs, sz - offs,
84                   "\n"
85                   "PerformState %u: IOState %u: bad_data %u\n"
86                   "ptr %p: size %u bytes\n",
87                   performStates[remoteNodeId], state, bad_data,
88                   readPtr, (unsigned)(sizeInWords * 4));
89     if (nb < 0) goto log_it;
90     offs += nb;
91     size_t reserve;
92     if (!nextMsgOffset)
93     {
94       // If next message wont be dumped, print as much as possible
95       // from start of buffer.
96       reserve = 0;
97     }
98     else
99     {
100       // Keep some space for dumping next message, about 10 words
101       // plus 6 preceding words.
102       reserve = 200;
103     }
104     nb = BaseString::hexdump(msg + offs, sz - offs - reserve, readPtr, sizeInWords);
105     if (nb < 0) goto log_it;
106     offs += nb;
107     if (nextMsgOffset)
108     {
109       // Always print some words preceding next message. But assume
110       // at least 60 words will be printed for current message.
111       if (nextMsgOffset > 60)
112       {
113         nb = BaseString::snprintf(msg + offs, sz - offs,
114                   "Before next ptr %p\n",
115                   readPtr + nextMsgOffset - 6);
116         if (nb < 0) goto log_it;
117         offs += nb;
118         nb = BaseString::hexdump(msg + offs, sz - offs, readPtr + nextMsgOffset - 6, 6);
119         offs += nb;
120       }
121       // Dump words for next message.
122       nb = BaseString::snprintf(msg + offs, sz - offs,
123                     "Next ptr %p\n",
124                     readPtr + nextMsgOffset);
125       if (nb < 0) goto log_it;
126       offs += nb;
127       nb = BaseString::hexdump(msg + offs, sz - offs, readPtr + nextMsgOffset, sizeInWords - nextMsgOffset);
128       if (nb < 0) goto log_it;
129       offs += nb;
130     }
131   }
132 
133 log_it:
134   g_eventLogger->error("%s", msg);
135   recvHandle.m_bad_data_transporters.set(remoteNodeId);
136 }
137 
138 static
139 inline
140 bool
unpack_one(Uint32 * (& readPtr),Uint32 * eodPtr,Uint8 (& prio),SignalHeader (& signalHeader),Uint32 * (& signalData),LinearSectionPtr ptr[],TransporterError (& errorCode))141 unpack_one(Uint32* (&readPtr), Uint32* eodPtr,
142            Uint8 (&prio), SignalHeader (&signalHeader), Uint32* (&signalData), LinearSectionPtr ptr[],
143            TransporterError (&errorCode))
144 {
145   Uint32 word1 = readPtr[0];
146   Uint32 word2 = readPtr[1];
147   Uint32 word3 = readPtr[2];
148 
149   if (unlikely(!Protocol6::verifyByteOrder(word1, MY_OWN_BYTE_ORDER)))
150   {
151     errorCode = TE_UNSUPPORTED_BYTE_ORDER;
152     return false;
153   }
154 
155   if (unlikely(Protocol6::getCompressed(word1))){
156     errorCode = TE_COMPRESSED_UNSUPPORTED;
157     return false;
158   }
159 
160   const Uint16 messageLen32    = Protocol6::getMessageLength(word1);
161 
162   if(unlikely(messageLen32 == 0 ||
163               messageLen32 > (MAX_RECV_MESSAGE_BYTESIZE >> 2))){
164     DEBUG("Message Size = " << messageLen32 << " words");
165     errorCode = TE_INVALID_MESSAGE_LENGTH;
166     return false;
167   }//if
168 
169   if (unlikely(eodPtr < readPtr + messageLen32)) {
170     errorCode = TE_NO_ERROR;
171     return false;
172   }//if
173 
174   if(unlikely(Protocol6::getCheckSumIncluded(word1))){
175     const Uint32 tmpLen = messageLen32 - 1;
176     const Uint32 checkSumSent     = readPtr[tmpLen];
177     const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
178 
179     if(unlikely(checkSumComputed != checkSumSent)){
180       errorCode = TE_INVALID_CHECKSUM;
181       return false;
182     }//if
183   }//if
184 
185   signalData = &readPtr[3];
186 
187   readPtr     += messageLen32;
188 
189   Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
190 
191   prio = Protocol6::getPrio(word1);
192 
193   if(Protocol6::getSignalIdIncluded(word1) == 0){
194     signalHeader.theSendersSignalId = ~0;
195   } else {
196     signalHeader.theSendersSignalId = * signalData;
197     signalData ++;
198   }//if
199   signalHeader.theSignalId= ~0;
200 
201   Uint32 * sectionPtr = signalData + signalHeader.theLength;
202   Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
203   for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
204     Uint32 sz = * sectionPtr;
205     ptr[i].sz = sz;
206     ptr[i].p = sectionData;
207 
208     sectionPtr ++;
209     sectionData += sz;
210   }
211 
212   if(Protocol6::getCheckSumIncluded(word1))
213   {
214     sectionData ++;
215   }
216   if (sectionData != readPtr)
217   {
218     readPtr -= messageLen32;
219     errorCode = TE_INVALID_MESSAGE_LENGTH;
220     return false;
221   }
222 
223   /* check of next message if possible before delivery */
224   if (eodPtr > readPtr)
225   { // check next message word1
226     Uint32 word1 = *readPtr;
227     // check byte order
228     if (unlikely(!Protocol6::verifyByteOrder(word1, MY_OWN_BYTE_ORDER)))
229     {
230       errorCode = TE_UNSUPPORTED_BYTE_ORDER;
231       return false;
232     }
233     if (unlikely(Protocol6::getCompressed(word1)))
234     {
235       errorCode = TE_COMPRESSED_UNSUPPORTED;
236       return false;
237     }
238     // check message size
239     const Uint16 messageLen32    = Protocol6::getMessageLength(word1);
240     if (unlikely(messageLen32 == 0 ||
241                 messageLen32 > (MAX_RECV_MESSAGE_BYTESIZE >> 2)))
242     {
243       DEBUG("Message Size = " << messageLenBytes);
244       errorCode = TE_INVALID_MESSAGE_LENGTH;
245       return false;
246     }//if
247   }
248 
249   return true;
250 }
251 
252 Uint32
unpack(TransporterReceiveHandle & recvHandle,Uint32 * readPtr,Uint32 sizeOfData,NodeId remoteNodeId,IOState state,bool & stopReceiving)253 TransporterRegistry::unpack(TransporterReceiveHandle & recvHandle,
254                             Uint32 * readPtr,
255                             Uint32 sizeOfData,
256                             NodeId remoteNodeId,
257                             IOState state,
258                             bool & stopReceiving)
259 {
260   assert(stopReceiving == false);
261   // If bad data detected in  previous run
262   // skip all further data
263   if (unlikely(recvHandle.m_bad_data_transporters.get(remoteNodeId)))
264   {
265     return sizeOfData;
266   }
267 
268   Uint8 prio;
269   SignalHeader signalHeader;
270   Uint32* signalData;
271   LinearSectionPtr ptr[3];
272   TransporterError errorCode = TE_NO_ERROR;
273 
274   Uint32* const sodPtr = readPtr;
275   Uint32* const eodPtr = readPtr + (sizeOfData >> 2);
276   Uint32 loop_count = 0;
277   bool doStopReceiving = false;
278 
279   if(likely(state == NoHalt || state == HaltOutput)){
280     while ((eodPtr >= readPtr + (1 + (sizeof(Protocol6) >> 2))) &&
281            (loop_count < MAX_RECEIVED_SIGNALS) &&
282 	   doStopReceiving == false &&
283            unpack_one(readPtr, eodPtr, prio, signalHeader, signalData, ptr, errorCode)) {
284       loop_count++;
285 
286       Uint32 sBlockNum = signalHeader.theSendersBlockRef;
287       sBlockNum = numberToRef(sBlockNum, remoteNodeId);
288       signalHeader.theSendersBlockRef = sBlockNum;
289 
290       doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
291 
292     }//while
293   } else {
294     /** state = HaltIO || state == HaltInput */
295 
296     while ((eodPtr >= readPtr + (1 + (sizeof(Protocol6) >> 2))) &&
297            (loop_count < MAX_RECEIVED_SIGNALS) &&
298 	   doStopReceiving == false &&
299            unpack_one(readPtr, eodPtr, prio, signalHeader, signalData, ptr, errorCode)) {
300       loop_count++;
301 
302       Uint32 rBlockNum = signalHeader.theReceiversBlockNumber;
303 
304       if(rBlockNum == QMGR){
305 	Uint32 sBlockNum = signalHeader.theSendersBlockRef;
306 	sBlockNum = numberToRef(sBlockNum, remoteNodeId);
307 	signalHeader.theSendersBlockRef = sBlockNum;
308 
309 	doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
310       } else {
311 	DEBUG("prepareReceive(...) - Discarding message to block: "
312 	      << rBlockNum << " from Node: " << remoteNodeId);
313       }//if
314     }//while
315   }//if
316 
317   if (errorCode != TE_NO_ERROR)
318   {
319     dump_and_report_bad_message(__FILE__, __LINE__,
320             recvHandle, readPtr, eodPtr - readPtr, remoteNodeId, state,
321             errorCode);
322   }
323 
324   stopReceiving = doStopReceiving;
325   return (readPtr - sodPtr) * sizeof(Uint32);
326 }
327 
328 Uint32 *
unpack(TransporterReceiveHandle & recvHandle,Uint32 * readPtr,Uint32 * eodPtr,NodeId remoteNodeId,IOState state,bool & stopReceiving)329 TransporterRegistry::unpack(TransporterReceiveHandle & recvHandle,
330                             Uint32 * readPtr,
331                             Uint32 * eodPtr,
332                             NodeId remoteNodeId,
333                             IOState state,
334                             bool & stopReceiving)
335 {
336   assert(stopReceiving == false);
337 
338   // If bad data detected in previous run
339   // skip all further data
340   if (unlikely(recvHandle.m_bad_data_transporters.get(remoteNodeId)))
341   {
342     return eodPtr;
343   }
344 
345   Uint8 prio;
346   SignalHeader signalHeader;
347   Uint32* signalData;
348   LinearSectionPtr ptr[3];
349   TransporterError errorCode = TE_NO_ERROR;
350 
351   Uint32 loop_count = 0;
352   bool doStopReceiving = false;
353 
354   if(likely(state == NoHalt || state == HaltOutput)){
355     while ((eodPtr >= readPtr + (1 + (sizeof(Protocol6) >> 2))) &&
356            (loop_count < MAX_RECEIVED_SIGNALS) &&
357 	   doStopReceiving == false &&
358            unpack_one(readPtr, eodPtr, prio, signalHeader, signalData, ptr, errorCode)) {
359       loop_count++;
360 
361       Uint32 sBlockNum = signalHeader.theSendersBlockRef;
362       sBlockNum = numberToRef(sBlockNum, remoteNodeId);
363       signalHeader.theSendersBlockRef = sBlockNum;
364 
365       doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
366 
367     }//while
368   } else {
369     /** state = HaltIO || state == HaltInput */
370 
371     while ((eodPtr >= readPtr + (1 + (sizeof(Protocol6) >> 2))) &&
372            (loop_count < MAX_RECEIVED_SIGNALS) &&
373 	   doStopReceiving == false &&
374            unpack_one(readPtr, eodPtr, prio, signalHeader, signalData, ptr, errorCode)) {
375       loop_count++;
376 
377       Uint32 rBlockNum = signalHeader.theReceiversBlockNumber;
378 
379       if(rBlockNum == 252){
380 	Uint32 sBlockNum = signalHeader.theSendersBlockRef;
381 	sBlockNum = numberToRef(sBlockNum, remoteNodeId);
382 	signalHeader.theSendersBlockRef = sBlockNum;
383 
384 	doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
385       } else {
386 	DEBUG("prepareReceive(...) - Discarding message to block: "
387 	      << rBlockNum << " from Node: " << remoteNodeId);
388       }//if
389     }//while
390   }//if
391 
392   if (errorCode != TE_NO_ERROR)
393   {
394     dump_and_report_bad_message(__FILE__, __LINE__,
395             recvHandle, readPtr, eodPtr - readPtr, remoteNodeId, state,
396             errorCode);
397   }
398 
399   stopReceiving = doStopReceiving;
400   return readPtr;
401 }
402 
Packer(bool signalId,bool checksum)403 Packer::Packer(bool signalId, bool checksum) {
404 
405   checksumUsed    = (checksum ? 1 : 0);
406   signalIdUsed    = (signalId ? 1 : 0);
407 
408   // Set the priority
409 
410   preComputedWord1 = 0;
411   Protocol6::setByteOrder(preComputedWord1, MY_OWN_BYTE_ORDER);
412   Protocol6::setSignalIdIncluded(preComputedWord1, signalIdUsed);
413   Protocol6::setCheckSumIncluded(preComputedWord1, checksumUsed);
414   Protocol6::setCompressed(preComputedWord1, 0);
415 }
416 
417 inline
418 void
import(Uint32 * & insertPtr,const LinearSectionPtr & ptr)419 import(Uint32 * & insertPtr, const LinearSectionPtr & ptr){
420   const Uint32 sz = ptr.sz;
421   memcpy(insertPtr, ptr.p, 4 * sz);
422   insertPtr += sz;
423 }
424 
425 inline
426 void
importGeneric(Uint32 * & insertPtr,const GenericSectionPtr & ptr)427 importGeneric(Uint32 * & insertPtr, const GenericSectionPtr & ptr){
428   /* Use the section iterator to obtain the words in this section */
429   Uint32 remain= ptr.sz;
430 
431   while (remain > 0)
432   {
433     Uint32 len= 0;
434     const Uint32* next= ptr.sectionIter->getNextWords(len);
435 
436     assert(len <= remain);
437     assert(next != NULL);
438 
439     memcpy(insertPtr, next, 4 * len);
440     insertPtr+= len;
441     remain-= len;
442   }
443 
444   /* Check that there were no more words available from the
445    * Signal iterator
446    */
447   assert(ptr.sectionIter->getNextWords(remain) == NULL);
448 }
449 
// Declared here, defined elsewhere. Used by the SegmentedSectionPtr
// variant of Packer::pack() to write one segmented section at insertPtr
// (presumably advancing insertPtr past the written words, since pack()
// writes the checksum at insertPtr afterwards - TODO confirm).
void copy(Uint32 * & insertPtr,
	  class SectionSegmentPool &, const SegmentedSectionPtr & ptr);
452 
453 void
pack(Uint32 * insertPtr,Uint32 prio,const SignalHeader * header,const Uint32 * theData,const LinearSectionPtr ptr[3]) const454 Packer::pack(Uint32 * insertPtr,
455 	     Uint32 prio,
456 	     const SignalHeader * header,
457 	     const Uint32 * theData,
458 	     const LinearSectionPtr ptr[3]) const {
459   Uint32 i;
460 
461   Uint32 dataLen32 = header->theLength;
462   Uint32 no_segs = header->m_noOfSections;
463 
464   Uint32 len32 =
465     dataLen32 + no_segs +
466     checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
467 
468 
469   for(i = 0; i<no_segs; i++){
470     len32 += ptr[i].sz;
471   }
472 
473   /**
474    * Do insert of data
475    */
476   Uint32 word1 = preComputedWord1;
477   Uint32 word2 = 0;
478   Uint32 word3 = 0;
479 
480   Protocol6::setPrio(word1, prio);
481   Protocol6::setMessageLength(word1, len32);
482   Protocol6::createProtocol6Header(word1, word2, word3, header);
483 
484   insertPtr[0] = word1;
485   insertPtr[1] = word2;
486   insertPtr[2] = word3;
487 
488   Uint32 * tmpInserPtr = &insertPtr[3];
489 
490   if(signalIdUsed){
491     * tmpInserPtr = header->theSignalId;
492     tmpInserPtr++;
493   }
494 
495   memcpy(tmpInserPtr, theData, 4 * dataLen32);
496 
497   tmpInserPtr += dataLen32;
498   for(i = 0; i<no_segs; i++){
499     tmpInserPtr[i] = ptr[i].sz;
500   }
501 
502   tmpInserPtr += no_segs;
503   for(i = 0; i<no_segs; i++){
504     import(tmpInserPtr, ptr[i]);
505   }
506 
507   if(checksumUsed){
508     * tmpInserPtr = computeChecksum(&insertPtr[0], len32-1);
509   }
510 }
511 
512 void
pack(Uint32 * insertPtr,Uint32 prio,const SignalHeader * header,const Uint32 * theData,class SectionSegmentPool & thePool,const SegmentedSectionPtr ptr[3]) const513 Packer::pack(Uint32 * insertPtr,
514 	     Uint32 prio,
515 	     const SignalHeader * header,
516 	     const Uint32 * theData,
517 	     class SectionSegmentPool & thePool,
518 	     const SegmentedSectionPtr ptr[3]) const {
519   Uint32 i;
520 
521   Uint32 dataLen32 = header->theLength;
522   Uint32 no_segs = header->m_noOfSections;
523 
524   Uint32 len32 =
525     dataLen32 + no_segs +
526     checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
527 
528   for(i = 0; i<no_segs; i++){
529     len32 += ptr[i].sz;
530   }
531 
532   /**
533    * Do insert of data
534    */
535   Uint32 word1 = preComputedWord1;
536   Uint32 word2 = 0;
537   Uint32 word3 = 0;
538 
539   Protocol6::setPrio(word1, prio);
540   Protocol6::setMessageLength(word1, len32);
541   Protocol6::createProtocol6Header(word1, word2, word3, header);
542 
543   insertPtr[0] = word1;
544   insertPtr[1] = word2;
545   insertPtr[2] = word3;
546 
547   Uint32 * tmpInserPtr = &insertPtr[3];
548 
549   if(signalIdUsed){
550     * tmpInserPtr = header->theSignalId;
551     tmpInserPtr++;
552   }
553 
554   memcpy(tmpInserPtr, theData, 4 * dataLen32);
555 
556   tmpInserPtr += dataLen32;
557   for(i = 0; i<no_segs; i++){
558     tmpInserPtr[i] = ptr[i].sz;
559   }
560 
561   tmpInserPtr += no_segs;
562   for(i = 0; i<no_segs; i++){
563     copy(tmpInserPtr, thePool, ptr[i]);
564   }
565 
566   if(checksumUsed){
567     * tmpInserPtr = computeChecksum(&insertPtr[0], len32-1);
568   }
569 }
570 
571 
572 void
pack(Uint32 * insertPtr,Uint32 prio,const SignalHeader * header,const Uint32 * theData,const GenericSectionPtr ptr[3]) const573 Packer::pack(Uint32 * insertPtr,
574 	     Uint32 prio,
575 	     const SignalHeader * header,
576 	     const Uint32 * theData,
577 	     const GenericSectionPtr ptr[3]) const {
578   Uint32 i;
579 
580   Uint32 dataLen32 = header->theLength;
581   Uint32 no_segs = header->m_noOfSections;
582 
583   Uint32 len32 =
584     dataLen32 + no_segs +
585     checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
586 
587 
588   for(i = 0; i<no_segs; i++){
589     len32 += ptr[i].sz;
590   }
591 
592   /**
593    * Do insert of data
594    */
595   Uint32 word1 = preComputedWord1;
596   Uint32 word2 = 0;
597   Uint32 word3 = 0;
598 
599   Protocol6::setPrio(word1, prio);
600   Protocol6::setMessageLength(word1, len32);
601   Protocol6::createProtocol6Header(word1, word2, word3, header);
602 
603   insertPtr[0] = word1;
604   insertPtr[1] = word2;
605   insertPtr[2] = word3;
606 
607   Uint32 * tmpInsertPtr = &insertPtr[3];
608 
609   if(signalIdUsed){
610     * tmpInsertPtr = header->theSignalId;
611     tmpInsertPtr++;
612   }
613 
614   memcpy(tmpInsertPtr, theData, 4 * dataLen32);
615 
616   tmpInsertPtr += dataLen32;
617   for(i = 0; i<no_segs; i++){
618     tmpInsertPtr[i] = ptr[i].sz;
619   }
620 
621   tmpInsertPtr += no_segs;
622   for(i = 0; i<no_segs; i++){
623     importGeneric(tmpInsertPtr, ptr[i]);
624   }
625 
626   if(checksumUsed){
627     * tmpInsertPtr = computeChecksum(&insertPtr[0], len32-1);
628   }
629 }
630 
631 /**
632  * Find longest data size that does not exceed given maximum, and does not
633  * cause individual signals to be split.
634  *
635  * Used by SHM_Transporter, as it is designed to send data in Signal chunks,
636  * not bytes or words.
637  */
638 Uint32
unpack_length_words(const Uint32 * readPtr,Uint32 maxWords)639 TransporterRegistry::unpack_length_words(const Uint32 *readPtr, Uint32 maxWords)
640 {
641   Uint32 wordLength = 0;
642 
643   while (wordLength + 4 + sizeof(Protocol6) <= maxWords)
644   {
645     Uint32 word1 = readPtr[wordLength];
646     Uint16 messageLen32 = Protocol6::getMessageLength(word1);
647     if (wordLength + messageLen32 > maxWords)
648       break;
649     wordLength += messageLen32;
650   }
651   return wordLength;
652 }
653