1 /*
2 Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <ndb_global.h>
26
27 #include "Packer.hpp"
28 #include <TransporterRegistry.hpp>
29 #include <TransporterCallback.hpp>
30 #include <RefConvert.hpp>
31 #include "BaseString.hpp"
32 #include "EventLogger.hpp"
33 #include "BlockNumbers.h"
34
/*
 * Upper bound on the number of signals one unpack() call processes before
 * it returns. Regular builds use a compile-time constant; builds with
 * ERROR_INSERT defined get a non-const global variable of the same name.
 */
#ifndef ERROR_INSERT
#define MAX_RECEIVED_SIGNALS 1024
#else
Uint32 MAX_RECEIVED_SIGNALS = 1024;
#endif
40
41 extern EventLogger* g_eventLogger;
42
43 void
dump_and_report_bad_message(const char file[],unsigned line,TransporterReceiveHandle & recvHandle,Uint32 * readPtr,size_t sizeInWords,NodeId remoteNodeId,IOState state,TransporterError errorCode)44 TransporterRegistry::dump_and_report_bad_message(const char file[], unsigned line,
45 TransporterReceiveHandle & recvHandle,
46 Uint32 * readPtr,
47 size_t sizeInWords,
48 NodeId remoteNodeId,
49 IOState state,
50 TransporterError errorCode)
51 {
52 report_error(remoteNodeId, errorCode);
53
54 char msg[MAX_LOG_MESSAGE_SIZE];
55 const size_t sz = sizeof(msg);
56 Uint32 nextMsgOffset = Protocol6::getMessageLength(*readPtr);
57 if (sizeInWords >= nextMsgOffset)
58 {
59 nextMsgOffset = 0;
60 }
61
62 {
63 size_t offs = 0;
64 ssize_t nb;
65 nb = BaseString::snprintf(msg + offs, sz - offs, "%s: %u: ", file, line);
66 if (nb < 0) goto log_it;
67 offs += nb;
68
69 // Get error message for errorCode
70 LogLevel::EventCategory cat;
71 Uint32 threshold;
72 Logger::LoggerLevel severity;
73 EventLogger::EventTextFunction textF;
74 EventLoggerBase::event_lookup(NDB_LE_TransporterError,
75 cat, threshold, severity, textF);
76 Uint32 TE_words[3] = {0, remoteNodeId, errorCode};
77 g_eventLogger->getText(msg + offs, sz - offs, textF, TE_words, 3);
78 nb = strlen(msg + offs);
79 if (nb < 0) goto log_it;
80 offs += nb;
81
82 const bool bad_data = recvHandle.m_bad_data_transporters.get(remoteNodeId);
83 nb = BaseString::snprintf(msg + offs, sz - offs,
84 "\n"
85 "PerformState %u: IOState %u: bad_data %u\n"
86 "ptr %p: size %u bytes\n",
87 performStates[remoteNodeId], state, bad_data,
88 readPtr, (unsigned)(sizeInWords * 4));
89 if (nb < 0) goto log_it;
90 offs += nb;
91 size_t reserve;
92 if (!nextMsgOffset)
93 {
94 // If next message wont be dumped, print as much as possible
95 // from start of buffer.
96 reserve = 0;
97 }
98 else
99 {
100 // Keep some space for dumping next message, about 10 words
101 // plus 6 preceding words.
102 reserve = 200;
103 }
104 nb = BaseString::hexdump(msg + offs, sz - offs - reserve, readPtr, sizeInWords);
105 if (nb < 0) goto log_it;
106 offs += nb;
107 if (nextMsgOffset)
108 {
109 // Always print some words preceding next message. But assume
110 // at least 60 words will be printed for current message.
111 if (nextMsgOffset > 60)
112 {
113 nb = BaseString::snprintf(msg + offs, sz - offs,
114 "Before next ptr %p\n",
115 readPtr + nextMsgOffset - 6);
116 if (nb < 0) goto log_it;
117 offs += nb;
118 nb = BaseString::hexdump(msg + offs, sz - offs, readPtr + nextMsgOffset - 6, 6);
119 offs += nb;
120 }
121 // Dump words for next message.
122 nb = BaseString::snprintf(msg + offs, sz - offs,
123 "Next ptr %p\n",
124 readPtr + nextMsgOffset);
125 if (nb < 0) goto log_it;
126 offs += nb;
127 nb = BaseString::hexdump(msg + offs, sz - offs, readPtr + nextMsgOffset, sizeInWords - nextMsgOffset);
128 if (nb < 0) goto log_it;
129 offs += nb;
130 }
131 }
132
133 log_it:
134 g_eventLogger->error("%s", msg);
135 recvHandle.m_bad_data_transporters.set(remoteNodeId);
136 }
137
138 static
139 inline
140 bool
unpack_one(Uint32 * (& readPtr),Uint32 * eodPtr,Uint8 (& prio),SignalHeader (& signalHeader),Uint32 * (& signalData),LinearSectionPtr ptr[],TransporterError (& errorCode))141 unpack_one(Uint32* (&readPtr), Uint32* eodPtr,
142 Uint8 (&prio), SignalHeader (&signalHeader), Uint32* (&signalData), LinearSectionPtr ptr[],
143 TransporterError (&errorCode))
144 {
145 Uint32 word1 = readPtr[0];
146 Uint32 word2 = readPtr[1];
147 Uint32 word3 = readPtr[2];
148
149 if (unlikely(!Protocol6::verifyByteOrder(word1, MY_OWN_BYTE_ORDER)))
150 {
151 errorCode = TE_UNSUPPORTED_BYTE_ORDER;
152 return false;
153 }
154
155 if (unlikely(Protocol6::getCompressed(word1))){
156 errorCode = TE_COMPRESSED_UNSUPPORTED;
157 return false;
158 }
159
160 const Uint16 messageLen32 = Protocol6::getMessageLength(word1);
161
162 if(unlikely(messageLen32 == 0 ||
163 messageLen32 > (MAX_RECV_MESSAGE_BYTESIZE >> 2))){
164 DEBUG("Message Size = " << messageLen32 << " words");
165 errorCode = TE_INVALID_MESSAGE_LENGTH;
166 return false;
167 }//if
168
169 if (unlikely(eodPtr < readPtr + messageLen32)) {
170 errorCode = TE_NO_ERROR;
171 return false;
172 }//if
173
174 if(unlikely(Protocol6::getCheckSumIncluded(word1))){
175 const Uint32 tmpLen = messageLen32 - 1;
176 const Uint32 checkSumSent = readPtr[tmpLen];
177 const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
178
179 if(unlikely(checkSumComputed != checkSumSent)){
180 errorCode = TE_INVALID_CHECKSUM;
181 return false;
182 }//if
183 }//if
184
185 signalData = &readPtr[3];
186
187 readPtr += messageLen32;
188
189 Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
190
191 prio = Protocol6::getPrio(word1);
192
193 if(Protocol6::getSignalIdIncluded(word1) == 0){
194 signalHeader.theSendersSignalId = ~0;
195 } else {
196 signalHeader.theSendersSignalId = * signalData;
197 signalData ++;
198 }//if
199 signalHeader.theSignalId= ~0;
200
201 Uint32 * sectionPtr = signalData + signalHeader.theLength;
202 Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
203 for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
204 Uint32 sz = * sectionPtr;
205 ptr[i].sz = sz;
206 ptr[i].p = sectionData;
207
208 sectionPtr ++;
209 sectionData += sz;
210 }
211
212 if(Protocol6::getCheckSumIncluded(word1))
213 {
214 sectionData ++;
215 }
216 if (sectionData != readPtr)
217 {
218 readPtr -= messageLen32;
219 errorCode = TE_INVALID_MESSAGE_LENGTH;
220 return false;
221 }
222
223 /* check of next message if possible before delivery */
224 if (eodPtr > readPtr)
225 { // check next message word1
226 Uint32 word1 = *readPtr;
227 // check byte order
228 if (unlikely(!Protocol6::verifyByteOrder(word1, MY_OWN_BYTE_ORDER)))
229 {
230 errorCode = TE_UNSUPPORTED_BYTE_ORDER;
231 return false;
232 }
233 if (unlikely(Protocol6::getCompressed(word1)))
234 {
235 errorCode = TE_COMPRESSED_UNSUPPORTED;
236 return false;
237 }
238 // check message size
239 const Uint16 messageLen32 = Protocol6::getMessageLength(word1);
240 if (unlikely(messageLen32 == 0 ||
241 messageLen32 > (MAX_RECV_MESSAGE_BYTESIZE >> 2)))
242 {
243 DEBUG("Message Size = " << messageLenBytes);
244 errorCode = TE_INVALID_MESSAGE_LENGTH;
245 return false;
246 }//if
247 }
248
249 return true;
250 }
251
252 Uint32
unpack(TransporterReceiveHandle & recvHandle,Uint32 * readPtr,Uint32 sizeOfData,NodeId remoteNodeId,IOState state,bool & stopReceiving)253 TransporterRegistry::unpack(TransporterReceiveHandle & recvHandle,
254 Uint32 * readPtr,
255 Uint32 sizeOfData,
256 NodeId remoteNodeId,
257 IOState state,
258 bool & stopReceiving)
259 {
260 assert(stopReceiving == false);
261 // If bad data detected in previous run
262 // skip all further data
263 if (unlikely(recvHandle.m_bad_data_transporters.get(remoteNodeId)))
264 {
265 return sizeOfData;
266 }
267
268 Uint8 prio;
269 SignalHeader signalHeader;
270 Uint32* signalData;
271 LinearSectionPtr ptr[3];
272 TransporterError errorCode = TE_NO_ERROR;
273
274 Uint32* const sodPtr = readPtr;
275 Uint32* const eodPtr = readPtr + (sizeOfData >> 2);
276 Uint32 loop_count = 0;
277 bool doStopReceiving = false;
278
279 if(likely(state == NoHalt || state == HaltOutput)){
280 while ((eodPtr >= readPtr + (1 + (sizeof(Protocol6) >> 2))) &&
281 (loop_count < MAX_RECEIVED_SIGNALS) &&
282 doStopReceiving == false &&
283 unpack_one(readPtr, eodPtr, prio, signalHeader, signalData, ptr, errorCode)) {
284 loop_count++;
285
286 Uint32 sBlockNum = signalHeader.theSendersBlockRef;
287 sBlockNum = numberToRef(sBlockNum, remoteNodeId);
288 signalHeader.theSendersBlockRef = sBlockNum;
289
290 doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
291
292 }//while
293 } else {
294 /** state = HaltIO || state == HaltInput */
295
296 while ((eodPtr >= readPtr + (1 + (sizeof(Protocol6) >> 2))) &&
297 (loop_count < MAX_RECEIVED_SIGNALS) &&
298 doStopReceiving == false &&
299 unpack_one(readPtr, eodPtr, prio, signalHeader, signalData, ptr, errorCode)) {
300 loop_count++;
301
302 Uint32 rBlockNum = signalHeader.theReceiversBlockNumber;
303
304 if(rBlockNum == QMGR){
305 Uint32 sBlockNum = signalHeader.theSendersBlockRef;
306 sBlockNum = numberToRef(sBlockNum, remoteNodeId);
307 signalHeader.theSendersBlockRef = sBlockNum;
308
309 doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
310 } else {
311 DEBUG("prepareReceive(...) - Discarding message to block: "
312 << rBlockNum << " from Node: " << remoteNodeId);
313 }//if
314 }//while
315 }//if
316
317 if (errorCode != TE_NO_ERROR)
318 {
319 dump_and_report_bad_message(__FILE__, __LINE__,
320 recvHandle, readPtr, eodPtr - readPtr, remoteNodeId, state,
321 errorCode);
322 }
323
324 stopReceiving = doStopReceiving;
325 return (readPtr - sodPtr) * sizeof(Uint32);
326 }
327
328 Uint32 *
unpack(TransporterReceiveHandle & recvHandle,Uint32 * readPtr,Uint32 * eodPtr,NodeId remoteNodeId,IOState state,bool & stopReceiving)329 TransporterRegistry::unpack(TransporterReceiveHandle & recvHandle,
330 Uint32 * readPtr,
331 Uint32 * eodPtr,
332 NodeId remoteNodeId,
333 IOState state,
334 bool & stopReceiving)
335 {
336 assert(stopReceiving == false);
337
338 // If bad data detected in previous run
339 // skip all further data
340 if (unlikely(recvHandle.m_bad_data_transporters.get(remoteNodeId)))
341 {
342 return eodPtr;
343 }
344
345 Uint8 prio;
346 SignalHeader signalHeader;
347 Uint32* signalData;
348 LinearSectionPtr ptr[3];
349 TransporterError errorCode = TE_NO_ERROR;
350
351 Uint32 loop_count = 0;
352 bool doStopReceiving = false;
353
354 if(likely(state == NoHalt || state == HaltOutput)){
355 while ((eodPtr >= readPtr + (1 + (sizeof(Protocol6) >> 2))) &&
356 (loop_count < MAX_RECEIVED_SIGNALS) &&
357 doStopReceiving == false &&
358 unpack_one(readPtr, eodPtr, prio, signalHeader, signalData, ptr, errorCode)) {
359 loop_count++;
360
361 Uint32 sBlockNum = signalHeader.theSendersBlockRef;
362 sBlockNum = numberToRef(sBlockNum, remoteNodeId);
363 signalHeader.theSendersBlockRef = sBlockNum;
364
365 doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
366
367 }//while
368 } else {
369 /** state = HaltIO || state == HaltInput */
370
371 while ((eodPtr >= readPtr + (1 + (sizeof(Protocol6) >> 2))) &&
372 (loop_count < MAX_RECEIVED_SIGNALS) &&
373 doStopReceiving == false &&
374 unpack_one(readPtr, eodPtr, prio, signalHeader, signalData, ptr, errorCode)) {
375 loop_count++;
376
377 Uint32 rBlockNum = signalHeader.theReceiversBlockNumber;
378
379 if(rBlockNum == 252){
380 Uint32 sBlockNum = signalHeader.theSendersBlockRef;
381 sBlockNum = numberToRef(sBlockNum, remoteNodeId);
382 signalHeader.theSendersBlockRef = sBlockNum;
383
384 doStopReceiving = recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
385 } else {
386 DEBUG("prepareReceive(...) - Discarding message to block: "
387 << rBlockNum << " from Node: " << remoteNodeId);
388 }//if
389 }//while
390 }//if
391
392 if (errorCode != TE_NO_ERROR)
393 {
394 dump_and_report_bad_message(__FILE__, __LINE__,
395 recvHandle, readPtr, eodPtr - readPtr, remoteNodeId, state,
396 errorCode);
397 }
398
399 stopReceiving = doStopReceiving;
400 return readPtr;
401 }
402
Packer(bool signalId,bool checksum)403 Packer::Packer(bool signalId, bool checksum) {
404
405 checksumUsed = (checksum ? 1 : 0);
406 signalIdUsed = (signalId ? 1 : 0);
407
408 // Set the priority
409
410 preComputedWord1 = 0;
411 Protocol6::setByteOrder(preComputedWord1, MY_OWN_BYTE_ORDER);
412 Protocol6::setSignalIdIncluded(preComputedWord1, signalIdUsed);
413 Protocol6::setCheckSumIncluded(preComputedWord1, checksumUsed);
414 Protocol6::setCompressed(preComputedWord1, 0);
415 }
416
417 inline
418 void
import(Uint32 * & insertPtr,const LinearSectionPtr & ptr)419 import(Uint32 * & insertPtr, const LinearSectionPtr & ptr){
420 const Uint32 sz = ptr.sz;
421 memcpy(insertPtr, ptr.p, 4 * sz);
422 insertPtr += sz;
423 }
424
425 inline
426 void
importGeneric(Uint32 * & insertPtr,const GenericSectionPtr & ptr)427 importGeneric(Uint32 * & insertPtr, const GenericSectionPtr & ptr){
428 /* Use the section iterator to obtain the words in this section */
429 Uint32 remain= ptr.sz;
430
431 while (remain > 0)
432 {
433 Uint32 len= 0;
434 const Uint32* next= ptr.sectionIter->getNextWords(len);
435
436 assert(len <= remain);
437 assert(next != NULL);
438
439 memcpy(insertPtr, next, 4 * len);
440 insertPtr+= len;
441 remain-= len;
442 }
443
444 /* Check that there were no more words available from the
445 * Signal iterator
446 */
447 assert(ptr.sectionIter->getNextWords(remain) == NULL);
448 }
449
450 void copy(Uint32 * & insertPtr,
451 class SectionSegmentPool &, const SegmentedSectionPtr & ptr);
452
453 void
pack(Uint32 * insertPtr,Uint32 prio,const SignalHeader * header,const Uint32 * theData,const LinearSectionPtr ptr[3]) const454 Packer::pack(Uint32 * insertPtr,
455 Uint32 prio,
456 const SignalHeader * header,
457 const Uint32 * theData,
458 const LinearSectionPtr ptr[3]) const {
459 Uint32 i;
460
461 Uint32 dataLen32 = header->theLength;
462 Uint32 no_segs = header->m_noOfSections;
463
464 Uint32 len32 =
465 dataLen32 + no_segs +
466 checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
467
468
469 for(i = 0; i<no_segs; i++){
470 len32 += ptr[i].sz;
471 }
472
473 /**
474 * Do insert of data
475 */
476 Uint32 word1 = preComputedWord1;
477 Uint32 word2 = 0;
478 Uint32 word3 = 0;
479
480 Protocol6::setPrio(word1, prio);
481 Protocol6::setMessageLength(word1, len32);
482 Protocol6::createProtocol6Header(word1, word2, word3, header);
483
484 insertPtr[0] = word1;
485 insertPtr[1] = word2;
486 insertPtr[2] = word3;
487
488 Uint32 * tmpInserPtr = &insertPtr[3];
489
490 if(signalIdUsed){
491 * tmpInserPtr = header->theSignalId;
492 tmpInserPtr++;
493 }
494
495 memcpy(tmpInserPtr, theData, 4 * dataLen32);
496
497 tmpInserPtr += dataLen32;
498 for(i = 0; i<no_segs; i++){
499 tmpInserPtr[i] = ptr[i].sz;
500 }
501
502 tmpInserPtr += no_segs;
503 for(i = 0; i<no_segs; i++){
504 import(tmpInserPtr, ptr[i]);
505 }
506
507 if(checksumUsed){
508 * tmpInserPtr = computeChecksum(&insertPtr[0], len32-1);
509 }
510 }
511
512 void
pack(Uint32 * insertPtr,Uint32 prio,const SignalHeader * header,const Uint32 * theData,class SectionSegmentPool & thePool,const SegmentedSectionPtr ptr[3]) const513 Packer::pack(Uint32 * insertPtr,
514 Uint32 prio,
515 const SignalHeader * header,
516 const Uint32 * theData,
517 class SectionSegmentPool & thePool,
518 const SegmentedSectionPtr ptr[3]) const {
519 Uint32 i;
520
521 Uint32 dataLen32 = header->theLength;
522 Uint32 no_segs = header->m_noOfSections;
523
524 Uint32 len32 =
525 dataLen32 + no_segs +
526 checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
527
528 for(i = 0; i<no_segs; i++){
529 len32 += ptr[i].sz;
530 }
531
532 /**
533 * Do insert of data
534 */
535 Uint32 word1 = preComputedWord1;
536 Uint32 word2 = 0;
537 Uint32 word3 = 0;
538
539 Protocol6::setPrio(word1, prio);
540 Protocol6::setMessageLength(word1, len32);
541 Protocol6::createProtocol6Header(word1, word2, word3, header);
542
543 insertPtr[0] = word1;
544 insertPtr[1] = word2;
545 insertPtr[2] = word3;
546
547 Uint32 * tmpInserPtr = &insertPtr[3];
548
549 if(signalIdUsed){
550 * tmpInserPtr = header->theSignalId;
551 tmpInserPtr++;
552 }
553
554 memcpy(tmpInserPtr, theData, 4 * dataLen32);
555
556 tmpInserPtr += dataLen32;
557 for(i = 0; i<no_segs; i++){
558 tmpInserPtr[i] = ptr[i].sz;
559 }
560
561 tmpInserPtr += no_segs;
562 for(i = 0; i<no_segs; i++){
563 copy(tmpInserPtr, thePool, ptr[i]);
564 }
565
566 if(checksumUsed){
567 * tmpInserPtr = computeChecksum(&insertPtr[0], len32-1);
568 }
569 }
570
571
572 void
pack(Uint32 * insertPtr,Uint32 prio,const SignalHeader * header,const Uint32 * theData,const GenericSectionPtr ptr[3]) const573 Packer::pack(Uint32 * insertPtr,
574 Uint32 prio,
575 const SignalHeader * header,
576 const Uint32 * theData,
577 const GenericSectionPtr ptr[3]) const {
578 Uint32 i;
579
580 Uint32 dataLen32 = header->theLength;
581 Uint32 no_segs = header->m_noOfSections;
582
583 Uint32 len32 =
584 dataLen32 + no_segs +
585 checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
586
587
588 for(i = 0; i<no_segs; i++){
589 len32 += ptr[i].sz;
590 }
591
592 /**
593 * Do insert of data
594 */
595 Uint32 word1 = preComputedWord1;
596 Uint32 word2 = 0;
597 Uint32 word3 = 0;
598
599 Protocol6::setPrio(word1, prio);
600 Protocol6::setMessageLength(word1, len32);
601 Protocol6::createProtocol6Header(word1, word2, word3, header);
602
603 insertPtr[0] = word1;
604 insertPtr[1] = word2;
605 insertPtr[2] = word3;
606
607 Uint32 * tmpInsertPtr = &insertPtr[3];
608
609 if(signalIdUsed){
610 * tmpInsertPtr = header->theSignalId;
611 tmpInsertPtr++;
612 }
613
614 memcpy(tmpInsertPtr, theData, 4 * dataLen32);
615
616 tmpInsertPtr += dataLen32;
617 for(i = 0; i<no_segs; i++){
618 tmpInsertPtr[i] = ptr[i].sz;
619 }
620
621 tmpInsertPtr += no_segs;
622 for(i = 0; i<no_segs; i++){
623 importGeneric(tmpInsertPtr, ptr[i]);
624 }
625
626 if(checksumUsed){
627 * tmpInsertPtr = computeChecksum(&insertPtr[0], len32-1);
628 }
629 }
630
631 /**
632 * Find longest data size that does not exceed given maximum, and does not
633 * cause individual signals to be split.
634 *
635 * Used by SHM_Transporter, as it is designed to send data in Signal chunks,
636 * not bytes or words.
637 */
638 Uint32
unpack_length_words(const Uint32 * readPtr,Uint32 maxWords)639 TransporterRegistry::unpack_length_words(const Uint32 *readPtr, Uint32 maxWords)
640 {
641 Uint32 wordLength = 0;
642
643 while (wordLength + 4 + sizeof(Protocol6) <= maxWords)
644 {
645 Uint32 word1 = readPtr[wordLength];
646 Uint16 messageLen32 = Protocol6::getMessageLength(word1);
647 if (wordLength + messageLen32 > maxWords)
648 break;
649 wordLength += messageLen32;
650 }
651 return wordLength;
652 }
653