1 /*
2 Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <ndb_global.h>
26
27 #include <TransporterRegistry.hpp>
28 #include <FastScheduler.hpp>
29 #include <Emulator.hpp>
30 #include <ErrorHandlingMacros.hpp>
31
32 #include "LongSignal.hpp"
33 #include "LongSignalImpl.hpp"
34
35 #include <signaldata/EventReport.hpp>
36 #include <signaldata/TestOrd.hpp>
37 #include <signaldata/SignalDroppedRep.hpp>
38 #include <signaldata/DisconnectRep.hpp>
39
40 #include "VMSignal.hpp"
41 #include <NdbOut.hpp>
42 #include "TransporterCallbackKernel.hpp"
43 #include <DebuggerNames.hpp>
44
45 #define JAM_FILE_ID 226
46
47
48 /**
49 * The instance
50 */
51 SectionSegmentPool g_sectionSegmentPool;
52
53 /* Instance debugging vars
54 * Set from DBTC
55 */
56 int ErrorSignalReceive= 0;
57 int ErrorMaxSegmentsToSeize= 0;
58
59 /**
60 * This variable controls if ErrorSignalReceive/ErrorMaxSegmentsToSeize
61 * is active...This to make sure only received signals are affected
62 * and not long signals sent inside node
63 */
64 extern bool ErrorImportActive;
65
66 struct ConnectionError
67 {
68 enum TransporterError err;
69 const char *text;
70 };
71
72 static const ConnectionError connectionError[] =
73 {
74 { TE_NO_ERROR, "No error"},
75 { TE_SHM_UNABLE_TO_CREATE_SEGMENT, "Unable to create shared memory segment"},
76 { (enum TransporterError) -1, "No connection error message available (please report a bug)"}
77 };
78
lookupConnectionError(Uint32 err)79 const char *lookupConnectionError(Uint32 err)
80 {
81 for (Uint32 i = 0; i < NDB_ARRAY_SIZE(connectionError); i++)
82 {
83 if ((Uint32)connectionError[i].err == err)
84 {
85 return connectionError[i].text;
86 }
87 }
88 return "No connection error message available (please report a bug)";
89 }
90
91 #ifndef NDBD_MULTITHREADED
92 extern TransporterRegistry globalTransporterRegistry; // Forward declaration
93
94 class TransporterCallbackKernelNonMT :
95 public TransporterCallback,
96 public TransporterReceiveHandleKernel
97 {
98 void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
get_bytes_to_send_iovec(NodeId node,struct iovec * dst,Uint32 max)99 Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
100 {
101 return globalTransporterRegistry.get_bytes_to_send_iovec(node, dst, max);
102 }
bytes_sent(NodeId node,Uint32 bytes)103 Uint32 bytes_sent(NodeId node, Uint32 bytes)
104 {
105 return globalTransporterRegistry.bytes_sent(node, bytes);
106 }
has_data_to_send(NodeId node)107 bool has_data_to_send(NodeId node)
108 {
109 return globalTransporterRegistry.has_data_to_send(node);
110 }
reset_send_buffer(NodeId node,bool should_be_empty)111 void reset_send_buffer(NodeId node, bool should_be_empty)
112 {
113 globalTransporterRegistry.reset_send_buffer(node, should_be_empty);
114 }
115 };
116 static TransporterCallbackKernelNonMT myTransporterCallback;
117 TransporterRegistry globalTransporterRegistry(&myTransporterCallback,
118 &myTransporterCallback);
119 #endif
120
121 #ifdef NDBD_MULTITHREADED
122 static struct ReceiverThreadCache
123 {
124 SectionSegmentPool::Cache cache_instance;
125 char pad[64 - sizeof(SectionSegmentPool::Cache)];
126 } g_receiver_thread_cache[MAX_NDBMT_RECEIVE_THREADS];
127
128 void
mt_init_receiver_cache()129 mt_init_receiver_cache()
130 {
131 for (unsigned i = 0; i < NDB_ARRAY_SIZE(g_receiver_thread_cache); i++)
132 {
133 g_receiver_thread_cache[i].cache_instance.init_cache(1024,1024);
134 }
135 }
136
137 void
mt_set_section_chunk_size()138 mt_set_section_chunk_size()
139 {
140 g_sectionSegmentPool.setChunkSize(256);
141 }
142
143 #else
/* Build without NDBD_MULTITHREADED: no receiver thread caches exist, so
 * there is nothing to initialise. */
void mt_init_receiver_cache()
{
}
/* Build without NDBD_MULTITHREADED: the pool chunk size is left untouched. */
void mt_set_section_chunk_size()
{
}
146 #endif
147
148 bool
deliver_signal(SignalHeader * const header,Uint8 prio,Uint32 * const theData,LinearSectionPtr ptr[3])149 TransporterReceiveHandleKernel::deliver_signal(SignalHeader * const header,
150 Uint8 prio,
151 Uint32 * const theData,
152 LinearSectionPtr ptr[3])
153 {
154 #ifdef NDBD_MULTITHREADED
155 SectionSegmentPool::Cache & cache =
156 g_receiver_thread_cache[m_receiver_thread_idx].cache_instance;
157 #endif
158
159 const Uint32 secCount = header->m_noOfSections;
160 const Uint32 length = header->theLength;
161
162 // if this node is not MT LQH then instance bits are stripped at execute
163
164 #ifdef TRACE_DISTRIBUTED
165 ndbout_c("recv: %s(%d) from (%s, %d)",
166 getSignalName(header->theVerId_signalNumber),
167 header->theVerId_signalNumber,
168 getBlockName(refToBlock(header->theSendersBlockRef)),
169 refToNode(header->theSendersBlockRef));
170 #endif
171
172 bool ok = true;
173 Ptr<SectionSegment> secPtr[3];
174 bzero(secPtr, sizeof(secPtr));
175 secPtr[0].p = secPtr[1].p = secPtr[2].p = 0;
176
177 ErrorImportActive = true;
178 switch(secCount){
179 case 3:
180 ok &= import(SPC_CACHE_ARG secPtr[2], ptr[2].p, ptr[2].sz);
181 case 2:
182 ok &= import(SPC_CACHE_ARG secPtr[1], ptr[1].p, ptr[1].sz);
183 case 1:
184 ok &= import(SPC_CACHE_ARG secPtr[0], ptr[0].p, ptr[0].sz);
185 }
186 ErrorImportActive = false;
187
188 /**
189 * Check that we haven't received a too long signal
190 */
191 ok &= (length + secCount <= 25);
192
193 Uint32 secPtrI[3];
194 if(ok){
195 /**
196 * Normal path
197 */
198 secPtrI[0] = secPtr[0].i;
199 secPtrI[1] = secPtr[1].i;
200 secPtrI[2] = secPtr[2].i;
201
202 #ifndef NDBD_MULTITHREADED
203 globalScheduler.execute(header, prio, theData, secPtrI);
204 #else
205 if (prio == JBB)
206 sendlocal(m_thr_no /* self */,
207 header, theData, secPtrI);
208 else
209 sendprioa(m_thr_no /* self */,
210 header, theData, secPtrI);
211
212 #endif
213 return false;
214 }
215
216 /**
217 * Out of memory
218 */
219 for(Uint32 i = 0; i<secCount; i++){
220 if(secPtr[i].p != 0){
221 g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
222 relSz(secPtr[i].p->m_sz),
223 secPtr[i].i,
224 secPtr[i].p->m_lastSegment);
225 }
226 }
227
228 SignalDroppedRep * rep = (SignalDroppedRep*)theData;
229 Uint32 gsn = header->theVerId_signalNumber;
230 Uint32 len = header->theLength;
231 Uint32 newLen= (len > 22 ? 22 : len);
232 memmove(rep->originalData, theData, (4 * newLen));
233 rep->originalGsn = gsn;
234 rep->originalLength = len;
235 rep->originalSectionCount = secCount;
236 header->theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
237 header->theLength = newLen + 3;
238 header->m_noOfSections = 0;
239 #ifndef NDBD_MULTITHREADED
240 globalScheduler.execute(header, prio, theData, secPtrI);
241 #else
242 if (prio == JBB)
243 sendlocal(m_thr_no /* self */,
244 header, theData, NULL);
245 else
246 sendprioa(m_thr_no /* self */,
247 header, theData, NULL);
248 #endif
249 return false;
250 }
251
252 NdbOut &
operator <<(NdbOut & out,const SectionSegment & ss)253 operator<<(NdbOut& out, const SectionSegment & ss){
254 out << "[ last= " << ss.m_lastSegment << " next= " << ss.nextPool << " ]";
255 return out;
256 }
257
258 void
reportError(NodeId nodeId,TransporterError errorCode,const char * info)259 TransporterReceiveHandleKernel::reportError(NodeId nodeId,
260 TransporterError errorCode,
261 const char *info)
262 {
263 #ifdef DEBUG_TRANSPORTER
264 ndbout_c("reportError (%d, 0x%x) %s", nodeId, errorCode, info ? info : "");
265 #endif
266
267 DBUG_ENTER("reportError");
268 DBUG_PRINT("info",("nodeId %d errorCode: 0x%x info: %s",
269 nodeId, errorCode, info));
270
271 switch (errorCode)
272 {
273 case TE_SIGNAL_LOST_SEND_BUFFER_FULL:
274 {
275 char msg[64];
276 BaseString::snprintf(msg, sizeof(msg), "Remote node id %d.%s%s", nodeId,
277 info ? " " : "", info ? info : "");
278 ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST_SEND_BUFFER_FULL,
279 msg, __FILE__, NST_ErrorHandler);
280 }
281 case TE_SIGNAL_LOST:
282 {
283 char msg[64];
284 BaseString::snprintf(msg, sizeof(msg), "Remote node id %d,%s%s", nodeId,
285 info ? " " : "", info ? info : "");
286 ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST,
287 msg, __FILE__, NST_ErrorHandler);
288 }
289 case TE_SHM_IPC_PERMANENT:
290 {
291 char msg[128];
292 BaseString::snprintf(msg, sizeof(msg),
293 "Remote node id %d.%s%s",
294 nodeId, info ? " " : "", info ? info : "");
295 ErrorReporter::handleError(NDBD_EXIT_CONNECTION_SETUP_FAILED,
296 msg, __FILE__, NST_ErrorHandler);
297 }
298 default:
299 break;
300 }
301
302 if(errorCode & TE_DO_DISCONNECT){
303 reportDisconnect(nodeId, errorCode);
304 }
305
306 SignalT<3> signal;
307 memset(&signal.header, 0, sizeof(signal.header));
308
309
310 if(errorCode & TE_DO_DISCONNECT)
311 signal.theData[0] = NDB_LE_TransporterError;
312 else
313 signal.theData[0] = NDB_LE_TransporterWarning;
314
315 signal.theData[1] = nodeId;
316 signal.theData[2] = errorCode;
317
318 signal.header.theLength = 3;
319 signal.header.theSendersSignalId = 0;
320 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
321 signal.header.theReceiversBlockNumber = CMVMI;
322 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
323 #ifndef NDBD_MULTITHREADED
324 Uint32 secPtr[3];
325 globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
326 #else
327 sendprioa(m_thr_no /* self */,
328 &signal.header, signal.theData, NULL);
329 #endif
330
331 DBUG_VOID_RETURN;
332 }
333
334 /**
335 * Report average send length in bytes (4096 last sends)
336 */
337 #ifndef NDBD_MULTITHREADED
338 void
reportSendLen(NodeId nodeId,Uint32 count,Uint64 bytes)339 TransporterCallbackKernelNonMT::reportSendLen(NodeId nodeId, Uint32 count,
340 Uint64 bytes)
341 {
342
343 SignalT<3> signal;
344 memset(&signal.header, 0, sizeof(signal.header));
345
346 signal.header.theLength = 3;
347 signal.header.theSendersSignalId = 0;
348 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
349 signal.header.theReceiversBlockNumber = CMVMI;
350 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
351
352 signal.theData[0] = NDB_LE_SendBytesStatistic;
353 signal.theData[1] = nodeId;
354 signal.theData[2] = Uint32(bytes/count);
355
356 Uint32 secPtr[3];
357 globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
358 }
359 #endif
360
361 /**
362 * Report average receive length in bytes (4096 last receives)
363 */
364 void
reportReceiveLen(NodeId nodeId,Uint32 count,Uint64 bytes)365 TransporterReceiveHandleKernel::reportReceiveLen(NodeId nodeId, Uint32 count,
366 Uint64 bytes)
367 {
368
369 SignalT<3> signal;
370 memset(&signal.header, 0, sizeof(signal.header));
371
372 signal.header.theLength = 3;
373 signal.header.theSendersSignalId = 0;
374 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
375 signal.header.theReceiversBlockNumber = CMVMI;
376 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
377
378 signal.theData[0] = NDB_LE_ReceiveBytesStatistic;
379 signal.theData[1] = nodeId;
380 signal.theData[2] = Uint32(bytes/count);
381 #ifndef NDBD_MULTITHREADED
382 Uint32 secPtr[3];
383 globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
384 #else
385 sendprioa(m_thr_no /* self */,
386 &signal.header, signal.theData, NULL);
387 #endif
388 }
389
390 /**
391 * Report connection established
392 */
393
394 void
reportConnect(NodeId nodeId)395 TransporterReceiveHandleKernel::reportConnect(NodeId nodeId)
396 {
397
398 SignalT<1> signal;
399 memset(&signal.header, 0, sizeof(signal.header));
400
401 #ifndef NDBD_MULTITHREADED
402 Uint32 trpman_instance = 1;
403 #else
404 Uint32 trpman_instance = 1 /* proxy */ + m_receiver_thread_idx;
405 #endif
406 signal.header.theLength = 1;
407 signal.header.theSendersSignalId = 0;
408 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
409 signal.header.theReceiversBlockNumber = numberToBlock(TRPMAN,trpman_instance);
410 signal.header.theVerId_signalNumber = GSN_CONNECT_REP;
411
412 signal.theData[0] = nodeId;
413
414 #ifndef NDBD_MULTITHREADED
415 Uint32 secPtr[3];
416 globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
417 #else
418 /**
419 * The first argument to sendprioa is from which thread number this
420 * signal is sent, it is always sent from a receive thread
421 */
422 sendprioa(m_thr_no /* self */,
423 &signal.header, signal.theData, NULL);
424 #endif
425 }
426
427 /**
428 * Report connection broken
429 */
430 void
reportDisconnect(NodeId nodeId,Uint32 errNo)431 TransporterReceiveHandleKernel::reportDisconnect(NodeId nodeId, Uint32 errNo)
432 {
433 DBUG_ENTER("reportDisconnect");
434
435 SignalT<sizeof(DisconnectRep)/4> signal;
436 memset(&signal.header, 0, sizeof(signal.header));
437
438 #ifndef NDBD_MULTITHREADED
439 Uint32 trpman_instance = 1;
440 #else
441 Uint32 trpman_instance = 1 /* proxy */ + m_receiver_thread_idx;
442 #endif
443 signal.header.theLength = DisconnectRep::SignalLength;
444 signal.header.theSendersSignalId = 0;
445 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
446 signal.header.theTrace = TestOrd::TraceDisconnect;
447 signal.header.theVerId_signalNumber = GSN_DISCONNECT_REP;
448 signal.header.theReceiversBlockNumber = numberToBlock(TRPMAN,trpman_instance);
449
450 DisconnectRep * rep = CAST_PTR(DisconnectRep, &signal.theData[0]);
451 rep->nodeId = nodeId;
452 rep->err = errNo;
453
454 #ifndef NDBD_MULTITHREADED
455 Uint32 secPtr[3];
456 globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
457 #else
458 sendprioa(m_thr_no /* self */,
459 &signal.header, signal.theData, NULL);
460 #endif
461
462 DBUG_VOID_RETURN;
463 }
464
465 void
printSegmentedSection(FILE * output,const SignalHeader & sh,const SegmentedSectionPtr ptr[3],unsigned i)466 SignalLoggerManager::printSegmentedSection(FILE * output,
467 const SignalHeader & sh,
468 const SegmentedSectionPtr ptr[3],
469 unsigned i)
470 {
471 fprintf(output, "SECTION %u type=segmented", i);
472 if (i >= 3) {
473 fprintf(output, " *** invalid ***\n");
474 return;
475 }
476 const Uint32 len = ptr[i].sz;
477 SectionSegment * ssp = ptr[i].p;
478 Uint32 pos = 0;
479 fprintf(output, " size=%u\n", (unsigned)len);
480 while (pos < len) {
481 if (pos > 0 && pos % SectionSegment::DataLength == 0) {
482 ssp = g_sectionSegmentPool.getPtr(ssp->m_nextSegment);
483 }
484 printDataWord(output, pos, ssp->theData[pos % SectionSegment::DataLength]);
485 }
486 if (len > 0)
487 putc('\n', output);
488 }
489
490 /**
491 * Check to see if jobbbuffers are starting to get full
492 * and if so call doJob
493 */
494 int
checkJobBuffer()495 TransporterReceiveHandleKernel::checkJobBuffer()
496 {
497 #ifndef NDBD_MULTITHREADED
498 return globalScheduler.checkDoJob();
499 #else
500 return mt_checkDoJob(m_receiver_thread_idx);
501 #endif
502 }
503
504 #ifdef NDBD_MULTITHREADED
505 void
assign_nodes(NodeId * recv_thread_idx_array)506 TransporterReceiveHandleKernel::assign_nodes(NodeId *recv_thread_idx_array)
507 {
508 m_transporters.clear(); /* Clear all first */
509 for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
510 {
511 if (recv_thread_idx_array[nodeId] == m_receiver_thread_idx)
512 m_transporters.set(nodeId); /* Belongs to our receive thread */
513 }
514 return;
515 }
516 #endif
517
518 void
transporter_recv_from(NodeId nodeId)519 TransporterReceiveHandleKernel::transporter_recv_from(NodeId nodeId)
520 {
521 if (globalData.get_hb_count(nodeId) != 0)
522 {
523 globalData.set_hb_count(nodeId) = 0;
524 }
525 }
526
527 #ifndef NDBD_MULTITHREADED
528 class TransporterReceiveHandle *
mt_get_trp_receive_handle(unsigned instance)529 mt_get_trp_receive_handle(unsigned instance)
530 {
531 assert(instance == 0);
532 return &myTransporterCallback;
533 }
534 #endif
535
536 /**
537 * #undef is needed since this file is included by TransporterCallback_nonmt.cpp
538 * and TransporterCallback_mt.cpp
539 */
540 #undef JAM_FILE_ID
541