1 /*
2    Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <ndb_global.h>
26 
27 #include <TransporterRegistry.hpp>
28 #include <FastScheduler.hpp>
29 #include <Emulator.hpp>
30 #include <ErrorHandlingMacros.hpp>
31 
32 #include "LongSignal.hpp"
33 #include "LongSignalImpl.hpp"
34 
35 #include <signaldata/EventReport.hpp>
36 #include <signaldata/TestOrd.hpp>
37 #include <signaldata/SignalDroppedRep.hpp>
38 #include <signaldata/DisconnectRep.hpp>
39 
40 #include "VMSignal.hpp"
41 #include <NdbOut.hpp>
42 #include "TransporterCallbackKernel.hpp"
43 #include <DebuggerNames.hpp>
44 
45 #include <EventLogger.hpp>
46 extern EventLogger * g_eventLogger;
47 
48 #if (defined(VM_TRACE) || defined(ERROR_INSERT))
49 //#define DEBUG_MULTI_TRP 1
50 #endif
51 
52 #ifdef DEBUG_MULTI_TRP
53 #define DEB_MULTI_TRP(arglist) do { g_eventLogger->info arglist ; } while (0)
54 #else
55 #define DEB_MULTI_TRP(arglist) do { } while (0)
56 #endif
57 
58 #define JAM_FILE_ID 226
59 
60 
61 /**
62  * The instance
63  */
64 SectionSegmentPool g_sectionSegmentPool;
65 
66 /* Instance debugging vars
67  * Set from DBTC
68  */
69 Uint32 ErrorSignalReceive= 0;      //Block to inject signal errors into
70 Uint32 ErrorMaxSegmentsToSeize= 0;
71 
72 /**
73  * This variable controls if ErrorSignalReceive/ErrorMaxSegmentsToSeize
74  *   is active...This to make sure only received signals are affected
75  *   and not long signals sent inside node
76  */
77 extern bool ErrorImportActive;
78 
79 struct ConnectionError
80 {
81   enum TransporterError err;
82   const char *text;
83 };
84 
85 static const ConnectionError connectionError[] =
86 {
87   { TE_NO_ERROR, "No error"},
88   { TE_SHM_UNABLE_TO_CREATE_SEGMENT, "Unable to create shared memory segment"},
89   { (enum TransporterError) -1, "No connection error message available (please report a bug)"}
90 };
91 
lookupConnectionError(Uint32 err)92 const char *lookupConnectionError(Uint32 err)
93 {
94   for (Uint32 i = 0; i < NDB_ARRAY_SIZE(connectionError); i++)
95   {
96     if ((Uint32)connectionError[i].err == err)
97     {
98       return connectionError[i].text;
99     }
100   }
101   return "No connection error message available (please report a bug)";
102 }
103 
104 #ifndef NDBD_MULTITHREADED
105 
106 class TransporterCallbackKernelNonMT :
107   public TransporterCallback,
108   public TransporterSendBufferHandle,
109   public TransporterReceiveHandleKernel
110 {
111   void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
112 
113 public:
TransporterCallbackKernelNonMT()114   TransporterCallbackKernelNonMT()
115   : m_send_buffers(NULL), m_page_freelist(NULL), m_send_buffer_memory(NULL)
116   {}
117 
118   ~TransporterCallbackKernelNonMT();
119 
120   /**
121    * Allocate send buffer.
122    *
123    * Argument is the value of config parameter TotalSendBufferMemory. If 0,
124    * a default will be used of sum(max send buffer) over all transporters.
125    * The second is the config parameter ExtraSendBufferMemory
126    */
127   void allocate_send_buffers(Uint64 total_send_buffer,
128                              Uint64 extra_send_buffer);
129 
130   /**
131    * Implements TransporterCallback interface:
132    */
133   void enable_send_buffer(NodeId, TrpId);
134   void disable_send_buffer(NodeId, TrpId);
135 
136   Uint32 get_bytes_to_send_iovec(NodeId node_id,
137                                  TrpId trp_id,
138                                  struct iovec *dst,
139                                  Uint32 max);
140   Uint32 bytes_sent(NodeId, TrpId, Uint32 bytes);
141 
142   /**
143    * These are the TransporterSendBufferHandle methods used by the
144    * single-threaded ndbd.
145    */
146   Uint32 *getWritePtr(NodeId,
147                       TrpId,
148                       Uint32 lenBytes,
149                       Uint32 prio,
150                       Uint32 max_use,
151                       SendStatus *error);
152   Uint32 updateWritePtr(NodeId, TrpId, Uint32 lenBytes, Uint32 prio);
153   void getSendBufferLevel(NodeId node, SB_LevelType &level);
154   bool forceSend(NodeId, TrpId);
155 
156 private:
157   /* Send buffer pages. */
158   struct SendBufferPage {
159     /* This is the number of words that will fit in one page of send buffer. */
160     static const Uint32 PGSIZE = 32768;
max_data_bytesTransporterCallbackKernelNonMT::SendBufferPage161     static Uint32 max_data_bytes()
162     {
163       return PGSIZE - offsetof(SendBufferPage, m_data);
164     }
165 
166     /* Send buffer for one transporter is kept in a single-linked list. */
167     struct SendBufferPage *m_next;
168 
169     /* Bytes of send data available in this page. */
170     Uint16 m_bytes;
171     /* Start of unsent data */
172     Uint16 m_start;
173 
174     /* Data; real size is to the end of one page. */
175     char m_data[2];
176   };
177 
178   /* Send buffer for one transporter. */
179   struct SendBuffer {
180     bool m_enabled;
181     /* Total size of data in buffer, from m_offset_start_data to end. */
182     Uint32 m_used_bytes;
183     /* Linked list of active buffer pages with first and last pointer. */
184     SendBufferPage *m_first_page;
185     SendBufferPage *m_last_page;
186   };
187 
188   SendBufferPage *alloc_page();
189   void release_page(SendBufferPage *page);
190   void discard_send_buffer(TrpId trp_id);
191 
192   /* Send buffers. */
193   SendBuffer *m_send_buffers;
194 
195   /* Linked list of free pages. */
196   SendBufferPage *m_page_freelist;
197   /* Original block of memory for pages (so we can free it at exit). */
198   unsigned char *m_send_buffer_memory;
199 
200   Uint64 m_tot_send_buffer_memory;
201   Uint64 m_tot_used_buffer_memory;
202 }; //class TransporterCallbackKernelNonMT
203 
204 static TransporterCallbackKernelNonMT myTransporterCallback;
205 
getNonMTTransporterSendHandle()206 TransporterSendBufferHandle *getNonMTTransporterSendHandle()
207 {
208   return &myTransporterCallback;
209 }
210 
211 TransporterRegistry globalTransporterRegistry(&myTransporterCallback,
212 					      &myTransporterCallback);
213 
214 #else
215 
getNonMTTransporterSendHandle()216 TransporterSendBufferHandle *getNonMTTransporterSendHandle()
217 {
218   return NULL;
219 }
220 #endif // not NDBD_MULTITHREADED
221 
222 #ifdef NDBD_MULTITHREADED
223 static struct ReceiverThreadCache
224 {
225   SectionSegmentPool::Cache cache_instance;
226   char pad[64 - sizeof(SectionSegmentPool::Cache)];
227 } g_receiver_thread_cache[MAX_NDBMT_RECEIVE_THREADS];
228 
229 void
mt_init_receiver_cache()230 mt_init_receiver_cache()
231 {
232   for (unsigned i = 0; i < NDB_ARRAY_SIZE(g_receiver_thread_cache); i++)
233   {
234     g_receiver_thread_cache[i].cache_instance.init_cache(1024,1024);
235   }
236 }
237 
238 void
mt_set_section_chunk_size()239 mt_set_section_chunk_size()
240 {
241   g_sectionSegmentPool.setChunkSize(256);
242 }
243 
244 #else
mt_init_receiver_cache()245 void mt_init_receiver_cache(){}
mt_set_section_chunk_size()246 void mt_set_section_chunk_size(){}
247 #endif
248 
249 bool
deliver_signal(SignalHeader * const header,Uint8 prio,Uint32 * const theData,LinearSectionPtr ptr[3])250 TransporterReceiveHandleKernel::deliver_signal(SignalHeader * const header,
251                                                Uint8 prio,
252                                                Uint32 * const theData,
253                                                LinearSectionPtr ptr[3])
254 {
255 #ifdef NDBD_MULTITHREADED
256   SectionSegmentPool::Cache & cache =
257     g_receiver_thread_cache[m_receiver_thread_idx].cache_instance;
258 #endif
259 
260   const Uint32 secCount = header->m_noOfSections;
261   const Uint32 length = header->theLength;
262 
263   // if this node is not MT LQH then instance bits are stripped at execute
264 
265 #ifdef TRACE_DISTRIBUTED
266   ndbout_c("recv: %s(%d) from (%s, %d)",
267 	   getSignalName(header->theVerId_signalNumber),
268 	   header->theVerId_signalNumber,
269 	   getBlockName(refToBlock(header->theSendersBlockRef)),
270 	   refToNode(header->theSendersBlockRef));
271 #endif
272 
273   bool ok = true;
274   Ptr<SectionSegment> secPtr[3];
275   bzero(secPtr, sizeof(secPtr));
276   secPtr[0].p = secPtr[1].p = secPtr[2].p = 0;
277 
278 #ifdef NDB_DEBUG_RES_OWNERSHIP
279   /**
280    * Track sections seized as part of receiving signal with
281    * 1 as 'special' block number for receiver
282    */
283   setResOwner(0x1 << 16 | header->theVerId_signalNumber);
284 #endif
285 
286 #if defined(ERROR_INSERT)
287   if (secCount > 0)
288   {
289     const Uint32 receiverBlock = blockToMain(header->theReceiversBlockNumber);
290     if (unlikely(ErrorSignalReceive == receiverBlock))
291     {
292       ErrorImportActive = true;
293     }
294   }
295 #endif
296 
297   switch(secCount){
298   case 3:
299     ok &= import(SPC_CACHE_ARG secPtr[2], ptr[2].p, ptr[2].sz);
300     // Fall through
301   case 2:
302     ok &= import(SPC_CACHE_ARG secPtr[1], ptr[1].p, ptr[1].sz);
303     // Fall through
304   case 1:
305     ok &= import(SPC_CACHE_ARG secPtr[0], ptr[0].p, ptr[0].sz);
306   }
307 #if defined(ERROR_INSERT)
308   ErrorImportActive = false;
309 #endif
310 
311   /**
312    * Check that we haven't received a too long signal
313    */
314   ok &= (length + secCount <= 25);
315 
316   Uint32 secPtrI[3];
317   if(ok){
318     /**
319      * Normal path
320      */
321     secPtrI[0] = secPtr[0].i;
322     secPtrI[1] = secPtr[1].i;
323     secPtrI[2] = secPtr[2].i;
324 
325 #ifndef NDBD_MULTITHREADED
326     globalScheduler.execute(header, prio, theData, secPtrI);
327 #else
328     if (prio == JBB)
329       sendlocal(m_thr_no /* self */,
330                 header, theData, secPtrI);
331     else
332       sendprioa(m_thr_no /* self */,
333                 header, theData, secPtrI);
334 
335 #endif
336     return false;
337   }
338 
339   /**
340    * Out of memory
341    */
342   for(Uint32 i = 0; i<secCount; i++){
343     if(secPtr[i].p != 0){
344       g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
345                                        relSz(secPtr[i].p->m_sz),
346                                        secPtr[i].i,
347 				       secPtr[i].p->m_lastSegment);
348     }
349   }
350 
351   SignalDroppedRep * rep = (SignalDroppedRep*)theData;
352   Uint32 gsn = header->theVerId_signalNumber;
353   Uint32 len = header->theLength;
354   Uint32 newLen= (len > 22 ? 22 : len);
355   memmove(rep->originalData, theData, (4 * newLen));
356   rep->originalGsn = gsn;
357   rep->originalLength = len;
358   rep->originalSectionCount = secCount;
359   header->theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
360   header->theLength = newLen + 3;
361   header->m_noOfSections = 0;
362 #ifndef NDBD_MULTITHREADED
363   globalScheduler.execute(header, prio, theData, secPtrI);
364 #else
365   if (prio == JBB)
366     sendlocal(m_thr_no /* self */,
367               header, theData, NULL);
368   else
369     sendprioa(m_thr_no /* self */,
370               header, theData, NULL);
371 #endif
372   return false;
373 }
374 
375 NdbOut &
operator <<(NdbOut & out,const SectionSegment & ss)376 operator<<(NdbOut& out, const SectionSegment & ss){
377   out << "[ last= " << ss.m_lastSegment << " next= " << ss.nextPool << " ]";
378   return out;
379 }
380 
381 void
reportError(NodeId nodeId,TransporterError errorCode,const char * info)382 TransporterReceiveHandleKernel::reportError(NodeId nodeId,
383                                             TransporterError errorCode,
384                                             const char *info)
385 {
386 #ifdef DEBUG_TRANSPORTER
387   ndbout_c("reportError (%d, 0x%x) %s", nodeId, errorCode, info ? info : "");
388 #endif
389 
390   DBUG_ENTER("reportError");
391   DBUG_PRINT("info",("nodeId %d  errorCode: 0x%x  info: %s",
392 		     nodeId, errorCode, info));
393 
394   switch (errorCode)
395   {
396   case TE_SIGNAL_LOST_SEND_BUFFER_FULL:
397   {
398     char msg[64];
399     BaseString::snprintf(msg, sizeof(msg), "Remote node id %d.%s%s", nodeId,
400 	     info ? " " : "", info ? info : "");
401     ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST_SEND_BUFFER_FULL,
402 			       msg, __FILE__, NST_ErrorHandler);
403   }
404   case TE_SIGNAL_LOST:
405   {
406     char msg[64];
407     BaseString::snprintf(msg, sizeof(msg), "Remote node id %d,%s%s", nodeId,
408 	     info ? " " : "", info ? info : "");
409     ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST,
410 			       msg, __FILE__, NST_ErrorHandler);
411   }
412   case TE_SHM_IPC_PERMANENT:
413   {
414     char msg[128];
415     BaseString::snprintf(msg, sizeof(msg),
416 	     "Remote node id %d.%s%s",
417 	     nodeId, info ? " " : "", info ? info : "");
418     ErrorReporter::handleError(NDBD_EXIT_CONNECTION_SETUP_FAILED,
419 			       msg, __FILE__, NST_ErrorHandler);
420   }
421   default:
422     break;
423   }
424 
425   if(errorCode & TE_DO_DISCONNECT){
426     reportDisconnect(nodeId, errorCode);
427   }
428 
429   SignalT<3> signal;
430   memset(&signal.header, 0, sizeof(signal.header));
431 
432 
433   if(errorCode & TE_DO_DISCONNECT)
434     signal.theData[0] = NDB_LE_TransporterError;
435   else
436     signal.theData[0] = NDB_LE_TransporterWarning;
437 
438   signal.theData[1] = nodeId;
439   signal.theData[2] = errorCode;
440 
441   signal.header.theLength = 3;
442   signal.header.theSendersSignalId = 0;
443   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
444   signal.header.theReceiversBlockNumber = CMVMI;
445   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
446 #ifndef NDBD_MULTITHREADED
447   Uint32 secPtr[3];
448   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
449 #else
450   sendprioa(m_thr_no /* self */,
451             &signal.header, signal.theData, NULL);
452 #endif
453 
454   DBUG_VOID_RETURN;
455 }
456 
457 /**
458  * Report average send length in bytes (4096 last sends)
459  */
460 #ifndef NDBD_MULTITHREADED
~TransporterCallbackKernelNonMT()461 TransporterCallbackKernelNonMT::~TransporterCallbackKernelNonMT()
462 {
463    m_page_freelist = NULL;
464    delete[] m_send_buffers;
465    delete[] m_send_buffer_memory;
466 }
467 
468 void
reportSendLen(NodeId nodeId,Uint32 count,Uint64 bytes)469 TransporterCallbackKernelNonMT::reportSendLen(NodeId nodeId, Uint32 count,
470                                               Uint64 bytes)
471 {
472   SignalT<3> signal;
473   memset(&signal.header, 0, sizeof(signal.header));
474 
475   signal.header.theLength = 3;
476   signal.header.theSendersSignalId = 0;
477   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
478   signal.header.theReceiversBlockNumber = CMVMI;
479   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
480 
481   signal.theData[0] = NDB_LE_SendBytesStatistic;
482   signal.theData[1] = nodeId;
483   signal.theData[2] = Uint32(bytes/count);
484 
485   Uint32 secPtr[3];
486   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
487 }
488 
489 
490 #define MIN_SEND_BUFFER_SIZE (4 * 1024 * 1024)
491 
492 void
allocate_send_buffers(Uint64 total_send_buffer,Uint64 extra_send_buffer)493 TransporterCallbackKernelNonMT::allocate_send_buffers(
494                                            Uint64 total_send_buffer,
495                                            Uint64 extra_send_buffer)
496 {
497   const int maxTransporters = MAX_NTRANSPORTERS;
498   const int nTransporters   = globalTransporterRegistry.get_transporter_count();
499 
500   if (total_send_buffer == 0)
501     total_send_buffer = globalTransporterRegistry.get_total_max_send_buffer();
502 
503   total_send_buffer += extra_send_buffer;
504 
505   if (!extra_send_buffer)
506   {
507     /**
508      * If extra send buffer memory is 0 it means we can decide on an
509      * appropriate value for it. We select to always ensure that the
510      * minimum send buffer memory is 4M, otherwise we simply don't
511      * add any extra send buffer memory at all.
512      */
513     if (total_send_buffer < MIN_SEND_BUFFER_SIZE)
514     {
515       total_send_buffer = (Uint64)MIN_SEND_BUFFER_SIZE;
516     }
517   }
518 
519   if (m_send_buffers)
520   {
521     /* Send buffers already allocated -> resize the buffer pages */
522     assert(m_send_buffer_memory);
523 
524     // TODO resize send buffer pages
525 
526     return;
527   }
528 
529   /**
530    * Initialize transporter send buffers (initially empty).
531    * (Sparesely populated array of 'nTransporters')
532    */
533   assert(nTransporters <= maxTransporters);
534   m_send_buffers = new SendBuffer[maxTransporters];
535   for (int i = 0; i < maxTransporters; i++)
536   {
537     SendBuffer &b = m_send_buffers[i];
538     b.m_first_page = NULL;
539     b.m_last_page = NULL;
540     b.m_used_bytes = 0;
541     b.m_enabled = false;
542   }
543 
544   /* Initialize the page freelist. */
545   Uint64 send_buffer_pages =
546     (total_send_buffer + SendBufferPage::PGSIZE - 1)/SendBufferPage::PGSIZE;
547   /* Add one extra page of internal fragmentation overhead per transporter. */
548   send_buffer_pages += nTransporters;
549 
550   m_send_buffer_memory =
551     new unsigned char[UintPtr(send_buffer_pages * SendBufferPage::PGSIZE)];
552   if (m_send_buffer_memory == NULL)
553   {
554     ndbout << "Unable to allocate "
555            << send_buffer_pages * SendBufferPage::PGSIZE
556            << " bytes of memory for send buffers, aborting." << endl;
557     abort();
558   }
559 
560   m_page_freelist = NULL;
561   for (unsigned i = 0; i < send_buffer_pages; i++)
562   {
563     SendBufferPage *page =
564       (SendBufferPage *)(m_send_buffer_memory + i * SendBufferPage::PGSIZE);
565     page->m_bytes = 0;
566     page->m_next = m_page_freelist;
567     m_page_freelist = page;
568   }
569   m_tot_send_buffer_memory = SendBufferPage::PGSIZE * send_buffer_pages;
570   m_tot_used_buffer_memory = 0;
571 }
572 
573 TransporterCallbackKernelNonMT::SendBufferPage *
alloc_page()574 TransporterCallbackKernelNonMT::alloc_page()
575 {
576   SendBufferPage *page = m_page_freelist;
577   if (page != NULL)
578   {
579     m_tot_used_buffer_memory += SendBufferPage::PGSIZE;
580     m_page_freelist = page->m_next;
581     return page;
582   }
583 
584   ndbout << "ERROR: out of send buffers in kernel." << endl;
585   return NULL;
586 }
587 
588 void
release_page(SendBufferPage * page)589 TransporterCallbackKernelNonMT::release_page(SendBufferPage *page)
590 {
591   assert(page != NULL);
592   page->m_next = m_page_freelist;
593   m_tot_used_buffer_memory -= SendBufferPage::PGSIZE;
594   m_page_freelist = page;
595 }
596 
597 Uint32
get_bytes_to_send_iovec(NodeId node,TrpId trp_id,struct iovec * dst,Uint32 max)598 TransporterCallbackKernelNonMT::get_bytes_to_send_iovec(NodeId node,
599                                                         TrpId trp_id,
600                                                         struct iovec *dst,
601                                                         Uint32 max)
602 {
603   (void)node;
604   SendBuffer *b = m_send_buffers + trp_id;
605 
606   if (unlikely(!b->m_enabled))
607   {
608     discard_send_buffer(trp_id);
609     return 0;
610   }
611   if (unlikely(max == 0))
612     return 0;
613 
614   Uint32 count = 0;
615   SendBufferPage *page = b->m_first_page;
616   while (page != NULL && count < max)
617   {
618     dst[count].iov_base = page->m_data+page->m_start;
619     dst[count].iov_len = page->m_bytes;
620     assert(page->m_start + page->m_bytes <= page->max_data_bytes());
621     page = page->m_next;
622     count++;
623   }
624 
625   return count;
626 }
627 
628 Uint32
bytes_sent(NodeId nodeId,TrpId trp_id,Uint32 bytes)629 TransporterCallbackKernelNonMT::bytes_sent(NodeId nodeId,
630                                            TrpId trp_id,
631                                            Uint32 bytes)
632 {
633   (void)nodeId;
634   SendBuffer *b = m_send_buffers + trp_id;
635   Uint32 used_bytes = b->m_used_bytes;
636 
637   if (bytes == 0)
638     return used_bytes;
639 
640   used_bytes -= bytes;
641   b->m_used_bytes = used_bytes;
642 
643   SendBufferPage *page = b->m_first_page;
644   while (bytes && bytes >= page->m_bytes)
645   {
646     SendBufferPage * tmp = page;
647     bytes -= page->m_bytes;
648     page = page->m_next;
649     release_page(tmp);
650   }
651 
652   if (used_bytes == 0)
653   {
654     b->m_first_page = 0;
655     b->m_last_page = 0;
656   }
657   else
658   {
659     page->m_start += bytes;
660     page->m_bytes -= bytes;
661     assert(page->m_start + page->m_bytes <= page->max_data_bytes());
662     b->m_first_page = page;
663   }
664 
665   return used_bytes;
666 }
667 
668 void
enable_send_buffer(NodeId nodeId,TrpId trp_id)669 TransporterCallbackKernelNonMT::enable_send_buffer(NodeId nodeId, TrpId trp_id)
670 {
671   (void)nodeId;
672   SendBuffer *b = m_send_buffers + trp_id;
673   assert(b->m_enabled == false);
674   assert(b->m_first_page == NULL);  //Disabled buffer is empty
675   b->m_enabled = true;
676 }
677 
678 void
disable_send_buffer(NodeId nodeId,TrpId trp_id)679 TransporterCallbackKernelNonMT::disable_send_buffer(NodeId nodeId,
680                                                     TrpId trp_id)
681 {
682   (void)nodeId;
683   SendBuffer *b = m_send_buffers + trp_id;
684   b->m_enabled = false;
685   discard_send_buffer(trp_id);
686 }
687 
688 void
discard_send_buffer(TrpId trp_id)689 TransporterCallbackKernelNonMT::discard_send_buffer(TrpId trp_id)
690 {
691   SendBuffer *b = m_send_buffers + trp_id;
692   SendBufferPage *page = b->m_first_page;
693   while (page != NULL)
694   {
695     SendBufferPage *next = page->m_next;
696     release_page(page);
697     page = next;
698   }
699   b->m_first_page = NULL;
700   b->m_last_page = NULL;
701   b->m_used_bytes = 0;
702 }
703 
704 /**
705  * These are the TransporterSendBufferHandle methods used by the
706  * single-threaded ndbd.
707  */
708 Uint32 *
getWritePtr(NodeId nodeId,TrpId trp_id,Uint32 lenBytes,Uint32 prio,Uint32 max_use,SendStatus * error)709 TransporterCallbackKernelNonMT::getWritePtr(NodeId nodeId,
710                                             TrpId trp_id,
711                                             Uint32 lenBytes,
712                                             Uint32 prio,
713                                             Uint32 max_use,
714                                             SendStatus *error)
715 {
716   (void)nodeId;
717   SendBuffer *b = m_send_buffers + trp_id;
718 
719   /* First check if we have room in already allocated page. */
720   SendBufferPage *page = b->m_last_page;
721   if (page != NULL &&
722       page->m_bytes + page->m_start + lenBytes <= page->max_data_bytes())
723   {
724     return (Uint32 *)(page->m_data + page->m_start + page->m_bytes);
725   }
726 
727   if (unlikely(b->m_used_bytes + lenBytes > max_use))
728   {
729     *error = SEND_BUFFER_FULL;
730     return NULL;
731   }
732 
733   if (unlikely(lenBytes > SendBufferPage::max_data_bytes()))
734   {
735     *error = SEND_MESSAGE_TOO_BIG;
736     return NULL;
737   }
738 
739   /* Allocate a new page. */
740   page = alloc_page();
741   if (unlikely(page == NULL))
742   {
743     *error = SEND_BUFFER_FULL;
744     return NULL;
745   }
746   page->m_next = NULL;
747   page->m_bytes = 0;
748   page->m_start = 0;
749 
750   if (b->m_last_page == NULL)
751   {
752     b->m_first_page = page;
753     b->m_last_page = page;
754   }
755   else
756   {
757     assert(b->m_first_page != NULL);
758     b->m_last_page->m_next = page;
759     b->m_last_page = page;
760   }
761   return (Uint32 *)(page->m_data);
762 }
763 
764 Uint32
updateWritePtr(NodeId nodeId,TrpId trp_id,Uint32 lenBytes,Uint32 prio)765 TransporterCallbackKernelNonMT::updateWritePtr(NodeId nodeId,
766                                                TrpId trp_id,
767                                                Uint32 lenBytes,
768                                                Uint32 prio)
769 {
770   (void)nodeId;
771   SendBuffer *b = m_send_buffers + trp_id;
772   SendBufferPage *page = b->m_last_page;
773   assert(page != NULL);
774   assert(page->m_bytes + lenBytes <= page->max_data_bytes());
775   page->m_bytes += lenBytes;
776   b->m_used_bytes += lenBytes;
777   return b->m_used_bytes;
778 }
779 
780 /**
781  * This is used by the ndbd, so here only one thread is using this, so
782  * values will always be consistent.
783  */
784 void
getSendBufferLevel(NodeId nodeId,SB_LevelType & level)785 TransporterCallbackKernelNonMT::getSendBufferLevel(NodeId nodeId,
786                                                    SB_LevelType &level)
787 {
788   TrpId trp_ids;
789   Uint32 num_ids;
790   globalTransporterRegistry.get_trps_for_node(nodeId,
791                                               &trp_ids,
792                                               num_ids,
793                                               1);
794   SendBuffer *b = m_send_buffers + trp_ids;
795   calculate_send_buffer_level(b->m_used_bytes,
796                               m_tot_send_buffer_memory,
797                               m_tot_used_buffer_memory,
798                               0,
799                               level);
800   return;
801 }
802 
803 bool
forceSend(NodeId nodeId,TrpId trp_id)804 TransporterCallbackKernelNonMT::forceSend(NodeId nodeId, TrpId trp_id)
805 {
806   (void)nodeId;
807   return globalTransporterRegistry.performSend(trp_id);
808 }
809 
810 #endif //'not NDBD_MULTITHREADED'
811 
812 /**
813  * Report average receive length in bytes (4096 last receives)
814  */
815 void
reportReceiveLen(NodeId nodeId,Uint32 count,Uint64 bytes)816 TransporterReceiveHandleKernel::reportReceiveLen(NodeId nodeId, Uint32 count,
817                                             Uint64 bytes)
818 {
819 
820   SignalT<3> signal;
821   memset(&signal.header, 0, sizeof(signal.header));
822 
823   signal.header.theLength = 3;
824   signal.header.theSendersSignalId = 0;
825   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
826   signal.header.theReceiversBlockNumber = CMVMI;
827   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
828 
829   signal.theData[0] = NDB_LE_ReceiveBytesStatistic;
830   signal.theData[1] = nodeId;
831   signal.theData[2] = Uint32(bytes/count);
832 #ifndef NDBD_MULTITHREADED
833   Uint32 secPtr[3];
834   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
835 #else
836   sendprioa(m_thr_no /* self */,
837             &signal.header, signal.theData, NULL);
838 #endif
839 }
840 
841 /**
842  * Report connection established
843  */
844 
845 void
reportConnect(NodeId nodeId)846 TransporterReceiveHandleKernel::reportConnect(NodeId nodeId)
847 {
848 
849   SignalT<1> signal;
850   memset(&signal.header, 0, sizeof(signal.header));
851 
852 #ifndef NDBD_MULTITHREADED
853   Uint32 trpman_instance = 1;
854 #else
855   Uint32 trpman_instance = 1 /* proxy */ + m_receiver_thread_idx;
856 #endif
857   signal.header.theLength = 1;
858   signal.header.theSendersSignalId = 0;
859   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
860   signal.header.theReceiversBlockNumber = numberToBlock(TRPMAN,trpman_instance);
861   signal.header.theVerId_signalNumber = GSN_CONNECT_REP;
862 
863   signal.theData[0] = nodeId;
864 
865 #ifndef NDBD_MULTITHREADED
866   Uint32 secPtr[3];
867   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
868 #else
869   /**
870    * The first argument to sendprioa is from which thread number this
871    * signal is sent, it is always sent from a receive thread
872    */
873   sendprioa(m_thr_no /* self */,
874             &signal.header, signal.theData, NULL);
875 #endif
876 }
877 
878 /**
879  * Report connection broken
880  */
881 void
reportDisconnect(NodeId nodeId,Uint32 errNo)882 TransporterReceiveHandleKernel::reportDisconnect(NodeId nodeId, Uint32 errNo)
883 {
884   DBUG_ENTER("reportDisconnect");
885 
886   SignalT<DisconnectRep::SignalLength> signal;
887   memset(&signal.header, 0, sizeof(signal.header));
888 
889 #ifndef NDBD_MULTITHREADED
890   Uint32 trpman_instance = 1;
891 #else
892   Uint32 trpman_instance = 1 /* proxy */ + m_receiver_thread_idx;
893 #endif
894   signal.header.theLength = DisconnectRep::SignalLength;
895   signal.header.theSendersSignalId = 0;
896   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
897   signal.header.theTrace = TestOrd::TraceDisconnect;
898   signal.header.theVerId_signalNumber = GSN_DISCONNECT_REP;
899   signal.header.theReceiversBlockNumber = numberToBlock(TRPMAN,trpman_instance);
900 
901   DisconnectRep * rep = CAST_PTR(DisconnectRep, &signal.theData[0]);
902   rep->nodeId = nodeId;
903   rep->err = errNo;
904 
905 #ifndef NDBD_MULTITHREADED
906   Uint32 secPtr[3];
907   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
908 #else
909   sendprioa(m_thr_no /* self */,
910             &signal.header, signal.theData, NULL);
911 #endif
912 
913   DBUG_VOID_RETURN;
914 }
915 
916 void
printSegmentedSection(FILE * output,const SignalHeader & sh,const SegmentedSectionPtr ptr[3],unsigned i)917 SignalLoggerManager::printSegmentedSection(FILE * output,
918                                            const SignalHeader & sh,
919                                            const SegmentedSectionPtr ptr[3],
920                                            unsigned i)
921 {
922   fprintf(output, "SECTION %u type=segmented", i);
923   if (i >= 3) {
924     fprintf(output, " *** invalid ***\n");
925     return;
926   }
927   const Uint32 len = ptr[i].sz;
928   SectionSegment * ssp = ptr[i].p;
929   Uint32 pos = 0;
930   fprintf(output, " size=%u\n", (unsigned)len);
931   while (pos < len) {
932     if (pos > 0 && pos % SectionSegment::DataLength == 0) {
933       ssp = g_sectionSegmentPool.getPtr(ssp->m_nextSegment);
934     }
935     printDataWord(output, pos, ssp->theData[pos % SectionSegment::DataLength]);
936   }
937   if (len > 0)
938     putc('\n', output);
939 }
940 
941 /**
942  * Check to see if jobbbuffers are starting to get full
943  * and if so call doJob
944  */
945 int
checkJobBuffer()946 TransporterReceiveHandleKernel::checkJobBuffer()
947 {
948 #ifndef NDBD_MULTITHREADED
949   return globalScheduler.checkDoJob();
950 #else
951   return mt_checkDoJob(m_receiver_thread_idx);
952 #endif
953 }
954 
955 #ifdef NDBD_MULTITHREADED
956 void
assign_trps(Uint32 * recv_thread_idx_array)957 TransporterReceiveHandleKernel::assign_trps(Uint32 *recv_thread_idx_array)
958 {
959   m_transporters.clear(); /* Clear all first */
960   for (Uint32 trp_id = 1; trp_id < MAX_NTRANSPORTERS; trp_id++)
961   {
962     if (recv_thread_idx_array[trp_id] == m_receiver_thread_idx)
963     {
964       DEB_MULTI_TRP(("trp_id %u assigned to recv thread %u",
965                      trp_id, m_receiver_thread_idx));
966       m_transporters.set(trp_id); /* Belongs to our receive thread */
967     }
968   }
969   return;
970 }
971 #endif
972 
973 void
transporter_recv_from(NodeId nodeId)974 TransporterReceiveHandleKernel::transporter_recv_from(NodeId nodeId)
975 {
976   if (globalData.get_hb_count(nodeId) != 0)
977   {
978     globalData.set_hb_count(nodeId) = 0;
979   }
980 }
981 
982 #ifndef NDBD_MULTITHREADED
983 class TransporterReceiveHandle *
mt_get_trp_receive_handle(unsigned instance)984 mt_get_trp_receive_handle(unsigned instance)
985 {
986   assert(instance == 0);
987   return &myTransporterCallback;
988 }
989 #endif
990 
991 /**
992  * #undef is needed since this file is included by TransporterCallback_nonmt.cpp
993  * and TransporterCallback_mt.cpp
994  */
995 #undef JAM_FILE_ID
996