1 /*
2 Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <ndb_global.h>
26
27 #include <TransporterRegistry.hpp>
28 #include <FastScheduler.hpp>
29 #include <Emulator.hpp>
30 #include <ErrorHandlingMacros.hpp>
31
32 #include "LongSignal.hpp"
33 #include "LongSignalImpl.hpp"
34
35 #include <signaldata/EventReport.hpp>
36 #include <signaldata/TestOrd.hpp>
37 #include <signaldata/SignalDroppedRep.hpp>
38 #include <signaldata/DisconnectRep.hpp>
39
40 #include "VMSignal.hpp"
41 #include <NdbOut.hpp>
42 #include "TransporterCallbackKernel.hpp"
43 #include <DebuggerNames.hpp>
44
45 #include <EventLogger.hpp>
46 extern EventLogger * g_eventLogger;
47
48 #if (defined(VM_TRACE) || defined(ERROR_INSERT))
49 //#define DEBUG_MULTI_TRP 1
50 #endif
51
52 #ifdef DEBUG_MULTI_TRP
53 #define DEB_MULTI_TRP(arglist) do { g_eventLogger->info arglist ; } while (0)
54 #else
55 #define DEB_MULTI_TRP(arglist) do { } while (0)
56 #endif
57
58 #define JAM_FILE_ID 226
59
60
61 /**
62 * The instance
63 */
64 SectionSegmentPool g_sectionSegmentPool;
65
66 /* Instance debugging vars
67 * Set from DBTC
68 */
69 Uint32 ErrorSignalReceive= 0; //Block to inject signal errors into
70 Uint32 ErrorMaxSegmentsToSeize= 0;
71
72 /**
73 * This variable controls if ErrorSignalReceive/ErrorMaxSegmentsToSeize
74 * is active...This to make sure only received signals are affected
75 * and not long signals sent inside node
76 */
77 extern bool ErrorImportActive;
78
79 struct ConnectionError
80 {
81 enum TransporterError err;
82 const char *text;
83 };
84
85 static const ConnectionError connectionError[] =
86 {
87 { TE_NO_ERROR, "No error"},
88 { TE_SHM_UNABLE_TO_CREATE_SEGMENT, "Unable to create shared memory segment"},
89 { (enum TransporterError) -1, "No connection error message available (please report a bug)"}
90 };
91
lookupConnectionError(Uint32 err)92 const char *lookupConnectionError(Uint32 err)
93 {
94 for (Uint32 i = 0; i < NDB_ARRAY_SIZE(connectionError); i++)
95 {
96 if ((Uint32)connectionError[i].err == err)
97 {
98 return connectionError[i].text;
99 }
100 }
101 return "No connection error message available (please report a bug)";
102 }
103
104 #ifndef NDBD_MULTITHREADED
105
106 class TransporterCallbackKernelNonMT :
107 public TransporterCallback,
108 public TransporterSendBufferHandle,
109 public TransporterReceiveHandleKernel
110 {
111 void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
112
113 public:
TransporterCallbackKernelNonMT()114 TransporterCallbackKernelNonMT()
115 : m_send_buffers(NULL), m_page_freelist(NULL), m_send_buffer_memory(NULL)
116 {}
117
118 ~TransporterCallbackKernelNonMT();
119
120 /**
121 * Allocate send buffer.
122 *
123 * Argument is the value of config parameter TotalSendBufferMemory. If 0,
124 * a default will be used of sum(max send buffer) over all transporters.
125 * The second is the config parameter ExtraSendBufferMemory
126 */
127 void allocate_send_buffers(Uint64 total_send_buffer,
128 Uint64 extra_send_buffer);
129
130 /**
131 * Implements TransporterCallback interface:
132 */
133 void enable_send_buffer(NodeId, TrpId);
134 void disable_send_buffer(NodeId, TrpId);
135
136 Uint32 get_bytes_to_send_iovec(NodeId node_id,
137 TrpId trp_id,
138 struct iovec *dst,
139 Uint32 max);
140 Uint32 bytes_sent(NodeId, TrpId, Uint32 bytes);
141
142 /**
143 * These are the TransporterSendBufferHandle methods used by the
144 * single-threaded ndbd.
145 */
146 Uint32 *getWritePtr(NodeId,
147 TrpId,
148 Uint32 lenBytes,
149 Uint32 prio,
150 Uint32 max_use,
151 SendStatus *error);
152 Uint32 updateWritePtr(NodeId, TrpId, Uint32 lenBytes, Uint32 prio);
153 void getSendBufferLevel(NodeId node, SB_LevelType &level);
154 bool forceSend(NodeId, TrpId);
155
156 private:
157 /* Send buffer pages. */
158 struct SendBufferPage {
159 /* This is the number of words that will fit in one page of send buffer. */
160 static const Uint32 PGSIZE = 32768;
max_data_bytesTransporterCallbackKernelNonMT::SendBufferPage161 static Uint32 max_data_bytes()
162 {
163 return PGSIZE - offsetof(SendBufferPage, m_data);
164 }
165
166 /* Send buffer for one transporter is kept in a single-linked list. */
167 struct SendBufferPage *m_next;
168
169 /* Bytes of send data available in this page. */
170 Uint16 m_bytes;
171 /* Start of unsent data */
172 Uint16 m_start;
173
174 /* Data; real size is to the end of one page. */
175 char m_data[2];
176 };
177
178 /* Send buffer for one transporter. */
179 struct SendBuffer {
180 bool m_enabled;
181 /* Total size of data in buffer, from m_offset_start_data to end. */
182 Uint32 m_used_bytes;
183 /* Linked list of active buffer pages with first and last pointer. */
184 SendBufferPage *m_first_page;
185 SendBufferPage *m_last_page;
186 };
187
188 SendBufferPage *alloc_page();
189 void release_page(SendBufferPage *page);
190 void discard_send_buffer(TrpId trp_id);
191
192 /* Send buffers. */
193 SendBuffer *m_send_buffers;
194
195 /* Linked list of free pages. */
196 SendBufferPage *m_page_freelist;
197 /* Original block of memory for pages (so we can free it at exit). */
198 unsigned char *m_send_buffer_memory;
199
200 Uint64 m_tot_send_buffer_memory;
201 Uint64 m_tot_used_buffer_memory;
202 }; //class TransporterCallbackKernelNonMT
203
204 static TransporterCallbackKernelNonMT myTransporterCallback;
205
getNonMTTransporterSendHandle()206 TransporterSendBufferHandle *getNonMTTransporterSendHandle()
207 {
208 return &myTransporterCallback;
209 }
210
211 TransporterRegistry globalTransporterRegistry(&myTransporterCallback,
212 &myTransporterCallback);
213
214 #else
215
getNonMTTransporterSendHandle()216 TransporterSendBufferHandle *getNonMTTransporterSendHandle()
217 {
218 return NULL;
219 }
220 #endif // not NDBD_MULTITHREADED
221
222 #ifdef NDBD_MULTITHREADED
223 static struct ReceiverThreadCache
224 {
225 SectionSegmentPool::Cache cache_instance;
226 char pad[64 - sizeof(SectionSegmentPool::Cache)];
227 } g_receiver_thread_cache[MAX_NDBMT_RECEIVE_THREADS];
228
229 void
mt_init_receiver_cache()230 mt_init_receiver_cache()
231 {
232 for (unsigned i = 0; i < NDB_ARRAY_SIZE(g_receiver_thread_cache); i++)
233 {
234 g_receiver_thread_cache[i].cache_instance.init_cache(1024,1024);
235 }
236 }
237
238 void
mt_set_section_chunk_size()239 mt_set_section_chunk_size()
240 {
241 g_sectionSegmentPool.setChunkSize(256);
242 }
243
244 #else
mt_init_receiver_cache()245 void mt_init_receiver_cache(){}
mt_set_section_chunk_size()246 void mt_set_section_chunk_size(){}
247 #endif
248
249 bool
deliver_signal(SignalHeader * const header,Uint8 prio,Uint32 * const theData,LinearSectionPtr ptr[3])250 TransporterReceiveHandleKernel::deliver_signal(SignalHeader * const header,
251 Uint8 prio,
252 Uint32 * const theData,
253 LinearSectionPtr ptr[3])
254 {
255 #ifdef NDBD_MULTITHREADED
256 SectionSegmentPool::Cache & cache =
257 g_receiver_thread_cache[m_receiver_thread_idx].cache_instance;
258 #endif
259
260 const Uint32 secCount = header->m_noOfSections;
261 const Uint32 length = header->theLength;
262
263 // if this node is not MT LQH then instance bits are stripped at execute
264
265 #ifdef TRACE_DISTRIBUTED
266 ndbout_c("recv: %s(%d) from (%s, %d)",
267 getSignalName(header->theVerId_signalNumber),
268 header->theVerId_signalNumber,
269 getBlockName(refToBlock(header->theSendersBlockRef)),
270 refToNode(header->theSendersBlockRef));
271 #endif
272
273 bool ok = true;
274 Ptr<SectionSegment> secPtr[3];
275 bzero(secPtr, sizeof(secPtr));
276 secPtr[0].p = secPtr[1].p = secPtr[2].p = 0;
277
278 #ifdef NDB_DEBUG_RES_OWNERSHIP
279 /**
280 * Track sections seized as part of receiving signal with
281 * 1 as 'special' block number for receiver
282 */
283 setResOwner(0x1 << 16 | header->theVerId_signalNumber);
284 #endif
285
286 #if defined(ERROR_INSERT)
287 if (secCount > 0)
288 {
289 const Uint32 receiverBlock = blockToMain(header->theReceiversBlockNumber);
290 if (unlikely(ErrorSignalReceive == receiverBlock))
291 {
292 ErrorImportActive = true;
293 }
294 }
295 #endif
296
297 switch(secCount){
298 case 3:
299 ok &= import(SPC_CACHE_ARG secPtr[2], ptr[2].p, ptr[2].sz);
300 // Fall through
301 case 2:
302 ok &= import(SPC_CACHE_ARG secPtr[1], ptr[1].p, ptr[1].sz);
303 // Fall through
304 case 1:
305 ok &= import(SPC_CACHE_ARG secPtr[0], ptr[0].p, ptr[0].sz);
306 }
307 #if defined(ERROR_INSERT)
308 ErrorImportActive = false;
309 #endif
310
311 /**
312 * Check that we haven't received a too long signal
313 */
314 ok &= (length + secCount <= 25);
315
316 Uint32 secPtrI[3];
317 if(ok){
318 /**
319 * Normal path
320 */
321 secPtrI[0] = secPtr[0].i;
322 secPtrI[1] = secPtr[1].i;
323 secPtrI[2] = secPtr[2].i;
324
325 #ifndef NDBD_MULTITHREADED
326 globalScheduler.execute(header, prio, theData, secPtrI);
327 #else
328 if (prio == JBB)
329 sendlocal(m_thr_no /* self */,
330 header, theData, secPtrI);
331 else
332 sendprioa(m_thr_no /* self */,
333 header, theData, secPtrI);
334
335 #endif
336 return false;
337 }
338
339 /**
340 * Out of memory
341 */
342 for(Uint32 i = 0; i<secCount; i++){
343 if(secPtr[i].p != 0){
344 g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
345 relSz(secPtr[i].p->m_sz),
346 secPtr[i].i,
347 secPtr[i].p->m_lastSegment);
348 }
349 }
350
351 SignalDroppedRep * rep = (SignalDroppedRep*)theData;
352 Uint32 gsn = header->theVerId_signalNumber;
353 Uint32 len = header->theLength;
354 Uint32 newLen= (len > 22 ? 22 : len);
355 memmove(rep->originalData, theData, (4 * newLen));
356 rep->originalGsn = gsn;
357 rep->originalLength = len;
358 rep->originalSectionCount = secCount;
359 header->theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
360 header->theLength = newLen + 3;
361 header->m_noOfSections = 0;
362 #ifndef NDBD_MULTITHREADED
363 globalScheduler.execute(header, prio, theData, secPtrI);
364 #else
365 if (prio == JBB)
366 sendlocal(m_thr_no /* self */,
367 header, theData, NULL);
368 else
369 sendprioa(m_thr_no /* self */,
370 header, theData, NULL);
371 #endif
372 return false;
373 }
374
375 NdbOut &
operator <<(NdbOut & out,const SectionSegment & ss)376 operator<<(NdbOut& out, const SectionSegment & ss){
377 out << "[ last= " << ss.m_lastSegment << " next= " << ss.nextPool << " ]";
378 return out;
379 }
380
381 void
reportError(NodeId nodeId,TransporterError errorCode,const char * info)382 TransporterReceiveHandleKernel::reportError(NodeId nodeId,
383 TransporterError errorCode,
384 const char *info)
385 {
386 #ifdef DEBUG_TRANSPORTER
387 ndbout_c("reportError (%d, 0x%x) %s", nodeId, errorCode, info ? info : "");
388 #endif
389
390 DBUG_ENTER("reportError");
391 DBUG_PRINT("info",("nodeId %d errorCode: 0x%x info: %s",
392 nodeId, errorCode, info));
393
394 switch (errorCode)
395 {
396 case TE_SIGNAL_LOST_SEND_BUFFER_FULL:
397 {
398 char msg[64];
399 BaseString::snprintf(msg, sizeof(msg), "Remote node id %d.%s%s", nodeId,
400 info ? " " : "", info ? info : "");
401 ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST_SEND_BUFFER_FULL,
402 msg, __FILE__, NST_ErrorHandler);
403 }
404 case TE_SIGNAL_LOST:
405 {
406 char msg[64];
407 BaseString::snprintf(msg, sizeof(msg), "Remote node id %d,%s%s", nodeId,
408 info ? " " : "", info ? info : "");
409 ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST,
410 msg, __FILE__, NST_ErrorHandler);
411 }
412 case TE_SHM_IPC_PERMANENT:
413 {
414 char msg[128];
415 BaseString::snprintf(msg, sizeof(msg),
416 "Remote node id %d.%s%s",
417 nodeId, info ? " " : "", info ? info : "");
418 ErrorReporter::handleError(NDBD_EXIT_CONNECTION_SETUP_FAILED,
419 msg, __FILE__, NST_ErrorHandler);
420 }
421 default:
422 break;
423 }
424
425 if(errorCode & TE_DO_DISCONNECT){
426 reportDisconnect(nodeId, errorCode);
427 }
428
429 SignalT<3> signal;
430 memset(&signal.header, 0, sizeof(signal.header));
431
432
433 if(errorCode & TE_DO_DISCONNECT)
434 signal.theData[0] = NDB_LE_TransporterError;
435 else
436 signal.theData[0] = NDB_LE_TransporterWarning;
437
438 signal.theData[1] = nodeId;
439 signal.theData[2] = errorCode;
440
441 signal.header.theLength = 3;
442 signal.header.theSendersSignalId = 0;
443 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
444 signal.header.theReceiversBlockNumber = CMVMI;
445 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
446 #ifndef NDBD_MULTITHREADED
447 Uint32 secPtr[3];
448 globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
449 #else
450 sendprioa(m_thr_no /* self */,
451 &signal.header, signal.theData, NULL);
452 #endif
453
454 DBUG_VOID_RETURN;
455 }
456
457 /**
458 * Report average send length in bytes (4096 last sends)
459 */
460 #ifndef NDBD_MULTITHREADED
~TransporterCallbackKernelNonMT()461 TransporterCallbackKernelNonMT::~TransporterCallbackKernelNonMT()
462 {
463 m_page_freelist = NULL;
464 delete[] m_send_buffers;
465 delete[] m_send_buffer_memory;
466 }
467
468 void
reportSendLen(NodeId nodeId,Uint32 count,Uint64 bytes)469 TransporterCallbackKernelNonMT::reportSendLen(NodeId nodeId, Uint32 count,
470 Uint64 bytes)
471 {
472 SignalT<3> signal;
473 memset(&signal.header, 0, sizeof(signal.header));
474
475 signal.header.theLength = 3;
476 signal.header.theSendersSignalId = 0;
477 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
478 signal.header.theReceiversBlockNumber = CMVMI;
479 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
480
481 signal.theData[0] = NDB_LE_SendBytesStatistic;
482 signal.theData[1] = nodeId;
483 signal.theData[2] = Uint32(bytes/count);
484
485 Uint32 secPtr[3];
486 globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
487 }
488
489
490 #define MIN_SEND_BUFFER_SIZE (4 * 1024 * 1024)
491
492 void
allocate_send_buffers(Uint64 total_send_buffer,Uint64 extra_send_buffer)493 TransporterCallbackKernelNonMT::allocate_send_buffers(
494 Uint64 total_send_buffer,
495 Uint64 extra_send_buffer)
496 {
497 const int maxTransporters = MAX_NTRANSPORTERS;
498 const int nTransporters = globalTransporterRegistry.get_transporter_count();
499
500 if (total_send_buffer == 0)
501 total_send_buffer = globalTransporterRegistry.get_total_max_send_buffer();
502
503 total_send_buffer += extra_send_buffer;
504
505 if (!extra_send_buffer)
506 {
507 /**
508 * If extra send buffer memory is 0 it means we can decide on an
509 * appropriate value for it. We select to always ensure that the
510 * minimum send buffer memory is 4M, otherwise we simply don't
511 * add any extra send buffer memory at all.
512 */
513 if (total_send_buffer < MIN_SEND_BUFFER_SIZE)
514 {
515 total_send_buffer = (Uint64)MIN_SEND_BUFFER_SIZE;
516 }
517 }
518
519 if (m_send_buffers)
520 {
521 /* Send buffers already allocated -> resize the buffer pages */
522 assert(m_send_buffer_memory);
523
524 // TODO resize send buffer pages
525
526 return;
527 }
528
529 /**
530 * Initialize transporter send buffers (initially empty).
531 * (Sparesely populated array of 'nTransporters')
532 */
533 assert(nTransporters <= maxTransporters);
534 m_send_buffers = new SendBuffer[maxTransporters];
535 for (int i = 0; i < maxTransporters; i++)
536 {
537 SendBuffer &b = m_send_buffers[i];
538 b.m_first_page = NULL;
539 b.m_last_page = NULL;
540 b.m_used_bytes = 0;
541 b.m_enabled = false;
542 }
543
544 /* Initialize the page freelist. */
545 Uint64 send_buffer_pages =
546 (total_send_buffer + SendBufferPage::PGSIZE - 1)/SendBufferPage::PGSIZE;
547 /* Add one extra page of internal fragmentation overhead per transporter. */
548 send_buffer_pages += nTransporters;
549
550 m_send_buffer_memory =
551 new unsigned char[UintPtr(send_buffer_pages * SendBufferPage::PGSIZE)];
552 if (m_send_buffer_memory == NULL)
553 {
554 ndbout << "Unable to allocate "
555 << send_buffer_pages * SendBufferPage::PGSIZE
556 << " bytes of memory for send buffers, aborting." << endl;
557 abort();
558 }
559
560 m_page_freelist = NULL;
561 for (unsigned i = 0; i < send_buffer_pages; i++)
562 {
563 SendBufferPage *page =
564 (SendBufferPage *)(m_send_buffer_memory + i * SendBufferPage::PGSIZE);
565 page->m_bytes = 0;
566 page->m_next = m_page_freelist;
567 m_page_freelist = page;
568 }
569 m_tot_send_buffer_memory = SendBufferPage::PGSIZE * send_buffer_pages;
570 m_tot_used_buffer_memory = 0;
571 }
572
573 TransporterCallbackKernelNonMT::SendBufferPage *
alloc_page()574 TransporterCallbackKernelNonMT::alloc_page()
575 {
576 SendBufferPage *page = m_page_freelist;
577 if (page != NULL)
578 {
579 m_tot_used_buffer_memory += SendBufferPage::PGSIZE;
580 m_page_freelist = page->m_next;
581 return page;
582 }
583
584 ndbout << "ERROR: out of send buffers in kernel." << endl;
585 return NULL;
586 }
587
588 void
release_page(SendBufferPage * page)589 TransporterCallbackKernelNonMT::release_page(SendBufferPage *page)
590 {
591 assert(page != NULL);
592 page->m_next = m_page_freelist;
593 m_tot_used_buffer_memory -= SendBufferPage::PGSIZE;
594 m_page_freelist = page;
595 }
596
597 Uint32
get_bytes_to_send_iovec(NodeId node,TrpId trp_id,struct iovec * dst,Uint32 max)598 TransporterCallbackKernelNonMT::get_bytes_to_send_iovec(NodeId node,
599 TrpId trp_id,
600 struct iovec *dst,
601 Uint32 max)
602 {
603 (void)node;
604 SendBuffer *b = m_send_buffers + trp_id;
605
606 if (unlikely(!b->m_enabled))
607 {
608 discard_send_buffer(trp_id);
609 return 0;
610 }
611 if (unlikely(max == 0))
612 return 0;
613
614 Uint32 count = 0;
615 SendBufferPage *page = b->m_first_page;
616 while (page != NULL && count < max)
617 {
618 dst[count].iov_base = page->m_data+page->m_start;
619 dst[count].iov_len = page->m_bytes;
620 assert(page->m_start + page->m_bytes <= page->max_data_bytes());
621 page = page->m_next;
622 count++;
623 }
624
625 return count;
626 }
627
628 Uint32
bytes_sent(NodeId nodeId,TrpId trp_id,Uint32 bytes)629 TransporterCallbackKernelNonMT::bytes_sent(NodeId nodeId,
630 TrpId trp_id,
631 Uint32 bytes)
632 {
633 (void)nodeId;
634 SendBuffer *b = m_send_buffers + trp_id;
635 Uint32 used_bytes = b->m_used_bytes;
636
637 if (bytes == 0)
638 return used_bytes;
639
640 used_bytes -= bytes;
641 b->m_used_bytes = used_bytes;
642
643 SendBufferPage *page = b->m_first_page;
644 while (bytes && bytes >= page->m_bytes)
645 {
646 SendBufferPage * tmp = page;
647 bytes -= page->m_bytes;
648 page = page->m_next;
649 release_page(tmp);
650 }
651
652 if (used_bytes == 0)
653 {
654 b->m_first_page = 0;
655 b->m_last_page = 0;
656 }
657 else
658 {
659 page->m_start += bytes;
660 page->m_bytes -= bytes;
661 assert(page->m_start + page->m_bytes <= page->max_data_bytes());
662 b->m_first_page = page;
663 }
664
665 return used_bytes;
666 }
667
668 void
enable_send_buffer(NodeId nodeId,TrpId trp_id)669 TransporterCallbackKernelNonMT::enable_send_buffer(NodeId nodeId, TrpId trp_id)
670 {
671 (void)nodeId;
672 SendBuffer *b = m_send_buffers + trp_id;
673 assert(b->m_enabled == false);
674 assert(b->m_first_page == NULL); //Disabled buffer is empty
675 b->m_enabled = true;
676 }
677
678 void
disable_send_buffer(NodeId nodeId,TrpId trp_id)679 TransporterCallbackKernelNonMT::disable_send_buffer(NodeId nodeId,
680 TrpId trp_id)
681 {
682 (void)nodeId;
683 SendBuffer *b = m_send_buffers + trp_id;
684 b->m_enabled = false;
685 discard_send_buffer(trp_id);
686 }
687
688 void
discard_send_buffer(TrpId trp_id)689 TransporterCallbackKernelNonMT::discard_send_buffer(TrpId trp_id)
690 {
691 SendBuffer *b = m_send_buffers + trp_id;
692 SendBufferPage *page = b->m_first_page;
693 while (page != NULL)
694 {
695 SendBufferPage *next = page->m_next;
696 release_page(page);
697 page = next;
698 }
699 b->m_first_page = NULL;
700 b->m_last_page = NULL;
701 b->m_used_bytes = 0;
702 }
703
704 /**
705 * These are the TransporterSendBufferHandle methods used by the
706 * single-threaded ndbd.
707 */
708 Uint32 *
getWritePtr(NodeId nodeId,TrpId trp_id,Uint32 lenBytes,Uint32 prio,Uint32 max_use,SendStatus * error)709 TransporterCallbackKernelNonMT::getWritePtr(NodeId nodeId,
710 TrpId trp_id,
711 Uint32 lenBytes,
712 Uint32 prio,
713 Uint32 max_use,
714 SendStatus *error)
715 {
716 (void)nodeId;
717 SendBuffer *b = m_send_buffers + trp_id;
718
719 /* First check if we have room in already allocated page. */
720 SendBufferPage *page = b->m_last_page;
721 if (page != NULL &&
722 page->m_bytes + page->m_start + lenBytes <= page->max_data_bytes())
723 {
724 return (Uint32 *)(page->m_data + page->m_start + page->m_bytes);
725 }
726
727 if (unlikely(b->m_used_bytes + lenBytes > max_use))
728 {
729 *error = SEND_BUFFER_FULL;
730 return NULL;
731 }
732
733 if (unlikely(lenBytes > SendBufferPage::max_data_bytes()))
734 {
735 *error = SEND_MESSAGE_TOO_BIG;
736 return NULL;
737 }
738
739 /* Allocate a new page. */
740 page = alloc_page();
741 if (unlikely(page == NULL))
742 {
743 *error = SEND_BUFFER_FULL;
744 return NULL;
745 }
746 page->m_next = NULL;
747 page->m_bytes = 0;
748 page->m_start = 0;
749
750 if (b->m_last_page == NULL)
751 {
752 b->m_first_page = page;
753 b->m_last_page = page;
754 }
755 else
756 {
757 assert(b->m_first_page != NULL);
758 b->m_last_page->m_next = page;
759 b->m_last_page = page;
760 }
761 return (Uint32 *)(page->m_data);
762 }
763
764 Uint32
updateWritePtr(NodeId nodeId,TrpId trp_id,Uint32 lenBytes,Uint32 prio)765 TransporterCallbackKernelNonMT::updateWritePtr(NodeId nodeId,
766 TrpId trp_id,
767 Uint32 lenBytes,
768 Uint32 prio)
769 {
770 (void)nodeId;
771 SendBuffer *b = m_send_buffers + trp_id;
772 SendBufferPage *page = b->m_last_page;
773 assert(page != NULL);
774 assert(page->m_bytes + lenBytes <= page->max_data_bytes());
775 page->m_bytes += lenBytes;
776 b->m_used_bytes += lenBytes;
777 return b->m_used_bytes;
778 }
779
780 /**
781 * This is used by the ndbd, so here only one thread is using this, so
782 * values will always be consistent.
783 */
784 void
getSendBufferLevel(NodeId nodeId,SB_LevelType & level)785 TransporterCallbackKernelNonMT::getSendBufferLevel(NodeId nodeId,
786 SB_LevelType &level)
787 {
788 TrpId trp_ids;
789 Uint32 num_ids;
790 globalTransporterRegistry.get_trps_for_node(nodeId,
791 &trp_ids,
792 num_ids,
793 1);
794 SendBuffer *b = m_send_buffers + trp_ids;
795 calculate_send_buffer_level(b->m_used_bytes,
796 m_tot_send_buffer_memory,
797 m_tot_used_buffer_memory,
798 0,
799 level);
800 return;
801 }
802
803 bool
forceSend(NodeId nodeId,TrpId trp_id)804 TransporterCallbackKernelNonMT::forceSend(NodeId nodeId, TrpId trp_id)
805 {
806 (void)nodeId;
807 return globalTransporterRegistry.performSend(trp_id);
808 }
809
810 #endif //'not NDBD_MULTITHREADED'
811
812 /**
813 * Report average receive length in bytes (4096 last receives)
814 */
815 void
reportReceiveLen(NodeId nodeId,Uint32 count,Uint64 bytes)816 TransporterReceiveHandleKernel::reportReceiveLen(NodeId nodeId, Uint32 count,
817 Uint64 bytes)
818 {
819
820 SignalT<3> signal;
821 memset(&signal.header, 0, sizeof(signal.header));
822
823 signal.header.theLength = 3;
824 signal.header.theSendersSignalId = 0;
825 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
826 signal.header.theReceiversBlockNumber = CMVMI;
827 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
828
829 signal.theData[0] = NDB_LE_ReceiveBytesStatistic;
830 signal.theData[1] = nodeId;
831 signal.theData[2] = Uint32(bytes/count);
832 #ifndef NDBD_MULTITHREADED
833 Uint32 secPtr[3];
834 globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
835 #else
836 sendprioa(m_thr_no /* self */,
837 &signal.header, signal.theData, NULL);
838 #endif
839 }
840
841 /**
842 * Report connection established
843 */
844
845 void
reportConnect(NodeId nodeId)846 TransporterReceiveHandleKernel::reportConnect(NodeId nodeId)
847 {
848
849 SignalT<1> signal;
850 memset(&signal.header, 0, sizeof(signal.header));
851
852 #ifndef NDBD_MULTITHREADED
853 Uint32 trpman_instance = 1;
854 #else
855 Uint32 trpman_instance = 1 /* proxy */ + m_receiver_thread_idx;
856 #endif
857 signal.header.theLength = 1;
858 signal.header.theSendersSignalId = 0;
859 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
860 signal.header.theReceiversBlockNumber = numberToBlock(TRPMAN,trpman_instance);
861 signal.header.theVerId_signalNumber = GSN_CONNECT_REP;
862
863 signal.theData[0] = nodeId;
864
865 #ifndef NDBD_MULTITHREADED
866 Uint32 secPtr[3];
867 globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
868 #else
869 /**
870 * The first argument to sendprioa is from which thread number this
871 * signal is sent, it is always sent from a receive thread
872 */
873 sendprioa(m_thr_no /* self */,
874 &signal.header, signal.theData, NULL);
875 #endif
876 }
877
878 /**
879 * Report connection broken
880 */
881 void
reportDisconnect(NodeId nodeId,Uint32 errNo)882 TransporterReceiveHandleKernel::reportDisconnect(NodeId nodeId, Uint32 errNo)
883 {
884 DBUG_ENTER("reportDisconnect");
885
886 SignalT<DisconnectRep::SignalLength> signal;
887 memset(&signal.header, 0, sizeof(signal.header));
888
889 #ifndef NDBD_MULTITHREADED
890 Uint32 trpman_instance = 1;
891 #else
892 Uint32 trpman_instance = 1 /* proxy */ + m_receiver_thread_idx;
893 #endif
894 signal.header.theLength = DisconnectRep::SignalLength;
895 signal.header.theSendersSignalId = 0;
896 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
897 signal.header.theTrace = TestOrd::TraceDisconnect;
898 signal.header.theVerId_signalNumber = GSN_DISCONNECT_REP;
899 signal.header.theReceiversBlockNumber = numberToBlock(TRPMAN,trpman_instance);
900
901 DisconnectRep * rep = CAST_PTR(DisconnectRep, &signal.theData[0]);
902 rep->nodeId = nodeId;
903 rep->err = errNo;
904
905 #ifndef NDBD_MULTITHREADED
906 Uint32 secPtr[3];
907 globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
908 #else
909 sendprioa(m_thr_no /* self */,
910 &signal.header, signal.theData, NULL);
911 #endif
912
913 DBUG_VOID_RETURN;
914 }
915
916 void
printSegmentedSection(FILE * output,const SignalHeader & sh,const SegmentedSectionPtr ptr[3],unsigned i)917 SignalLoggerManager::printSegmentedSection(FILE * output,
918 const SignalHeader & sh,
919 const SegmentedSectionPtr ptr[3],
920 unsigned i)
921 {
922 fprintf(output, "SECTION %u type=segmented", i);
923 if (i >= 3) {
924 fprintf(output, " *** invalid ***\n");
925 return;
926 }
927 const Uint32 len = ptr[i].sz;
928 SectionSegment * ssp = ptr[i].p;
929 Uint32 pos = 0;
930 fprintf(output, " size=%u\n", (unsigned)len);
931 while (pos < len) {
932 if (pos > 0 && pos % SectionSegment::DataLength == 0) {
933 ssp = g_sectionSegmentPool.getPtr(ssp->m_nextSegment);
934 }
935 printDataWord(output, pos, ssp->theData[pos % SectionSegment::DataLength]);
936 }
937 if (len > 0)
938 putc('\n', output);
939 }
940
941 /**
942 * Check to see if jobbbuffers are starting to get full
943 * and if so call doJob
944 */
945 int
checkJobBuffer()946 TransporterReceiveHandleKernel::checkJobBuffer()
947 {
948 #ifndef NDBD_MULTITHREADED
949 return globalScheduler.checkDoJob();
950 #else
951 return mt_checkDoJob(m_receiver_thread_idx);
952 #endif
953 }
954
955 #ifdef NDBD_MULTITHREADED
956 void
assign_trps(Uint32 * recv_thread_idx_array)957 TransporterReceiveHandleKernel::assign_trps(Uint32 *recv_thread_idx_array)
958 {
959 m_transporters.clear(); /* Clear all first */
960 for (Uint32 trp_id = 1; trp_id < MAX_NTRANSPORTERS; trp_id++)
961 {
962 if (recv_thread_idx_array[trp_id] == m_receiver_thread_idx)
963 {
964 DEB_MULTI_TRP(("trp_id %u assigned to recv thread %u",
965 trp_id, m_receiver_thread_idx));
966 m_transporters.set(trp_id); /* Belongs to our receive thread */
967 }
968 }
969 return;
970 }
971 #endif
972
973 void
transporter_recv_from(NodeId nodeId)974 TransporterReceiveHandleKernel::transporter_recv_from(NodeId nodeId)
975 {
976 if (globalData.get_hb_count(nodeId) != 0)
977 {
978 globalData.set_hb_count(nodeId) = 0;
979 }
980 }
981
982 #ifndef NDBD_MULTITHREADED
983 class TransporterReceiveHandle *
mt_get_trp_receive_handle(unsigned instance)984 mt_get_trp_receive_handle(unsigned instance)
985 {
986 assert(instance == 0);
987 return &myTransporterCallback;
988 }
989 #endif
990
991 /**
992 * #undef is needed since this file is included by TransporterCallback_nonmt.cpp
993 * and TransporterCallback_mt.cpp
994 */
995 #undef JAM_FILE_ID
996