1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <ndb_global.h>
26 
27 #include <TransporterRegistry.hpp>
28 #include <FastScheduler.hpp>
29 #include <Emulator.hpp>
30 #include <ErrorHandlingMacros.hpp>
31 
32 #include "LongSignal.hpp"
33 #include "LongSignalImpl.hpp"
34 
35 #include <signaldata/EventReport.hpp>
36 #include <signaldata/TestOrd.hpp>
37 #include <signaldata/SignalDroppedRep.hpp>
38 #include <signaldata/DisconnectRep.hpp>
39 
40 #include "VMSignal.hpp"
41 #include <NdbOut.hpp>
42 #include "TransporterCallbackKernel.hpp"
43 #include <DebuggerNames.hpp>
44 
45 #define JAM_FILE_ID 226
46 
47 
48 /**
49  * The instance
50  */
51 SectionSegmentPool g_sectionSegmentPool;
52 
53 /* Instance debugging vars
54  * Set from DBTC
55  */
56 int ErrorSignalReceive= 0;
57 int ErrorMaxSegmentsToSeize= 0;
58 
59 /**
60  * This variable controls if ErrorSignalReceive/ErrorMaxSegmentsToSeize
61  *   is active...This to make sure only received signals are affected
62  *   and not long signals sent inside node
63  */
64 extern bool ErrorImportActive;
65 
66 struct ConnectionError
67 {
68   enum TransporterError err;
69   const char *text;
70 };
71 
72 static const ConnectionError connectionError[] =
73 {
74   { TE_NO_ERROR, "No error"},
75   { TE_SHM_UNABLE_TO_CREATE_SEGMENT, "Unable to create shared memory segment"},
76   { (enum TransporterError) -1, "No connection error message available (please report a bug)"}
77 };
78 
lookupConnectionError(Uint32 err)79 const char *lookupConnectionError(Uint32 err)
80 {
81   for (Uint32 i = 0; i < NDB_ARRAY_SIZE(connectionError); i++)
82   {
83     if ((Uint32)connectionError[i].err == err)
84     {
85       return connectionError[i].text;
86     }
87   }
88   return "No connection error message available (please report a bug)";
89 }
90 
91 #ifndef NDBD_MULTITHREADED
92 extern TransporterRegistry globalTransporterRegistry; // Forward declaration
93 
94 class TransporterCallbackKernelNonMT :
95   public TransporterCallback,
96   public TransporterReceiveHandleKernel
97 {
98   void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
get_bytes_to_send_iovec(NodeId node,struct iovec * dst,Uint32 max)99   Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
100   {
101     return globalTransporterRegistry.get_bytes_to_send_iovec(node, dst, max);
102   }
bytes_sent(NodeId node,Uint32 bytes)103   Uint32 bytes_sent(NodeId node, Uint32 bytes)
104   {
105     return globalTransporterRegistry.bytes_sent(node, bytes);
106   }
has_data_to_send(NodeId node)107   bool has_data_to_send(NodeId node)
108   {
109     return globalTransporterRegistry.has_data_to_send(node);
110   }
reset_send_buffer(NodeId node,bool should_be_empty)111   void reset_send_buffer(NodeId node, bool should_be_empty)
112   {
113     globalTransporterRegistry.reset_send_buffer(node, should_be_empty);
114   }
115 };
116 static TransporterCallbackKernelNonMT myTransporterCallback;
117 TransporterRegistry globalTransporterRegistry(&myTransporterCallback,
118                                               &myTransporterCallback);
119 #endif
120 
121 #ifdef NDBD_MULTITHREADED
122 static struct ReceiverThreadCache
123 {
124   SectionSegmentPool::Cache cache_instance;
125   char pad[64 - sizeof(SectionSegmentPool::Cache)];
126 } g_receiver_thread_cache[MAX_NDBMT_RECEIVE_THREADS];
127 
128 void
mt_init_receiver_cache()129 mt_init_receiver_cache()
130 {
131   for (unsigned i = 0; i < NDB_ARRAY_SIZE(g_receiver_thread_cache); i++)
132   {
133     g_receiver_thread_cache[i].cache_instance.init_cache(1024,1024);
134   }
135 }
136 
137 void
mt_set_section_chunk_size()138 mt_set_section_chunk_size()
139 {
140   g_sectionSegmentPool.setChunkSize(256);
141 }
142 
143 #else
mt_init_receiver_cache()144 void mt_init_receiver_cache(){}
mt_set_section_chunk_size()145 void mt_set_section_chunk_size(){}
146 #endif
147 
148 bool
deliver_signal(SignalHeader * const header,Uint8 prio,Uint32 * const theData,LinearSectionPtr ptr[3])149 TransporterReceiveHandleKernel::deliver_signal(SignalHeader * const header,
150                                                Uint8 prio,
151                                                Uint32 * const theData,
152                                                LinearSectionPtr ptr[3])
153 {
154 #ifdef NDBD_MULTITHREADED
155   SectionSegmentPool::Cache & cache =
156     g_receiver_thread_cache[m_receiver_thread_idx].cache_instance;
157 #endif
158 
159   const Uint32 secCount = header->m_noOfSections;
160   const Uint32 length = header->theLength;
161 
162   // if this node is not MT LQH then instance bits are stripped at execute
163 
164 #ifdef TRACE_DISTRIBUTED
165   ndbout_c("recv: %s(%d) from (%s, %d)",
166 	   getSignalName(header->theVerId_signalNumber),
167 	   header->theVerId_signalNumber,
168 	   getBlockName(refToBlock(header->theSendersBlockRef)),
169 	   refToNode(header->theSendersBlockRef));
170 #endif
171 
172   bool ok = true;
173   Ptr<SectionSegment> secPtr[3];
174   bzero(secPtr, sizeof(secPtr));
175   secPtr[0].p = secPtr[1].p = secPtr[2].p = 0;
176 
177   ErrorImportActive = true;
178   switch(secCount){
179   case 3:
180     ok &= import(SPC_CACHE_ARG secPtr[2], ptr[2].p, ptr[2].sz);
181   case 2:
182     ok &= import(SPC_CACHE_ARG secPtr[1], ptr[1].p, ptr[1].sz);
183   case 1:
184     ok &= import(SPC_CACHE_ARG secPtr[0], ptr[0].p, ptr[0].sz);
185   }
186   ErrorImportActive = false;
187 
188   /**
189    * Check that we haven't received a too long signal
190    */
191   ok &= (length + secCount <= 25);
192 
193   Uint32 secPtrI[3];
194   if(ok){
195     /**
196      * Normal path
197      */
198     secPtrI[0] = secPtr[0].i;
199     secPtrI[1] = secPtr[1].i;
200     secPtrI[2] = secPtr[2].i;
201 
202 #ifndef NDBD_MULTITHREADED
203     globalScheduler.execute(header, prio, theData, secPtrI);
204 #else
205     if (prio == JBB)
206       sendlocal(m_thr_no /* self */,
207                 header, theData, secPtrI);
208     else
209       sendprioa(m_thr_no /* self */,
210                 header, theData, secPtrI);
211 
212 #endif
213     return false;
214   }
215 
216   /**
217    * Out of memory
218    */
219   for(Uint32 i = 0; i<secCount; i++){
220     if(secPtr[i].p != 0){
221       g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
222                                        relSz(secPtr[i].p->m_sz),
223                                        secPtr[i].i,
224 				       secPtr[i].p->m_lastSegment);
225     }
226   }
227 
228   SignalDroppedRep * rep = (SignalDroppedRep*)theData;
229   Uint32 gsn = header->theVerId_signalNumber;
230   Uint32 len = header->theLength;
231   Uint32 newLen= (len > 22 ? 22 : len);
232   memmove(rep->originalData, theData, (4 * newLen));
233   rep->originalGsn = gsn;
234   rep->originalLength = len;
235   rep->originalSectionCount = secCount;
236   header->theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
237   header->theLength = newLen + 3;
238   header->m_noOfSections = 0;
239 #ifndef NDBD_MULTITHREADED
240   globalScheduler.execute(header, prio, theData, secPtrI);
241 #else
242   if (prio == JBB)
243     sendlocal(m_thr_no /* self */,
244               header, theData, NULL);
245   else
246     sendprioa(m_thr_no /* self */,
247               header, theData, NULL);
248 #endif
249   return false;
250 }
251 
252 NdbOut &
operator <<(NdbOut & out,const SectionSegment & ss)253 operator<<(NdbOut& out, const SectionSegment & ss){
254   out << "[ last= " << ss.m_lastSegment << " next= " << ss.nextPool << " ]";
255   return out;
256 }
257 
258 void
reportError(NodeId nodeId,TransporterError errorCode,const char * info)259 TransporterReceiveHandleKernel::reportError(NodeId nodeId,
260                                             TransporterError errorCode,
261                                             const char *info)
262 {
263 #ifdef DEBUG_TRANSPORTER
264   ndbout_c("reportError (%d, 0x%x) %s", nodeId, errorCode, info ? info : "");
265 #endif
266 
267   DBUG_ENTER("reportError");
268   DBUG_PRINT("info",("nodeId %d  errorCode: 0x%x  info: %s",
269 		     nodeId, errorCode, info));
270 
271   switch (errorCode)
272   {
273   case TE_SIGNAL_LOST_SEND_BUFFER_FULL:
274   {
275     char msg[64];
276     BaseString::snprintf(msg, sizeof(msg), "Remote node id %d.%s%s", nodeId,
277 	     info ? " " : "", info ? info : "");
278     ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST_SEND_BUFFER_FULL,
279 			       msg, __FILE__, NST_ErrorHandler);
280   }
281   case TE_SIGNAL_LOST:
282   {
283     char msg[64];
284     BaseString::snprintf(msg, sizeof(msg), "Remote node id %d,%s%s", nodeId,
285 	     info ? " " : "", info ? info : "");
286     ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST,
287 			       msg, __FILE__, NST_ErrorHandler);
288   }
289   case TE_SHM_IPC_PERMANENT:
290   {
291     char msg[128];
292     BaseString::snprintf(msg, sizeof(msg),
293 	     "Remote node id %d.%s%s",
294 	     nodeId, info ? " " : "", info ? info : "");
295     ErrorReporter::handleError(NDBD_EXIT_CONNECTION_SETUP_FAILED,
296 			       msg, __FILE__, NST_ErrorHandler);
297   }
298   default:
299     break;
300   }
301 
302   if(errorCode & TE_DO_DISCONNECT){
303     reportDisconnect(nodeId, errorCode);
304   }
305 
306   SignalT<3> signal;
307   memset(&signal.header, 0, sizeof(signal.header));
308 
309 
310   if(errorCode & TE_DO_DISCONNECT)
311     signal.theData[0] = NDB_LE_TransporterError;
312   else
313     signal.theData[0] = NDB_LE_TransporterWarning;
314 
315   signal.theData[1] = nodeId;
316   signal.theData[2] = errorCode;
317 
318   signal.header.theLength = 3;
319   signal.header.theSendersSignalId = 0;
320   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
321   signal.header.theReceiversBlockNumber = CMVMI;
322   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
323 #ifndef NDBD_MULTITHREADED
324   Uint32 secPtr[3];
325   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
326 #else
327   sendprioa(m_thr_no /* self */,
328             &signal.header, signal.theData, NULL);
329 #endif
330 
331   DBUG_VOID_RETURN;
332 }
333 
334 /**
335  * Report average send length in bytes (4096 last sends)
336  */
337 #ifndef NDBD_MULTITHREADED
338 void
reportSendLen(NodeId nodeId,Uint32 count,Uint64 bytes)339 TransporterCallbackKernelNonMT::reportSendLen(NodeId nodeId, Uint32 count,
340                                               Uint64 bytes)
341 {
342 
343   SignalT<3> signal;
344   memset(&signal.header, 0, sizeof(signal.header));
345 
346   signal.header.theLength = 3;
347   signal.header.theSendersSignalId = 0;
348   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
349   signal.header.theReceiversBlockNumber = CMVMI;
350   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
351 
352   signal.theData[0] = NDB_LE_SendBytesStatistic;
353   signal.theData[1] = nodeId;
354   signal.theData[2] = Uint32(bytes/count);
355 
356   Uint32 secPtr[3];
357   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
358 }
359 #endif
360 
361 /**
362  * Report average receive length in bytes (4096 last receives)
363  */
364 void
reportReceiveLen(NodeId nodeId,Uint32 count,Uint64 bytes)365 TransporterReceiveHandleKernel::reportReceiveLen(NodeId nodeId, Uint32 count,
366                                             Uint64 bytes)
367 {
368 
369   SignalT<3> signal;
370   memset(&signal.header, 0, sizeof(signal.header));
371 
372   signal.header.theLength = 3;
373   signal.header.theSendersSignalId = 0;
374   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
375   signal.header.theReceiversBlockNumber = CMVMI;
376   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
377 
378   signal.theData[0] = NDB_LE_ReceiveBytesStatistic;
379   signal.theData[1] = nodeId;
380   signal.theData[2] = Uint32(bytes/count);
381 #ifndef NDBD_MULTITHREADED
382   Uint32 secPtr[3];
383   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
384 #else
385   sendprioa(m_thr_no /* self */,
386             &signal.header, signal.theData, NULL);
387 #endif
388 }
389 
390 /**
391  * Report connection established
392  */
393 
394 void
reportConnect(NodeId nodeId)395 TransporterReceiveHandleKernel::reportConnect(NodeId nodeId)
396 {
397 
398   SignalT<1> signal;
399   memset(&signal.header, 0, sizeof(signal.header));
400 
401 #ifndef NDBD_MULTITHREADED
402   Uint32 trpman_instance = 1;
403 #else
404   Uint32 trpman_instance = 1 /* proxy */ + m_receiver_thread_idx;
405 #endif
406   signal.header.theLength = 1;
407   signal.header.theSendersSignalId = 0;
408   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
409   signal.header.theReceiversBlockNumber = numberToBlock(TRPMAN,trpman_instance);
410   signal.header.theVerId_signalNumber = GSN_CONNECT_REP;
411 
412   signal.theData[0] = nodeId;
413 
414 #ifndef NDBD_MULTITHREADED
415   Uint32 secPtr[3];
416   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
417 #else
418   /**
419    * The first argument to sendprioa is from which thread number this
420    * signal is sent, it is always sent from a receive thread
421    */
422   sendprioa(m_thr_no /* self */,
423             &signal.header, signal.theData, NULL);
424 #endif
425 }
426 
427 /**
428  * Report connection broken
429  */
430 void
reportDisconnect(NodeId nodeId,Uint32 errNo)431 TransporterReceiveHandleKernel::reportDisconnect(NodeId nodeId, Uint32 errNo)
432 {
433   DBUG_ENTER("reportDisconnect");
434 
435   SignalT<sizeof(DisconnectRep)/4> signal;
436   memset(&signal.header, 0, sizeof(signal.header));
437 
438 #ifndef NDBD_MULTITHREADED
439   Uint32 trpman_instance = 1;
440 #else
441   Uint32 trpman_instance = 1 /* proxy */ + m_receiver_thread_idx;
442 #endif
443   signal.header.theLength = DisconnectRep::SignalLength;
444   signal.header.theSendersSignalId = 0;
445   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
446   signal.header.theTrace = TestOrd::TraceDisconnect;
447   signal.header.theVerId_signalNumber = GSN_DISCONNECT_REP;
448   signal.header.theReceiversBlockNumber = numberToBlock(TRPMAN,trpman_instance);
449 
450   DisconnectRep * rep = CAST_PTR(DisconnectRep, &signal.theData[0]);
451   rep->nodeId = nodeId;
452   rep->err = errNo;
453 
454 #ifndef NDBD_MULTITHREADED
455   Uint32 secPtr[3];
456   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
457 #else
458   sendprioa(m_thr_no /* self */,
459             &signal.header, signal.theData, NULL);
460 #endif
461 
462   DBUG_VOID_RETURN;
463 }
464 
465 void
printSegmentedSection(FILE * output,const SignalHeader & sh,const SegmentedSectionPtr ptr[3],unsigned i)466 SignalLoggerManager::printSegmentedSection(FILE * output,
467                                            const SignalHeader & sh,
468                                            const SegmentedSectionPtr ptr[3],
469                                            unsigned i)
470 {
471   fprintf(output, "SECTION %u type=segmented", i);
472   if (i >= 3) {
473     fprintf(output, " *** invalid ***\n");
474     return;
475   }
476   const Uint32 len = ptr[i].sz;
477   SectionSegment * ssp = ptr[i].p;
478   Uint32 pos = 0;
479   fprintf(output, " size=%u\n", (unsigned)len);
480   while (pos < len) {
481     if (pos > 0 && pos % SectionSegment::DataLength == 0) {
482       ssp = g_sectionSegmentPool.getPtr(ssp->m_nextSegment);
483     }
484     printDataWord(output, pos, ssp->theData[pos % SectionSegment::DataLength]);
485   }
486   if (len > 0)
487     putc('\n', output);
488 }
489 
490 /**
491  * Check to see if jobbbuffers are starting to get full
492  * and if so call doJob
493  */
494 int
checkJobBuffer()495 TransporterReceiveHandleKernel::checkJobBuffer()
496 {
497 #ifndef NDBD_MULTITHREADED
498   return globalScheduler.checkDoJob();
499 #else
500   return mt_checkDoJob(m_receiver_thread_idx);
501 #endif
502 }
503 
504 #ifdef NDBD_MULTITHREADED
505 void
assign_nodes(NodeId * recv_thread_idx_array)506 TransporterReceiveHandleKernel::assign_nodes(NodeId *recv_thread_idx_array)
507 {
508   m_transporters.clear(); /* Clear all first */
509   for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
510   {
511     if (recv_thread_idx_array[nodeId] == m_receiver_thread_idx)
512       m_transporters.set(nodeId); /* Belongs to our receive thread */
513   }
514   return;
515 }
516 #endif
517 
518 void
transporter_recv_from(NodeId nodeId)519 TransporterReceiveHandleKernel::transporter_recv_from(NodeId nodeId)
520 {
521   if (globalData.get_hb_count(nodeId) != 0)
522   {
523     globalData.set_hb_count(nodeId) = 0;
524   }
525 }
526 
527 #ifndef NDBD_MULTITHREADED
528 class TransporterReceiveHandle *
mt_get_trp_receive_handle(unsigned instance)529 mt_get_trp_receive_handle(unsigned instance)
530 {
531   assert(instance == 0);
532   return &myTransporterCallback;
533 }
534 #endif
535 
536 /**
537  * #undef is needed since this file is included by TransporterCallback_nonmt.cpp
538  * and TransporterCallback_mt.cpp
539  */
540 #undef JAM_FILE_ID
541