1 /*
2    Copyright (c) 2003, 2020, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 /*
26   This file is used to build both the multithreaded and the singlethreaded
27   ndbd. It is built twice, included from either SimulatedBlock_mt.cpp (with
28   the macro NDBD_MULTITHREADED defined) or SimulatedBlock_nonmt.cpp (with the
29   macro not defined).
30 */
31 
32 #include <ndb_global.h>
33 
34 #include "SimulatedBlock.hpp"
35 #include <NdbOut.hpp>
36 #include <OutputStream.hpp>
37 #include <GlobalData.hpp>
38 #include <Emulator.hpp>
39 #include <WatchDog.hpp>
40 #include <ErrorHandlingMacros.hpp>
41 #include <TimeQueue.hpp>
42 #include <TransporterRegistry.hpp>
43 #include <SignalLoggerManager.hpp>
44 #include <FastScheduler.hpp>
45 #include "ndbd_malloc.hpp"
46 #include "signaldata/DumpStateOrd.hpp"
47 #include <signaldata/EventReport.hpp>
48 #include <signaldata/ContinueFragmented.hpp>
49 #include <signaldata/NodeStateSignalData.hpp>
50 #include <signaldata/FsRef.hpp>
51 #include <signaldata/SignalDroppedRep.hpp>
52 #include <signaldata/LocalRouteOrd.hpp>
53 #include <signaldata/TransIdAI.hpp>
54 #include <signaldata/Sync.hpp>
55 #include <DebuggerNames.hpp>
56 #include "LongSignal.hpp"
57 
58 #include <Properties.hpp>
59 #include "Configuration.hpp"
60 #include <AttributeDescriptor.hpp>
61 #include <NdbSqlUtil.hpp>
62 
63 #include "../blocks/dbdih/Dbdih.hpp"
64 #include <signaldata/CallbackSignal.hpp>
65 #include "LongSignalImpl.hpp"
66 
67 #include "KeyDescriptor.hpp"
68 
69 #include <EventLogger.hpp>
70 
71 #define JAM_FILE_ID 252
72 
73 extern EventLogger * g_eventLogger;
74 
75 //
76 // Constructor, Destructor
77 //
SimulatedBlock(BlockNumber blockNumber,struct Block_context & ctx,Uint32 instanceNumber)78 SimulatedBlock::SimulatedBlock(BlockNumber blockNumber,
79 			       struct Block_context & ctx,
80                                Uint32 instanceNumber)
81   : theNodeId(globalData.ownId),
82     theNumber(blockNumber),
83     theInstance(instanceNumber),
84     theReference(numberToRef(blockNumber, instanceNumber, globalData.ownId)),
85     theInstanceList(0),
86     theMainInstance(0),
87     m_pHighResTimer(0),
88     m_ctx(ctx),
89     m_global_page_pool(globalData.m_global_page_pool),
90     m_shared_page_pool(globalData.m_shared_page_pool),
91     c_fragmentInfoHash(c_fragmentInfoPool),
92     c_linearFragmentSendList(c_fragmentSendPool),
93     c_segmentedFragmentSendList(c_fragmentSendPool),
94     c_mutexMgr(* this),
95     c_counterMgr(* this)
96 #ifdef VM_TRACE_TIME
97     ,m_currentGsn(0)
98 #endif
99 #ifdef VM_TRACE
100     ,debugOutFile(globalSignalLoggers.getOutputStream())
101     ,debugOut(debugOutFile)
102 #endif
103 {
104   m_threadId = 0;
105   m_watchDogCounter = NULL;
106   m_jamBuffer = NDB_THREAD_TLS_JAM;
107   NewVarRef = 0;
108 
109   SimulatedBlock* mainBlock = globalData.getBlock(blockNumber);
110 
111   if (theInstance == 0) {
112     ndbrequire(mainBlock == 0);
113     mainBlock = this;
114     theMainInstance = mainBlock;
115     globalData.setBlock(blockNumber, mainBlock);
116     mainBlock->addInstance(this, theInstance);
117   } else {
118     ndbrequire(mainBlock != 0);
119     mainBlock->addInstance(this, theInstance);
120     theMainInstance = mainBlock;
121   }
122 
123   c_fragmentIdCounter = 1;
124   c_fragSenderRunning = false;
125 
126 #ifdef VM_TRACE_TIME
127   clearTimes();
128 #endif
129 
130   for(GlobalSignalNumber i = 0; i<=MAX_GSN; i++)
131     theExecArray[i] = 0;
132 
133   installSimulatedBlockFunctions();
134 
135   m_callbackTableAddr = 0;
136 
137   CLEAR_ERROR_INSERT_VALUE;
138 
139 #ifndef NDBD_MULTITHREADED
140   /* Ndbd, init from GlobalScheduler */
141   m_pHighResTimer = globalScheduler.getHighResTimerPtr();
142 #endif
143 }
144 
145 void
addInstance(SimulatedBlock * b,Uint32 theInstance)146 SimulatedBlock::addInstance(SimulatedBlock* b, Uint32 theInstance)
147 {
148   ndbrequire(theMainInstance == this);
149   ndbrequire(number() == b->number());
150   if (theInstanceList == 0)
151   {
152     theInstanceList = new SimulatedBlock* [MaxInstances];
153     ndbrequire(theInstanceList != 0);
154     for (Uint32 i = 0; i < MaxInstances; i++)
155       theInstanceList[i] = 0;
156   }
157   ndbrequire(theInstance < MaxInstances);
158   ndbrequire(theInstanceList[theInstance] == 0);
159   theInstanceList[theInstance] = b;
160 }
161 
162 void
initCommon()163 SimulatedBlock::initCommon()
164 {
165   NDB_STATIC_ASSERT(RG_COUNT == MM_RG_COUNT + 1);
166 
167   Uint32 count = 10;
168   this->getParam("FragmentSendPool", &count);
169   c_fragmentSendPool.setSize(count);
170 
171   count = 10;
172   this->getParam("FragmentInfoPool", &count);
173   c_fragmentInfoPool.setSize(count);
174 
175   count = 10;
176   this->getParam("FragmentInfoHash", &count);
177   c_fragmentInfoHash.setSize(count);
178 
179   Uint32 def = 5;
180 #ifdef NDBD_MULTITHREADED
181   def += globalData.getBlockThreads();
182 #endif
183 
184   count = def;
185   this->getParam("ActiveMutexes", &count);
186   c_mutexMgr.setSize(count);
187 
188   count = def;
189   this->getParam("ActiveCounters", &count);
190   c_counterMgr.setSize(count);
191 
192   count = def;
193   this->getParam("ActiveThreadSync", &count);
194   c_syncThreadPool.setSize(count);
195 }
196 
~SimulatedBlock()197 SimulatedBlock::~SimulatedBlock()
198 {
199   freeBat();
200 #ifdef VM_TRACE_TIME
201   printTimes(stdout);
202 #endif
203 
204   if (theInstanceList != 0) {
205     Uint32 i;
206     for (i = 0; i < MaxInstances; i++)
207     {
208       if (theInstanceList[i] != this)
209       {
210         delete theInstanceList[i];
211       }
212     }
213     delete [] theInstanceList;
214   }
215   theInstanceList = 0;
216 }
217 
218 void
installSimulatedBlockFunctions()219 SimulatedBlock::installSimulatedBlockFunctions(){
220   ExecFunction * a = theExecArray;
221   a[GSN_NODE_STATE_REP] = &SimulatedBlock::execNODE_STATE_REP;
222   a[GSN_CHANGE_NODE_STATE_REQ] = &SimulatedBlock::execCHANGE_NODE_STATE_REQ;
223   a[GSN_NDB_TAMPER] = &SimulatedBlock::execNDB_TAMPER;
224   a[GSN_SIGNAL_DROPPED_REP] = &SimulatedBlock::execSIGNAL_DROPPED_REP;
225   a[GSN_CONTINUE_FRAGMENTED]= &SimulatedBlock::execCONTINUE_FRAGMENTED;
226   a[GSN_STOP_FOR_CRASH]= &SimulatedBlock::execSTOP_FOR_CRASH;
227   a[GSN_UTIL_CREATE_LOCK_REF]   = &SimulatedBlock::execUTIL_CREATE_LOCK_REF;
228   a[GSN_UTIL_CREATE_LOCK_CONF]  = &SimulatedBlock::execUTIL_CREATE_LOCK_CONF;
229   a[GSN_UTIL_DESTROY_LOCK_REF]  = &SimulatedBlock::execUTIL_DESTORY_LOCK_REF;
230   a[GSN_UTIL_DESTROY_LOCK_CONF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_CONF;
231   a[GSN_UTIL_LOCK_REF]    = &SimulatedBlock::execUTIL_LOCK_REF;
232   a[GSN_UTIL_LOCK_CONF]   = &SimulatedBlock::execUTIL_LOCK_CONF;
233   a[GSN_UTIL_UNLOCK_REF]  = &SimulatedBlock::execUTIL_UNLOCK_REF;
234   a[GSN_UTIL_UNLOCK_CONF] = &SimulatedBlock::execUTIL_UNLOCK_CONF;
235   a[GSN_FSOPENREF]    = &SimulatedBlock::execFSOPENREF;
236   a[GSN_FSCLOSEREF]   = &SimulatedBlock::execFSCLOSEREF;
237   a[GSN_FSWRITEREF]   = &SimulatedBlock::execFSWRITEREF;
238   a[GSN_FSREADREF]    = &SimulatedBlock::execFSREADREF;
239   a[GSN_FSREMOVEREF]  = &SimulatedBlock::execFSREMOVEREF;
240   a[GSN_FSSYNCREF]    = &SimulatedBlock::execFSSYNCREF;
241   a[GSN_FSAPPENDREF]  = &SimulatedBlock::execFSAPPENDREF;
242   a[GSN_NODE_START_REP] = &SimulatedBlock::execNODE_START_REP;
243   a[GSN_API_START_REP] = &SimulatedBlock::execAPI_START_REP;
244   a[GSN_SEND_PACKED] = &SimulatedBlock::execSEND_PACKED;
245   a[GSN_CALLBACK_CONF] = &SimulatedBlock::execCALLBACK_CONF;
246   a[GSN_SYNC_THREAD_REQ] = &SimulatedBlock::execSYNC_THREAD_REQ;
247   a[GSN_SYNC_THREAD_CONF] = &SimulatedBlock::execSYNC_THREAD_CONF;
248   a[GSN_LOCAL_ROUTE_ORD] = &SimulatedBlock::execLOCAL_ROUTE_ORD;
249   a[GSN_SYNC_REQ] = &SimulatedBlock::execSYNC_REQ;
250   a[GSN_SYNC_PATH_REQ] = &SimulatedBlock::execSYNC_PATH_REQ;
251   a[GSN_SYNC_PATH_CONF] = &SimulatedBlock::execSYNC_PATH_CONF;
252 }
253 
254 void
addRecSignalImpl(GlobalSignalNumber gsn,ExecFunction f,bool force)255 SimulatedBlock::addRecSignalImpl(GlobalSignalNumber gsn,
256 				 ExecFunction f, bool force){
257   if(gsn > MAX_GSN || (!force &&  theExecArray[gsn] != 0)){
258     char errorMsg[255];
259     BaseString::snprintf(errorMsg, 255,
260  	     "GSN %d(%d))", gsn, MAX_GSN);
261     ERROR_SET(fatal, NDBD_EXIT_ILLEGAL_SIGNAL, errorMsg, errorMsg);
262   }
263   theExecArray[gsn] = f;
264 }
265 
266 void
assignToThread(ThreadContext ctx)267 SimulatedBlock::assignToThread(ThreadContext ctx)
268 {
269   m_threadId = ctx.threadId;
270   m_jamBuffer = ctx.jamBuffer;
271   m_watchDogCounter = ctx.watchDogCounter;
272   m_sectionPoolCache = ctx.sectionPoolCache;
273   m_pHighResTimer = ctx.pHighResTimer;
274 }
275 
276 Uint32
getInstanceKeyCanFail(Uint32 tabId,Uint32 fragId)277 SimulatedBlock::getInstanceKeyCanFail(Uint32 tabId, Uint32 fragId)
278 {
279   Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
280   Uint32 instanceKey = dbdih->dihGetInstanceKeyCanFail(tabId, fragId);
281   return instanceKey;
282 }
283 
284 Uint32
getInstanceKey(Uint32 tabId,Uint32 fragId)285 SimulatedBlock::getInstanceKey(Uint32 tabId, Uint32 fragId)
286 {
287   Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
288   Uint32 instanceKey = dbdih->dihGetInstanceKey(tabId, fragId);
289   return instanceKey;
290 }
291 
292 Uint32
getInstanceFromKey(Uint32 instanceKey)293 SimulatedBlock::getInstanceFromKey(Uint32 instanceKey)
294 {
295   Uint32 lqhWorkers = globalData.ndbMtLqhWorkers;
296   Uint32 instanceNo;
297   if (lqhWorkers == 0) {
298     instanceNo = 0;
299   } else {
300     assert(instanceKey != 0);
301     instanceNo = 1 + (instanceKey - 1) % lqhWorkers;
302   }
303   return instanceNo;
304 }
305 
306 void
signal_error(Uint32 gsn,Uint32 len,Uint32 recBlockNo,const char * filename,int lineno) const307 SimulatedBlock::signal_error(Uint32 gsn, Uint32 len, Uint32 recBlockNo,
308 			     const char* filename, int lineno) const
309 {
310   char objRef[255];
311   BaseString::snprintf(objRef, 255, "%s:%d", filename, lineno);
312   char probData[255];
313   BaseString::snprintf(probData, 255,
314 	   "Signal (GSN: %d, Length: %d, Rec Block No: %d)",
315 	   gsn, len, recBlockNo);
316 
317   ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
318 			     probData,
319 			     objRef);
320 }
321 
322 
323 extern class SectionSegmentPool g_sectionSegmentPool;
324 
325 void
handle_invalid_sections_in_send_signal(const Signal * signal) const326 SimulatedBlock::handle_invalid_sections_in_send_signal(const Signal* signal)
327 const
328 {
329   char errMsg[160];
330   BaseString::snprintf(errMsg, sizeof errMsg,
331                        "Unhandled sections in sendSignal for GSN %u (%s).",
332                        signal->header.theVerId_signalNumber,
333                        getSignalName(signal->header.theVerId_signalNumber));
334   // Print message and terminate.
335   ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
336                              errMsg,
337                              "");
338 }
339 
340 void
handle_lingering_sections_after_execute(const Signal * signal) const341 SimulatedBlock::handle_lingering_sections_after_execute(const Signal* signal)
342 const
343 {
344   char errMsg[160];
345   BaseString::snprintf(errMsg, sizeof errMsg,
346                       "Unhandled sections after execute for GSN %u (%s).",
347                       signal->header.theVerId_signalNumber,
348                       getSignalName(signal->header.theVerId_signalNumber));
349   // Print message and terminate.
350   ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
351                              errMsg,
352                              "");
353 }
354 
355 void
handle_invalid_fragmentInfo(Signal * signal) const356 SimulatedBlock::handle_invalid_fragmentInfo(Signal* signal) const
357 {
358   ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
359                              "Incorrect header->m_fragmentInfo in sendSignal()",
360                              "");
361 }
362 
363 void
handle_out_of_longsignal_memory(Signal * signal) const364 SimulatedBlock::handle_out_of_longsignal_memory(Signal * signal) const
365 {
366   ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
367 			     "Out of LongMessageBuffer in sendSignal",
368 			     "");
369 }
370 
371 template<typename SecPtr>
372 void
handle_send_failed(SendStatus ss,Signal * signal,Uint32 recNode,SecPtr ptr[]) const373 SimulatedBlock::handle_send_failed(SendStatus ss,
374                                    Signal * signal,
375                                    Uint32 recNode,
376                                    SecPtr ptr[]) const
377 {
378   switch(ss){
379   case SEND_BUFFER_FULL:
380     ErrorReporter::handleError(NDBD_EXIT_GENERIC,
381                                "Out of SendBufferMemory in sendSignal", "");
382     break;
383   case SEND_MESSAGE_TOO_BIG:
384     /* If message is too big when sending CmvmiDummySignal log a convinient
385      * message about it to.
386      * Note that CmvmiDummySignal is not intended for production usage but for
387      * use by test cases.
388      */
389     if (signal->header.theVerId_signalNumber == GSN_DUMP_STATE_ORD &&
390         signal->theData[0] == DumpStateOrd::CmvmiDummySignal)
391     {
392       jam();
393       const Uint32 num_secs = signal->getNoOfSections();
394       char msg[24*4];
395       snprintf(msg,
396                sizeof(msg),
397                "Failed sending CmvmiDummySignal"
398                " (size %u+%u+%u+%u+%u) from %u to %u.",
399                signal->getLength(), num_secs,
400                (num_secs > 0) ? ptr[0].sz : 0,
401                (num_secs > 1) ? ptr[1].sz : 0,
402                (num_secs > 2) ? ptr[2].sz : 0,
403                signal->theData[2],
404                recNode);
405       g_eventLogger->info("%s", msg);
406       infoEvent("%s", msg);
407       return;
408     }
409     ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE,
410                                "Message too big in sendSignal", "");
411     break;
412   case SEND_UNKNOWN_NODE:
413     ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE,
414                                "Unknown node in sendSignal", "");
415     break;
416   case SEND_OK:
417   case SEND_BLOCKED:
418   case SEND_DISCONNECTED:
419     // Should never happen
420     ndbabort();
421   }
422   ndbabort();
423 }
424 
425 static void
linkSegments(Uint32 head,Uint32 tail)426 linkSegments(Uint32 head, Uint32 tail){
427 
428   Ptr<SectionSegment> headPtr;
429   g_sectionSegmentPool.getPtr(headPtr, head);
430 
431   Ptr<SectionSegment> tailPtr;
432   g_sectionSegmentPool.getPtr(tailPtr, tail);
433 
434   Ptr<SectionSegment> oldTailPtr;
435   g_sectionSegmentPool.getPtr(oldTailPtr, headPtr.p->m_lastSegment);
436 
437   /* Can only efficiently link segments if linking to the end of a
438    * multiple-of-segment-size sized chunk
439    */
440   if ((headPtr.p->m_sz % NDB_SECTION_SEGMENT_SZ) != 0)
441   {
442 #if defined VM_TRACE || defined ERROR_INSERT
443     ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
444                                "Bad head segment size",
445                                "");
446 #else
447     ndbout_c("linkSegments : Bad head segment size");
448 #endif
449   }
450 
451   headPtr.p->m_lastSegment = tailPtr.p->m_lastSegment;
452   headPtr.p->m_sz += tailPtr.p->m_sz;
453 
454   oldTailPtr.p->m_nextSegment = tailPtr.i;
455 }
456 
457 void
getSections(Uint32 secCount,SegmentedSectionPtr ptr[3])458 getSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){
459   Uint32 tSec0 = ptr[0].i;
460   Uint32 tSec1 = ptr[1].i;
461   Uint32 tSec2 = ptr[2].i;
462   SectionSegment * p;
463   switch(secCount){
464   case 3:
465     p = g_sectionSegmentPool.getPtr(tSec2);
466     ptr[2].p = p;
467     ptr[2].sz = p->m_sz;
468     // Fall through
469   case 2:
470     p = g_sectionSegmentPool.getPtr(tSec1);
471     ptr[1].p = p;
472     ptr[1].sz = p->m_sz;
473     // Fall through
474   case 1:
475     p = g_sectionSegmentPool.getPtr(tSec0);
476     ptr[0].p = p;
477     ptr[0].sz = p->m_sz;
478     // Fall through
479   case 0:
480     return;
481   }
482   char msg[40];
483   sprintf(msg, "secCount=%d", secCount);
484   ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
485 }
486 
487 void
getSection(SegmentedSectionPtr & ptr,Uint32 i)488 getSection(SegmentedSectionPtr & ptr, Uint32 i){
489   ptr.i = i;
490   SectionSegment * p = g_sectionSegmentPool.getPtr(i);
491   ptr.p = p;
492   ptr.sz = p->m_sz;
493 }
494 
getSectionSz(Uint32 id)495 Uint32 getSectionSz(Uint32 id)
496 {
497   return g_sectionSegmentPool.getPtr(id)->m_sz;
498 }
499 
getLastWordPtr(Uint32 id)500 Uint32* getLastWordPtr(Uint32 id)
501 {
502   SectionSegment* first= g_sectionSegmentPool.getPtr(id);
503   SectionSegment* last= g_sectionSegmentPool.getPtr(first->m_lastSegment);
504   Uint32 offset= (first->m_sz -1) % SectionSegment::DataLength;
505   return &last->theData[offset];
506 }
507 
508 #ifdef NDBD_MULTITHREADED
509 #define SB_SP_ARG *m_sectionPoolCache,
510 #define SB_SP_REL_ARG f_section_lock, *m_sectionPoolCache,
511 #else
512 #define SB_SP_ARG
513 #define SB_SP_REL_ARG
514 #endif
515 
516 static
517 void
releaseSections(SPC_ARG Uint32 secCount,SegmentedSectionPtr ptr[3])518 releaseSections(SPC_ARG Uint32 secCount, SegmentedSectionPtr ptr[3]){
519   Uint32 tSec0 = ptr[0].i;
520   Uint32 tSz0 = ptr[0].sz;
521   Uint32 tSec1 = ptr[1].i;
522   Uint32 tSz1 = ptr[1].sz;
523   Uint32 tSec2 = ptr[2].i;
524   Uint32 tSz2 = ptr[2].sz;
525   switch(secCount){
526   case 3:
527     g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
528                                      relSz(tSz2), tSec2,
529 				     ptr[2].p->m_lastSegment);
530     // Fall through
531   case 2:
532     g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
533                                      relSz(tSz1), tSec1,
534 				     ptr[1].p->m_lastSegment);
535     // Fall through
536   case 1:
537     g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
538                                      relSz(tSz0), tSec0,
539 				     ptr[0].p->m_lastSegment);
540     // Fall through
541   case 0:
542     return;
543   }
544   char msg[40];
545   sprintf(msg, "secCount=%d", secCount);
546   ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
547 }
548 
549 void
getSendBufferLevel(NodeId node,SB_LevelType & level)550 SimulatedBlock::getSendBufferLevel(NodeId node, SB_LevelType &level)
551 {
552 #ifdef NDBD_MULTITHREADED
553   mt_getSendBufferLevel(m_threadId, node, level);
554 #else
555   getNonMTTransporterSendHandle()->getSendBufferLevel(node, level);
556 #endif
557 }
558 
559 Uint32
getSignalsInJBB()560 SimulatedBlock::getSignalsInJBB()
561 {
562   Uint32 num_signals;
563 #ifdef NDBD_MULTITHREADED
564   num_signals = mt_getSignalsInJBB(m_threadId);
565 #else
566   num_signals = globalScheduler.getBOccupancy();
567 #endif
568   return num_signals;
569 }
570 
571 void
startChangeNeighbourNode()572 SimulatedBlock::startChangeNeighbourNode()
573 {
574   /* We only treat neighbour nodes in a special manner in ndbmtd. */
575 #ifdef NDBD_MULTITHREADED
576   mt_startChangeNeighbourNode();
577 #endif
578 }
579 
580 void
setNeighbourNode(NodeId node)581 SimulatedBlock::setNeighbourNode(NodeId node)
582 {
583   /* We only treat neighbour nodes in a special manner in ndbmtd. */
584 #ifdef NDBD_MULTITHREADED
585   mt_setNeighbourNode(node);
586 #endif
587 }
588 
589 void
setNoSend()590 SimulatedBlock::setNoSend()
591 {
592 #ifdef NDBD_MULTITHREADED
593   mt_setNoSend(m_threadId);
594 #endif
595 }
596 
597 void
endChangeNeighbourNode()598 SimulatedBlock::endChangeNeighbourNode()
599 {
600   /* We only treat neighbour nodes in a special manner in ndbmtd. */
601 #ifdef NDBD_MULTITHREADED
602   mt_endChangeNeighbourNode();
603 #endif
604 }
605 
606 void
setWakeupThread(Uint32 wakeup_instance)607 SimulatedBlock::setWakeupThread(Uint32 wakeup_instance)
608 {
609 #ifdef NDBD_MULTITHREADED
610   mt_setWakeupThread(m_threadId, wakeup_instance);
611 #endif
612 }
613 
614 void
setOverloadStatus(OverloadStatus new_status)615 SimulatedBlock::setOverloadStatus(OverloadStatus new_status)
616 {
617 #ifdef NDBD_MULTITHREADED
618   mt_setOverloadStatus(m_threadId, new_status);
619 #endif
620 }
621 
622 void
setNodeOverloadStatus(OverloadStatus new_status)623 SimulatedBlock::setNodeOverloadStatus(OverloadStatus new_status)
624 {
625 #ifdef NDBD_MULTITHREADED
626   mt_setNodeOverloadStatus(m_threadId, new_status);
627 #endif
628 }
629 
630 void
setSendNodeOverloadStatus(OverloadStatus new_status)631 SimulatedBlock::setSendNodeOverloadStatus(OverloadStatus new_status)
632 {
633 #ifdef NDBD_MULTITHREADED
634   mt_setSendNodeOverloadStatus(new_status);
635 #endif
636 }
637 
638 Uint32
getConfiguredSpintime()639 SimulatedBlock::getConfiguredSpintime()
640 {
641 #ifdef NDBD_MULTITHREADED
642   return mt_getConfiguredSpintime(m_threadId);
643 #else
644   return 0;
645 #endif
646 }
647 
648 void
setSpintime(Uint32 new_spintime)649 SimulatedBlock::setSpintime(Uint32 new_spintime)
650 {
651 #ifdef NDBD_MULTITHREADED
652   mt_setSpintime(m_threadId, new_spintime);
653 #endif
654 }
655 
656 Uint32
getWakeupLatency()657 SimulatedBlock::getWakeupLatency()
658 {
659 #ifdef NDBD_MULTITHREADED
660   return mt_getWakeupLatency();
661 #else
662   return 25;
663 #endif
664 }
665 
666 void
setWakeupLatency(Uint32 latency)667 SimulatedBlock::setWakeupLatency(Uint32 latency)
668 {
669 #ifdef NDBD_MULTITHREADED
670   mt_setWakeupLatency(latency);
671 #endif
672 }
673 
674 void
getPerformanceTimers(Uint64 & micros_sleep,Uint64 & spin_time,Uint64 & buffer_full_micros_sleep,Uint64 & micros_send)675 SimulatedBlock::getPerformanceTimers(Uint64 & micros_sleep,
676                                      Uint64 & spin_time,
677                                      Uint64 & buffer_full_micros_sleep,
678                                      Uint64 & micros_send)
679 {
680 #ifdef NDBD_MULTITHREADED
681   mt_getPerformanceTimers(m_threadId,
682                           micros_sleep,
683                           spin_time,
684                           buffer_full_micros_sleep,
685                           micros_send);
686 #else
687   micros_sleep = globalData.theMicrosSleep;
688   spin_time = globalData.theMicrosSpin;
689   buffer_full_micros_sleep = globalData.theBufferFullMicrosSleep;
690   micros_send = globalData.theMicrosSend;
691 #endif
692 }
693 
694 const char *
getThreadDescription()695 SimulatedBlock::getThreadDescription()
696 {
697   const char *desc;
698 #ifdef NDBD_MULTITHREADED
699   desc = mt_getThreadDescription(m_threadId);
700 #else
701   desc = "ndbd single thread";
702 #endif
703   return desc;
704 }
705 
706 const char *
getThreadName()707 SimulatedBlock::getThreadName()
708 {
709   const char *name;
710 #ifdef NDBD_MULTITHREADED
711   name = mt_getThreadName(m_threadId);
712 #else
713   name = "main";
714 #endif
715   return name;
716 }
717 
718 void
getSendPerformanceTimers(Uint32 send_instance,Uint64 & exec_time,Uint64 & sleep_time,Uint64 & spin_time,Uint64 & user_time_os,Uint64 & kernel_time_os,Uint64 & elapsed_time_os)719 SimulatedBlock::getSendPerformanceTimers(Uint32 send_instance,
720                                          Uint64 & exec_time,
721                                          Uint64 & sleep_time,
722                                          Uint64 & spin_time,
723                                          Uint64 & user_time_os,
724                                          Uint64 & kernel_time_os,
725                                          Uint64 & elapsed_time_os)
726 {
727   /* No send thread in ndbd */
728 #ifdef NDBD_MULTITHREADED
729   mt_getSendPerformanceTimers(send_instance,
730                               exec_time,
731                               sleep_time,
732                               spin_time,
733                               user_time_os,
734                               kernel_time_os,
735                               elapsed_time_os);
736 #else
737   exec_time = 0;
738   sleep_time = 0;
739   spin_time = 0;
740   user_time_os = 0;
741   kernel_time_os = 0;
742   elapsed_time_os = 0;
743 #endif
744 }
745 
746 Uint32
getNumSendThreads()747 SimulatedBlock::getNumSendThreads()
748 {
749 #ifdef NDBD_MULTITHREADED
750   return mt_getNumSendThreads();
751 #else
752   return 0;
753 #endif
754 }
755 
756 Uint32
getNumThreads()757 SimulatedBlock::getNumThreads()
758 {
759 #ifdef NDBD_MULTITHREADED
760   return mt_getNumThreads();
761 #else
762   return 1;
763 #endif
764 }
765 
766 void
flush_send_buffers()767 SimulatedBlock::flush_send_buffers()
768 {
769 #ifdef NDBD_MULTITHREADED
770   mt_flush_send_buffers(m_threadId);
771 #endif
772 }
773 
774 void
set_watchdog_counter()775 SimulatedBlock::set_watchdog_counter()
776 {
777 #ifdef NDBD_MULTITHREADED
778   mt_set_watchdog_counter(m_threadId);
779 #endif
780 }
781 
782 void
assign_recv_thread_new_trp(Uint32 trp_id)783 SimulatedBlock::assign_recv_thread_new_trp(Uint32 trp_id)
784 {
785 #ifdef NDBD_MULTITHREADED
786   mt_assign_recv_thread_new_trp(trp_id);
787 #endif
788 }
789 
790 void
assign_multi_trps_to_send_threads()791 SimulatedBlock::assign_multi_trps_to_send_threads()
792 {
793 #ifdef NDBD_MULTITHREADED
794   mt_assign_multi_trps_to_send_threads();
795 #endif
796 }
797 
798 bool
epoll_add_trp(NodeId node_id,TrpId trp_id)799 SimulatedBlock::epoll_add_trp(NodeId node_id, TrpId trp_id)
800 {
801 #ifdef NDBD_MULTITHREADED
802   return mt_epoll_add_trp(m_threadId, node_id, trp_id);
803 #else
804   require(false);
805   return false;
806 #endif
807 }
808 
809 bool
is_recv_thread_for_new_trp(NodeId node_id,TrpId trp_id)810 SimulatedBlock::is_recv_thread_for_new_trp(NodeId node_id, TrpId trp_id)
811 {
812 #ifdef NDBD_MULTITHREADED
813   return mt_is_recv_thread_for_new_trp(m_threadId, node_id, trp_id);
814 #else
815   require(false);
816   return false;
817 #endif
818 }
819 
820 void
sendSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer) const821 SimulatedBlock::sendSignal(BlockReference ref,
822 			   GlobalSignalNumber gsn,
823 			   Signal* signal,
824 			   Uint32 length,
825 			   JobBufferLevel jobBuffer) const {
826 
827   BlockReference sendBRef = reference();
828 
829   Uint32 recBlock = refToBlock(ref);
830   Uint32 recNode   = refToNode(ref);
831   Uint32 ourProcessor         = globalData.ownId;
832 
833   ndbrequire(signal->header.m_noOfSections == 0);
834   check_sections(signal, signal->header.m_noOfSections, 0);
835 
836   signal->header.theLength = length;
837   signal->header.theVerId_signalNumber = gsn;
838   signal->header.theReceiversBlockNumber = recBlock;
839   signal->header.m_noOfSections = 0;
840 
841   Uint32 tSignalId = signal->header.theSignalId;
842 
843   if (unlikely((length == 0) || length > 25 || (recBlock == 0)))
844   {
845     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
846     return;
847   }//if
848 #ifdef VM_TRACE
849   if(globalData.testOn){
850     Uint16 proc =
851       (recNode == 0 ? globalData.ownId : recNode);
852     signal->header.theSendersBlockRef = sendBRef;
853     globalSignalLoggers.sendSignal(signal->header,
854 				   jobBuffer,
855 				   &signal->theData[0],
856 				   proc);
857   }
858 #endif
859 
860   if(recNode == ourProcessor || recNode == 0) {
861     signal->header.theSendersSignalId = tSignalId;
862     signal->header.theSendersBlockRef = sendBRef;
863 #ifdef NDBD_MULTITHREADED
864     if (jobBuffer == JBB)
865       sendlocal(m_threadId, &signal->header, signal->theData, NULL);
866     else
867       sendprioa(m_threadId, &signal->header, signal->theData, NULL);
868 #else
869     globalScheduler.execute(signal, jobBuffer, recBlock,
870 			    gsn);
871 #endif
872     return;
873   } else {
874     // send distributed Signal
875     SignalHeader sh;
876 
877     Uint32 tTrace = signal->getTrace();
878 
879     sh.theVerId_signalNumber   = gsn;
880     sh.theReceiversBlockNumber = recBlock;
881     sh.theSendersBlockRef      = refToBlock(sendBRef);
882     sh.theLength               = length;
883     sh.theTrace                = tTrace;
884     sh.theSignalId             = tSignalId;
885     sh.m_noOfSections          = 0;
886     sh.m_fragmentInfo          = 0;
887 
888 #ifdef TRACE_DISTRIBUTED
889     ndbout_c("send: %s(%d) to (%s, %d)",
890 	     getSignalName(gsn), gsn, getBlockName(recBlock),
891 	     recNode);
892 #endif
893 
894     SendStatus ss;
895 #ifdef NDBD_MULTITHREADED
896     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
897                         recNode, 0);
898 #else
899     TrpId trp_id = 0;
900     ss = globalTransporterRegistry.
901            prepareSend(getNonMTTransporterSendHandle(),
902                        &sh, jobBuffer,
903                        &signal->theData[0], recNode, trp_id,
904                        (LinearSectionPtr*)0);
905 #endif
906 
907     if (unlikely(! (ss == SEND_OK ||
908                     ss == SEND_BLOCKED ||
909                     ss == SEND_DISCONNECTED)))
910     {
911       handle_send_failed(ss, signal, recNode, (LinearSectionPtr*)NULL);
912     }
913   }
914   return;
915 }
916 
917 void
sendSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer) const918 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
919 			   GlobalSignalNumber gsn,
920 			   Signal* signal,
921 			   Uint32 length,
922 			   JobBufferLevel jobBuffer) const {
923 
924   Uint32 noOfSections = signal->header.m_noOfSections;
925   Uint32 tSignalId = signal->header.theSignalId;
926   Uint32 tTrace = signal->getTrace();
927 
928   Uint32 ourProcessor = globalData.ownId;
929   Uint32 recBlock = rg.m_block;
930 
931   signal->header.theLength = length;
932   signal->header.theVerId_signalNumber = gsn;
933   signal->header.theReceiversBlockNumber = recBlock;
934   signal->header.theSendersSignalId = tSignalId;
935   signal->header.theSendersBlockRef = reference();
936   signal->header.m_noOfSections = 0;
937 
938 ndbrequire(noOfSections == 0);
939   check_sections(signal, noOfSections, 0);
940 
941   if ((length == 0) || (length > 25) || (recBlock == 0)) {
942     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
943     return;
944   }//if
945 
946   SignalHeader sh;
947 
948   sh.theVerId_signalNumber   = gsn;
949   sh.theReceiversBlockNumber = recBlock;
950   sh.theSendersBlockRef      = refToBlock(reference());
951   sh.theLength               = length;
952   sh.theTrace                = tTrace;
953   sh.theSignalId             = tSignalId;
954   sh.m_noOfSections          = 0;
955   sh.m_fragmentInfo          = 0;
956 
957   /**
958    * Check own node
959    */
960   if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
961 #ifdef VM_TRACE
962     if(globalData.testOn){
963       globalSignalLoggers.sendSignal(signal->header,
964 				     jobBuffer,
965 				     &signal->theData[0],
966 				     ourProcessor);
967     }
968 #endif
969 
970 #ifdef NDBD_MULTITHREADED
971     if (jobBuffer == JBB)
972       sendlocal(m_threadId, &signal->header, signal->theData, NULL);
973     else
974       sendprioa(m_threadId, &signal->header, signal->theData, NULL);
975 #else
976     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
977 #endif
978 
979     rg.m_nodes.clear((Uint32)0);
980     rg.m_nodes.clear(ourProcessor);
981   }
982 
983   /**
984    * Do the big loop
985    */
986   Uint32 recNode = 0;
987   while(!rg.m_nodes.isclear()){
988     recNode = rg.m_nodes.find(recNode + 1);
989     rg.m_nodes.clear(recNode);
990 #ifdef VM_TRACE
991     if(globalData.testOn){
992       globalSignalLoggers.sendSignal(signal->header,
993 				     jobBuffer,
994 				     &signal->theData[0],
995 				     recNode);
996     }
997 #endif
998 
999 #ifdef TRACE_DISTRIBUTED
1000     ndbout_c("send: %s(%d) to (%s, %d)",
1001 	     getSignalName(gsn), gsn, getBlockName(recBlock),
1002 	     recNode);
1003 #endif
1004 
1005     SendStatus ss;
1006 #ifdef NDBD_MULTITHREADED
1007     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1008                         recNode, 0);
1009 #else
1010     TrpId trp_id = 0;
1011     ss = globalTransporterRegistry.
1012            prepareSend(getNonMTTransporterSendHandle(),
1013                        &sh, jobBuffer,
1014                        &signal->theData[0], recNode, trp_id,
1015                        (LinearSectionPtr*)0);
1016 #endif
1017 
1018     if (unlikely(! (ss == SEND_OK ||
1019                     ss == SEND_BLOCKED ||
1020                     ss == SEND_DISCONNECTED)))
1021     {
1022       handle_send_failed(ss, signal, recNode, (LinearSectionPtr*)NULL);
1023     }
1024   }
1025 
1026   return;
1027 }
1028 
1029 bool import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len);
1030 
1031 void
sendSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,LinearSectionPtr ptr[3],Uint32 noOfSections) const1032 SimulatedBlock::sendSignal(BlockReference ref,
1033 			   GlobalSignalNumber gsn,
1034 			   Signal* signal,
1035 			   Uint32 length,
1036 			   JobBufferLevel jobBuffer,
1037 			   LinearSectionPtr ptr[3],
1038 			   Uint32 noOfSections) const {
1039 
1040   BlockReference sendBRef = reference();
1041 
1042   Uint32 recBlock = refToBlock(ref);
1043   Uint32 recNode   = refToNode(ref);
1044   Uint32 ourProcessor         = globalData.ownId;
1045 
1046 ndbrequire(signal->header.m_noOfSections == 0);
1047   check_sections(signal, signal->header.m_noOfSections, noOfSections);
1048 
1049   signal->header.theLength = length;
1050   signal->header.theVerId_signalNumber = gsn;
1051   signal->header.theReceiversBlockNumber = recBlock;
1052   signal->header.m_noOfSections = noOfSections;
1053 
1054   Uint32 tSignalId = signal->header.theSignalId;
1055   Uint32 tFragInfo = signal->header.m_fragmentInfo;
1056 
1057   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1058     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1059     return;
1060   }//if
1061 #ifdef VM_TRACE
1062   if(globalData.testOn){
1063     Uint16 proc =
1064       (recNode == 0 ? globalData.ownId : recNode);
1065     signal->header.theSendersBlockRef = sendBRef;
1066     globalSignalLoggers.sendSignal(signal->header,
1067 				   jobBuffer,
1068 				   &signal->theData[0],
1069 				   proc,
1070                                    ptr, noOfSections);
1071   }
1072 #endif
1073 
1074   if(recNode == ourProcessor || recNode == 0) {
1075     signal->header.theSendersSignalId = tSignalId;
1076     signal->header.theSendersBlockRef = sendBRef;
1077 
1078     /**
1079      * We have to copy the data
1080      */
1081     bool ok = true;
1082     Ptr<SectionSegment> segptr[3];
1083     for(Uint32 i = 0; i<noOfSections; i++){
1084       ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz);
1085       signal->theData[length+i] = segptr[i].i;
1086     }
1087 
1088     if (unlikely(! ok))
1089     {
1090       handle_out_of_longsignal_memory(signal);
1091     }
1092 
1093 #ifdef NDBD_MULTITHREADED
1094     if (jobBuffer == JBB)
1095       sendlocal(m_threadId, &signal->header, signal->theData,
1096                 signal->theData+length);
1097     else
1098       sendprioa(m_threadId, &signal->header, signal->theData,
1099                 signal->theData+length);
1100 #else
1101     globalScheduler.execute(signal, jobBuffer, recBlock,
1102 			    gsn);
1103 #endif
1104     signal->header.m_noOfSections = 0;
1105     return;
1106   } else {
1107     // send distributed Signal
1108     SignalHeader sh;
1109 
1110     Uint32 tTrace = signal->getTrace();
1111     Uint32 noOfSections = signal->header.m_noOfSections;
1112 
1113     sh.theVerId_signalNumber   = gsn;
1114     sh.theReceiversBlockNumber = recBlock;
1115     sh.theSendersBlockRef      = refToBlock(sendBRef);
1116     sh.theLength               = length;
1117     sh.theTrace                = tTrace;
1118     sh.theSignalId             = tSignalId;
1119     sh.m_noOfSections          = noOfSections;
1120     sh.m_fragmentInfo          = tFragInfo;
1121 
1122 #ifdef TRACE_DISTRIBUTED
1123     ndbout_c("send: %s(%d) to (%s, %d)",
1124 	     getSignalName(gsn), gsn, getBlockName(recBlock),
1125 	     recNode);
1126 #endif
1127 
1128     SendStatus ss;
1129 #ifdef NDBD_MULTITHREADED
1130     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1131                         recNode, ptr);
1132 #else
1133     TrpId trp_id = 0;
1134     ss = globalTransporterRegistry.
1135            prepareSend(getNonMTTransporterSendHandle(),
1136                        &sh, jobBuffer,
1137                        &signal->theData[0], recNode, trp_id,
1138                        ptr);
1139 #endif
1140 
1141     if (unlikely(! (ss == SEND_OK ||
1142                     ss == SEND_BLOCKED ||
1143                     ss == SEND_DISCONNECTED)))
1144     {
1145       handle_send_failed(ss, signal, recNode, ptr);
1146     }
1147   }
1148 
1149   signal->header.m_noOfSections = 0;
1150   signal->header.m_fragmentInfo = 0;
1151   return;
1152 }
1153 
1154 void
sendSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,LinearSectionPtr ptr[3],Uint32 noOfSections) const1155 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
1156 			   GlobalSignalNumber gsn,
1157 			   Signal* signal,
1158 			   Uint32 length,
1159 			   JobBufferLevel jobBuffer,
1160 			   LinearSectionPtr ptr[3],
1161 			   Uint32 noOfSections) const {
1162 
1163   Uint32 tSignalId = signal->header.theSignalId;
1164   Uint32 tTrace    = signal->getTrace();
1165   Uint32 tFragInfo = signal->header.m_fragmentInfo;
1166 
1167   Uint32 ourProcessor = globalData.ownId;
1168   Uint32 recBlock = rg.m_block;
1169 
1170 ndbrequire(signal->header.m_noOfSections == 0);
1171   check_sections(signal, signal->header.m_noOfSections, noOfSections);
1172 
1173   signal->header.theLength = length;
1174   signal->header.theVerId_signalNumber = gsn;
1175   signal->header.theReceiversBlockNumber = recBlock;
1176   signal->header.theSendersSignalId = tSignalId;
1177   signal->header.theSendersBlockRef = reference();
1178   signal->header.m_noOfSections = noOfSections;
1179 
1180   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1181     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1182     return;
1183   }//if
1184 
1185   SignalHeader sh;
1186   sh.theVerId_signalNumber   = gsn;
1187   sh.theReceiversBlockNumber = recBlock;
1188   sh.theSendersBlockRef      = refToBlock(reference());
1189   sh.theLength               = length;
1190   sh.theTrace                = tTrace;
1191   sh.theSignalId             = tSignalId;
1192   sh.m_noOfSections          = noOfSections;
1193   sh.m_fragmentInfo          = tFragInfo;
1194 
1195   /**
1196    * Check own node
1197    */
1198   if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
1199 #ifdef VM_TRACE
1200     if(globalData.testOn){
1201       globalSignalLoggers.sendSignal(signal->header,
1202 				     jobBuffer,
1203 				     &signal->theData[0],
1204 				     ourProcessor,
1205                                      ptr, noOfSections);
1206     }
1207 #endif
1208     /**
1209      * We have to copy the data
1210      */
1211     bool ok = true;
1212     Ptr<SectionSegment> segptr[3];
1213     for(Uint32 i = 0; i<noOfSections; i++){
1214       ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz);
1215       signal->theData[length+i] = segptr[i].i;
1216     }
1217 
1218     if (unlikely(! ok))
1219     {
1220       handle_out_of_longsignal_memory(signal);
1221     }
1222 
1223 #ifdef NDBD_MULTITHREADED
1224     if (jobBuffer == JBB)
1225       sendlocal(m_threadId, &signal->header, signal->theData,
1226                 signal->theData + length);
1227     else
1228       sendprioa(m_threadId, &signal->header, signal->theData,
1229                 signal->theData + length);
1230 #else
1231     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1232 #endif
1233 
1234     rg.m_nodes.clear((Uint32)0);
1235     rg.m_nodes.clear(ourProcessor);
1236   }
1237 
1238   /**
1239    * Do the big loop
1240    */
1241   Uint32 recNode = 0;
1242   while(!rg.m_nodes.isclear()){
1243     recNode = rg.m_nodes.find(recNode + 1);
1244     rg.m_nodes.clear(recNode);
1245 
1246 #ifdef VM_TRACE
1247     if(globalData.testOn){
1248       globalSignalLoggers.sendSignal(signal->header,
1249 				     jobBuffer,
1250 				     &signal->theData[0],
1251 				     recNode,
1252                                      ptr, noOfSections);
1253     }
1254 #endif
1255 
1256 #ifdef TRACE_DISTRIBUTED
1257     ndbout_c("send: %s(%d) to (%s, %d)",
1258 	     getSignalName(gsn), gsn, getBlockName(recBlock),
1259 	     recNode);
1260 #endif
1261 
1262     SendStatus ss;
1263 #ifdef NDBD_MULTITHREADED
1264     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1265                         recNode, ptr);
1266 #else
1267     TrpId trp_id = 0;
1268     ss = globalTransporterRegistry.
1269            prepareSend(getNonMTTransporterSendHandle(),
1270                        &sh, jobBuffer,
1271                        &signal->theData[0], recNode, trp_id,
1272                        ptr);
1273 #endif
1274 
1275     if (unlikely(! (ss == SEND_OK ||
1276                     ss == SEND_BLOCKED ||
1277                     ss == SEND_DISCONNECTED)))
1278     {
1279       handle_send_failed(ss, signal, recNode, ptr);
1280     }
1281   }
1282 
1283   signal->header.m_noOfSections = 0;
1284   signal->header.m_fragmentInfo = 0;
1285 
1286   return;
1287 }
1288 
1289 void
sendSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1290 SimulatedBlock::sendSignal(BlockReference ref,
1291 			   GlobalSignalNumber gsn,
1292 			   Signal* signal,
1293 			   Uint32 length,
1294 			   JobBufferLevel jobBuffer,
1295 			   SectionHandle* sections) const {
1296 
1297   Uint32 noOfSections = sections->m_cnt;
1298   BlockReference sendBRef = reference();
1299 
1300   Uint32 recBlock = refToBlock(ref);
1301   Uint32 recNode   = refToNode(ref);
1302   Uint32 ourProcessor         = globalData.ownId;
1303 
1304 ndbrequire(signal->header.m_noOfSections == 0);
1305   check_sections(signal, signal->header.m_noOfSections, noOfSections);
1306 
1307   signal->header.theLength = length;
1308   signal->header.theVerId_signalNumber = gsn;
1309   signal->header.theReceiversBlockNumber = recBlock;
1310   signal->header.m_noOfSections = noOfSections;
1311 
1312   Uint32 tSignalId = signal->header.theSignalId;
1313   Uint32 tFragInfo = signal->header.m_fragmentInfo;
1314 
1315   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1316     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1317     return;
1318   }//if
1319 #ifdef VM_TRACE
1320   if(globalData.testOn){
1321     Uint16 proc =
1322       (recNode == 0 ? globalData.ownId : recNode);
1323     signal->header.theSendersBlockRef = sendBRef;
1324     globalSignalLoggers.sendSignal(signal->header,
1325 				   jobBuffer,
1326 				   &signal->theData[0],
1327 				   proc,
1328                                    sections->m_ptr, noOfSections);
1329   }
1330 #endif
1331 
1332   if(recNode == ourProcessor || recNode == 0) {
1333     signal->header.theSendersSignalId = tSignalId;
1334     signal->header.theSendersBlockRef = sendBRef;
1335 
1336     /**
1337      * We have to copy the data
1338      */
1339     Uint32 * dst = signal->theData + length;
1340     * dst ++ = sections->m_ptr[0].i;
1341     * dst ++ = sections->m_ptr[1].i;
1342     * dst ++ = sections->m_ptr[2].i;
1343 
1344 #ifdef NDBD_MULTITHREADED
1345     if (jobBuffer == JBB)
1346       sendlocal(m_threadId, &signal->header, signal->theData,
1347                 signal->theData + length);
1348     else
1349       sendprioa(m_threadId, &signal->header, signal->theData,
1350                 signal->theData + length);
1351 #else
1352     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1353 #endif
1354   } else {
1355     // send distributed Signal
1356     SignalHeader sh;
1357 
1358     Uint32 tTrace = signal->getTrace();
1359 
1360     sh.theVerId_signalNumber   = gsn;
1361     sh.theReceiversBlockNumber = recBlock;
1362     sh.theSendersBlockRef      = refToBlock(sendBRef);
1363     sh.theLength               = length;
1364     sh.theTrace                = tTrace;
1365     sh.theSignalId             = tSignalId;
1366     sh.m_noOfSections          = noOfSections;
1367     sh.m_fragmentInfo          = tFragInfo;
1368 
1369 #ifdef TRACE_DISTRIBUTED
1370     ndbout_c("send: %s(%d) to (%s, %d)",
1371 	     getSignalName(gsn), gsn, getBlockName(recBlock),
1372 	     recNode);
1373 #endif
1374 
1375     SendStatus ss;
1376 #ifdef NDBD_MULTITHREADED
1377     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1378                         recNode, &g_sectionSegmentPool, sections->m_ptr);
1379 #else
1380     TrpId trp_id = 0;
1381     ss = globalTransporterRegistry.
1382            prepareSend(getNonMTTransporterSendHandle(),
1383                        &sh, jobBuffer,
1384                        &signal->theData[0], recNode, trp_id,
1385                        g_sectionSegmentPool, sections->m_ptr);
1386 #endif
1387 
1388     if (unlikely(! (ss == SEND_OK ||
1389                     ss == SEND_BLOCKED ||
1390                     ss == SEND_DISCONNECTED)))
1391     {
1392       handle_send_failed(ss, signal, recNode, sections->m_ptr);
1393     }
1394 
1395     ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
1396   }
1397 
1398   signal->header.m_noOfSections = 0;
1399   signal->header.m_fragmentInfo = 0;
1400   sections->m_cnt = 0;
1401   return;
1402 }
1403 
1404 void
sendSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1405 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
1406 			   GlobalSignalNumber gsn,
1407 			   Signal* signal,
1408 			   Uint32 length,
1409 			   JobBufferLevel jobBuffer,
1410 			   SectionHandle * sections) const {
1411 
1412   Uint32 noOfSections = sections->m_cnt;
1413   Uint32 tSignalId = signal->header.theSignalId;
1414   Uint32 tTrace    = signal->getTrace();
1415   Uint32 tFragInfo = signal->header.m_fragmentInfo;
1416 
1417   Uint32 ourProcessor = globalData.ownId;
1418   Uint32 recBlock = rg.m_block;
1419 
1420 ndbrequire(signal->header.m_noOfSections == 0);
1421   check_sections(signal, signal->header.m_noOfSections, noOfSections);
1422 
1423   signal->header.theLength = length;
1424   signal->header.theVerId_signalNumber = gsn;
1425   signal->header.theReceiversBlockNumber = recBlock;
1426   signal->header.theSendersSignalId = tSignalId;
1427   signal->header.theSendersBlockRef = reference();
1428   signal->header.m_noOfSections = noOfSections;
1429 
1430   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1431     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1432     return;
1433   }//if
1434 
1435   SignalHeader sh;
1436   sh.theVerId_signalNumber   = gsn;
1437   sh.theReceiversBlockNumber = recBlock;
1438   sh.theSendersBlockRef      = refToBlock(reference());
1439   sh.theLength               = length;
1440   sh.theTrace                = tTrace;
1441   sh.theSignalId             = tSignalId;
1442   sh.m_noOfSections          = noOfSections;
1443   sh.m_fragmentInfo          = tFragInfo;
1444 
1445   /**
1446    * Check own node
1447    */
1448   bool release = true;
1449   if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor))
1450   {
1451     release = false;
1452 #ifdef VM_TRACE
1453     if(globalData.testOn){
1454       globalSignalLoggers.sendSignal(signal->header,
1455 				     jobBuffer,
1456 				     &signal->theData[0],
1457 				     ourProcessor,
1458                                      sections->m_ptr, noOfSections);
1459     }
1460 #endif
1461     /**
1462      * We have to copy the data
1463      */
1464     Uint32 * dst = signal->theData + length;
1465     * dst ++ = sections->m_ptr[0].i;
1466     * dst ++ = sections->m_ptr[1].i;
1467     * dst ++ = sections->m_ptr[2].i;
1468 #ifdef NDBD_MULTITHREADED
1469     if (jobBuffer == JBB)
1470       sendlocal(m_threadId, &signal->header, signal->theData,
1471                 signal->theData + length);
1472     else
1473       sendprioa(m_threadId, &signal->header, signal->theData,
1474                 signal->theData + length);
1475 #else
1476     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1477 #endif
1478 
1479     rg.m_nodes.clear((Uint32)0);
1480     rg.m_nodes.clear(ourProcessor);
1481   }
1482 
1483   /**
1484    * Do the big loop
1485    */
1486   Uint32 recNode = 0;
1487   while(!rg.m_nodes.isclear()){
1488     recNode = rg.m_nodes.find(recNode + 1);
1489     rg.m_nodes.clear(recNode);
1490 
1491 #ifdef VM_TRACE
1492     if(globalData.testOn){
1493       globalSignalLoggers.sendSignal(signal->header,
1494 				     jobBuffer,
1495 				     &signal->theData[0],
1496 				     recNode,
1497                                      sections->m_ptr, noOfSections);
1498     }
1499 #endif
1500 
1501 #ifdef TRACE_DISTRIBUTED
1502     ndbout_c("send: %s(%d) to (%s, %d)",
1503 	     getSignalName(gsn), gsn, getBlockName(recBlock),
1504 	     recNode);
1505 #endif
1506 
1507     SendStatus ss;
1508 #ifdef NDBD_MULTITHREADED
1509     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1510                         recNode, &g_sectionSegmentPool, sections->m_ptr);
1511 #else
1512     TrpId trp_id = 0;
1513     ss = globalTransporterRegistry.
1514            prepareSend(getNonMTTransporterSendHandle(),
1515                        &sh, jobBuffer,
1516                        &signal->theData[0], recNode, trp_id,
1517                        g_sectionSegmentPool, sections->m_ptr);
1518 #endif
1519 
1520     if (unlikely(! (ss == SEND_OK ||
1521                     ss == SEND_BLOCKED ||
1522                     ss == SEND_DISCONNECTED)))
1523     {
1524       handle_send_failed(ss, signal, recNode, sections->m_ptr);
1525     }
1526   }
1527 
1528   if (release)
1529   {
1530     ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
1531   }
1532 
1533   sections->m_cnt = 0;
1534   signal->header.m_noOfSections = 0;
1535   signal->header.m_fragmentInfo = 0;
1536 
1537   return;
1538 }
1539 
1540 void
sendSignalNoRelease(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1541 SimulatedBlock::sendSignalNoRelease(BlockReference ref,
1542                                     GlobalSignalNumber gsn,
1543                                     Signal* signal,
1544                                     Uint32 length,
1545                                     JobBufferLevel jobBuffer,
1546                                     SectionHandle* sections) const {
1547 
1548   /**
1549    * Implementation the same as sendSignal(), except that
1550    * the sections are duplicated when sending locally, and
1551    * not released
1552    */
1553 
1554   Uint32 noOfSections = sections->m_cnt;
1555   BlockReference sendBRef = reference();
1556 
1557   Uint32 recBlock = refToBlock(ref);
1558   Uint32 recNode   = refToNode(ref);
1559   Uint32 ourProcessor         = globalData.ownId;
1560 
1561 ndbrequire(signal->header.m_noOfSections == 0);
1562   check_sections(signal, signal->header.m_noOfSections, noOfSections);
1563 
1564   signal->header.theLength = length;
1565   signal->header.theVerId_signalNumber = gsn;
1566   signal->header.theReceiversBlockNumber = recBlock;
1567   signal->header.m_noOfSections = noOfSections;
1568 
1569   Uint32 tSignalId = signal->header.theSignalId;
1570   Uint32 tFragInfo = signal->header.m_fragmentInfo;
1571 
1572   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1573     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1574     return;
1575   }//if
1576 #ifdef VM_TRACE
1577   if(globalData.testOn){
1578     Uint16 proc =
1579       (recNode == 0 ? globalData.ownId : recNode);
1580     signal->header.theSendersBlockRef = sendBRef;
1581     globalSignalLoggers.sendSignal(signal->header,
1582 				   jobBuffer,
1583 				   &signal->theData[0],
1584 				   proc,
1585                                    sections->m_ptr, noOfSections);
1586   }
1587 #endif
1588 
1589   if(recNode == ourProcessor || recNode == 0) {
1590     signal->header.theSendersSignalId = tSignalId;
1591     signal->header.theSendersBlockRef = sendBRef;
1592 
1593     Uint32 * dst = signal->theData + length;
1594 
1595     /* We need to copy the segmented section data into separate
1596      * sections when sending locally and keeping a copy ourselves
1597      */
1598     for (Uint32 sec=0; sec < noOfSections; sec++)
1599     {
1600       Uint32 secCopy;
1601       if (unlikely(! ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i)))
1602       {
1603         handle_out_of_longsignal_memory(signal);
1604         return;
1605       }
1606       * dst ++ = secCopy;
1607     }
1608 
1609 #ifdef NDBD_MULTITHREADED
1610     if (jobBuffer == JBB)
1611       sendlocal(m_threadId, &signal->header, signal->theData,
1612                 signal->theData + length);
1613     else
1614       sendprioa(m_threadId, &signal->header, signal->theData,
1615                 signal->theData + length);
1616 #else
1617     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1618 #endif
1619   } else {
1620     // send distributed Signal
1621     SignalHeader sh;
1622 
1623     Uint32 tTrace = signal->getTrace();
1624 
1625     sh.theVerId_signalNumber   = gsn;
1626     sh.theReceiversBlockNumber = recBlock;
1627     sh.theSendersBlockRef      = refToBlock(sendBRef);
1628     sh.theLength               = length;
1629     sh.theTrace                = tTrace;
1630     sh.theSignalId             = tSignalId;
1631     sh.m_noOfSections          = noOfSections;
1632     sh.m_fragmentInfo          = tFragInfo;
1633 
1634 #ifdef TRACE_DISTRIBUTED
1635     ndbout_c("send: %s(%d) to (%s, %d)",
1636 	     getSignalName(gsn), gsn, getBlockName(recBlock),
1637 	     recNode);
1638 #endif
1639 
1640     SendStatus ss;
1641 #ifdef NDBD_MULTITHREADED
1642     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1643                         recNode, &g_sectionSegmentPool, sections->m_ptr);
1644 #else
1645     TrpId trp_id = 0;
1646     ss = globalTransporterRegistry.
1647            prepareSend(getNonMTTransporterSendHandle(),
1648                        &sh, jobBuffer,
1649                        &signal->theData[0], recNode, trp_id,
1650                        g_sectionSegmentPool, sections->m_ptr);
1651 #endif
1652 
1653     if (unlikely(! (ss == SEND_OK ||
1654                     ss == SEND_BLOCKED ||
1655                     ss == SEND_DISCONNECTED)))
1656     {
1657       handle_send_failed(ss, signal, recNode, sections->m_ptr);
1658     }
1659   }
1660 
1661   signal->header.m_noOfSections = 0;
1662   signal->header.m_fragmentInfo = 0;
1663   return;
1664 }
1665 
1666 void
sendSignalNoRelease(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1667 SimulatedBlock::sendSignalNoRelease(NodeReceiverGroup rg,
1668                                     GlobalSignalNumber gsn,
1669                                     Signal* signal,
1670                                     Uint32 length,
1671                                     JobBufferLevel jobBuffer,
1672                                     SectionHandle * sections) const {
1673   /**
1674    * Implementation the same as sendSignal(), except that
1675    * the sections are duplicated when sending locally, and
1676    * not released
1677    */
1678 
1679   Uint32 noOfSections = sections->m_cnt;
1680   Uint32 tSignalId = signal->header.theSignalId;
1681   Uint32 tTrace    = signal->getTrace();
1682   Uint32 tFragInfo = signal->header.m_fragmentInfo;
1683 
1684   Uint32 ourProcessor = globalData.ownId;
1685   Uint32 recBlock = rg.m_block;
1686 
1687 ndbrequire(signal->header.m_noOfSections == 0);
1688   check_sections(signal, signal->header.m_noOfSections, noOfSections);
1689 
1690   signal->header.theLength = length;
1691   signal->header.theVerId_signalNumber = gsn;
1692   signal->header.theReceiversBlockNumber = recBlock;
1693   signal->header.theSendersSignalId = tSignalId;
1694   signal->header.theSendersBlockRef = reference();
1695   signal->header.m_noOfSections = noOfSections;
1696 
1697   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1698     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1699     return;
1700   }//if
1701 
1702   SignalHeader sh;
1703   sh.theVerId_signalNumber   = gsn;
1704   sh.theReceiversBlockNumber = recBlock;
1705   sh.theSendersBlockRef      = refToBlock(reference());
1706   sh.theLength               = length;
1707   sh.theTrace                = tTrace;
1708   sh.theSignalId             = tSignalId;
1709   sh.m_noOfSections          = noOfSections;
1710   sh.m_fragmentInfo          = tFragInfo;
1711 
1712   /**
1713    * Check own node
1714    */
1715   if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor))
1716   {
1717 #ifdef VM_TRACE
1718     if(globalData.testOn){
1719       globalSignalLoggers.sendSignal(signal->header,
1720 				     jobBuffer,
1721 				     &signal->theData[0],
1722 				     ourProcessor,
1723                                      sections->m_ptr, noOfSections);
1724     }
1725 #endif
1726 
1727     Uint32 * dst = signal->theData + length;
1728 
1729     /* We need to copy the segmented section data into separate
1730      * sections when sending locally and keeping a copy ourselves
1731      */
1732     for (Uint32 sec=0; sec < noOfSections; sec++)
1733     {
1734       Uint32 secCopy;
1735       if (unlikely(! ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i)))
1736       {
1737         handle_out_of_longsignal_memory(signal);
1738         return;
1739       }
1740       * dst ++ = secCopy;
1741     }
1742 
1743 #ifdef NDBD_MULTITHREADED
1744     if (jobBuffer == JBB)
1745       sendlocal(m_threadId, &signal->header, signal->theData,
1746                 signal->theData + length);
1747     else
1748       sendprioa(m_threadId, &signal->header, signal->theData,
1749                 signal->theData + length);
1750 #else
1751     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1752 #endif
1753 
1754     rg.m_nodes.clear((Uint32)0);
1755     rg.m_nodes.clear(ourProcessor);
1756   }
1757 
1758   /**
1759    * Do the big loop
1760    */
1761   Uint32 recNode = 0;
1762   while(!rg.m_nodes.isclear()){
1763     recNode = rg.m_nodes.find(recNode + 1);
1764     rg.m_nodes.clear(recNode);
1765 
1766 #ifdef VM_TRACE
1767     if(globalData.testOn){
1768       globalSignalLoggers.sendSignal(signal->header,
1769 				     jobBuffer,
1770 				     &signal->theData[0],
1771 				     recNode,
1772                                      sections->m_ptr, noOfSections);
1773     }
1774 #endif
1775 
1776 #ifdef TRACE_DISTRIBUTED
1777     ndbout_c("send: %s(%d) to (%s, %d)",
1778 	     getSignalName(gsn), gsn, getBlockName(recBlock),
1779 	     recNode);
1780 #endif
1781 
1782     SendStatus ss;
1783 #ifdef NDBD_MULTITHREADED
1784     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1785                         recNode, &g_sectionSegmentPool, sections->m_ptr);
1786 #else
1787     TrpId trp_id = 0;
1788     ss = globalTransporterRegistry.
1789            prepareSend(getNonMTTransporterSendHandle(),
1790                        &sh, jobBuffer,
1791                        &signal->theData[0], recNode, trp_id,
1792                        g_sectionSegmentPool, sections->m_ptr);
1793 #endif
1794 
1795     if (unlikely(! (ss == SEND_OK ||
1796                     ss == SEND_BLOCKED ||
1797                     ss == SEND_DISCONNECTED)))
1798     {
1799       handle_send_failed(ss, signal, recNode, sections->m_ptr);
1800     }
1801   }
1802 
1803   signal->header.m_noOfSections = 0;
1804   signal->header.m_fragmentInfo = 0;
1805 
1806   return;
1807 }
1808 
1809 
1810 void
sendSignalWithDelay(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 delayInMilliSeconds,Uint32 length) const1811 SimulatedBlock::sendSignalWithDelay(BlockReference ref,
1812 				    GlobalSignalNumber gsn,
1813 				    Signal* signal,
1814 				    Uint32 delayInMilliSeconds,
1815 				    Uint32 length) const {
1816 
1817   BlockNumber bnr = refToBlock(ref);
1818 
1819 ndbrequire(signal->header.m_noOfSections == 0);
1820   check_sections(signal, signal->header.m_noOfSections, 0);
1821 
1822   signal->header.theLength = length;
1823   signal->header.theSendersSignalId = signal->header.theSignalId;
1824   signal->header.theVerId_signalNumber = gsn;
1825   signal->header.theReceiversBlockNumber = bnr;
1826   signal->header.theSendersBlockRef = reference();
1827 
1828   assert(length <= 25);
1829 #ifdef VM_TRACE
1830   {
1831     if(globalData.testOn){
1832       globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
1833 					      signal->header,
1834 					      0,
1835 					      &signal->theData[0],
1836 					      globalData.ownId);
1837     }
1838   }
1839 #endif
1840 
1841 #ifdef NDBD_MULTITHREADED
1842   senddelay(m_threadId, &signal->header, delayInMilliSeconds);
1843 #else
1844   globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
1845 #endif
1846 
1847   // befor 2nd parameter to globalTimeQueue.insert
1848   // (Priority)theSendSig[sigIndex].jobBuffer
1849 }
1850 
1851 void
sendSignalWithDelay(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 delayInMilliSeconds,Uint32 length,SectionHandle * sections) const1852 SimulatedBlock::sendSignalWithDelay(BlockReference ref,
1853 				    GlobalSignalNumber gsn,
1854 				    Signal* signal,
1855 				    Uint32 delayInMilliSeconds,
1856 				    Uint32 length,
1857 				    SectionHandle * sections) const {
1858 
1859   Uint32 noOfSections = sections->m_cnt;
1860   BlockNumber bnr = refToBlock(ref);
1861 
1862   BlockReference sendBRef = reference();
1863 
1864   if (bnr == 0) {
1865     bnr_error();
1866   }//if
1867 
1868 ndbrequire(signal->header.m_noOfSections == 0);
1869   check_sections(signal, signal->header.m_noOfSections, noOfSections);
1870 
1871   signal->header.theLength = length;
1872   signal->header.theSendersSignalId = signal->header.theSignalId;
1873   signal->header.theSendersBlockRef = sendBRef;
1874   signal->header.theVerId_signalNumber = gsn;
1875   signal->header.theReceiversBlockNumber = bnr;
1876   signal->header.m_noOfSections = noOfSections;
1877 
1878   assert(length + noOfSections <= 25);
1879   Uint32 * dst = signal->theData + length;
1880   * dst ++ = sections->m_ptr[0].i;
1881   * dst ++ = sections->m_ptr[1].i;
1882   * dst ++ = sections->m_ptr[2].i;
1883 
1884 #ifdef VM_TRACE
1885   {
1886     if(globalData.testOn){
1887       globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
1888 					      signal->header,
1889 					      0,
1890 					      &signal->theData[0],
1891 					      globalData.ownId);
1892     }
1893   }
1894 #endif
1895 
1896 #ifdef NDBD_MULTITHREADED
1897   senddelay(m_threadId, &signal->header, delayInMilliSeconds);
1898 #else
1899   globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
1900 #endif
1901 
1902   signal->header.m_noOfSections = 0;
1903   signal->header.m_fragmentInfo = 0;
1904   sections->m_cnt = 0;
1905 }
1906 
1907 void
release(SegmentedSectionPtr & ptr)1908 SimulatedBlock::release(SegmentedSectionPtr & ptr)
1909 {
1910   ::release(SB_SP_ARG ptr);
1911 }
1912 
1913 void
releaseSection(Uint32 firstSegmentIVal)1914 SimulatedBlock::releaseSection(Uint32 firstSegmentIVal)
1915 {
1916   ::releaseSection(SB_SP_ARG firstSegmentIVal);
1917 }
1918 
1919 void
releaseSections(SectionHandle & handle)1920 SimulatedBlock::releaseSections(SectionHandle& handle)
1921 {
1922   ::releaseSections(SB_SP_ARG handle.m_cnt, handle.m_ptr);
1923   handle.m_cnt = 0;
1924 }
1925 
1926 bool
appendToSection(Uint32 & firstSegmentIVal,const Uint32 * src,Uint32 len)1927 SimulatedBlock::appendToSection(Uint32& firstSegmentIVal, const Uint32* src, Uint32 len)
1928 {
1929   return ::appendToSection(SB_SP_ARG firstSegmentIVal, src, len);
1930 }
1931 
1932 bool
import(Ptr<SectionSegment> & first,const Uint32 * src,Uint32 len)1933 SimulatedBlock::import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len)
1934 {
1935   return ::import(SB_SP_ARG first, src, len);
1936 }
1937 
1938 bool
import(SegmentedSectionPtr & ptr,const Uint32 * src,Uint32 len) const1939 SimulatedBlock::import(SegmentedSectionPtr& ptr, const Uint32* src, Uint32 len) const
1940 {
1941   Ptr<SectionSegment> tmp;
1942   if (::import(SB_SP_ARG tmp, src, len))
1943   {
1944     ptr.i = tmp.i;
1945     ptr.p = tmp.p;
1946     ptr.sz = len;
1947     return true;
1948   }
1949   return false;
1950 }
1951 
1952 bool
import(SectionHandle * dst,LinearSectionPtr src[3],Uint32 cnt)1953 SimulatedBlock::import(SectionHandle * dst,
1954                        LinearSectionPtr src[3],
1955                        Uint32 cnt)
1956 {
1957   ndbassert(dst->m_cnt == 0);
1958   if (dst->m_cnt)
1959   {
1960     releaseSections(* dst);
1961   }
1962 
1963   for (Uint32 i = 0; i < cnt; i++)
1964   {
1965     if (unlikely(!import(dst->m_ptr[i], src[i].p, src[i].sz)))
1966     {
1967       if (i)
1968       {
1969         dst->m_cnt = i - 1;
1970         releaseSections(* dst);
1971         return false;
1972       }
1973     }
1974   }
1975   dst->m_cnt = cnt;
1976   return true;
1977 }
1978 
1979 
1980 bool
dupSection(Uint32 & copyFirstIVal,Uint32 srcFirstIVal)1981 SimulatedBlock::dupSection(Uint32& copyFirstIVal, Uint32 srcFirstIVal)
1982 {
1983   return ::dupSection(SB_SP_ARG copyFirstIVal, srcFirstIVal);
1984 }
1985 
1986 bool
writeToSection(Uint32 firstSegmentIVal,Uint32 offset,const Uint32 * src,Uint32 len)1987 SimulatedBlock::writeToSection(Uint32 firstSegmentIVal, Uint32 offset,
1988                                const Uint32* src, Uint32 len)
1989 {
1990   return ::writeToSection(firstSegmentIVal, offset, src, len);
1991 }
1992 
1993 class SectionSegmentPool&
getSectionSegmentPool()1994 SimulatedBlock::getSectionSegmentPool(){
1995   return g_sectionSegmentPool;
1996 }
1997 
1998 NewVARIABLE *
allocateBat(int batSize)1999 SimulatedBlock::allocateBat(int batSize){
2000   NewVARIABLE* bat = NewVarRef;
2001   bat = (NewVARIABLE*)realloc(bat, batSize * sizeof(NewVARIABLE));
2002   NewVarRef = bat;
2003   theBATSize = batSize;
2004   return bat;
2005 }
2006 
2007 void
freeBat()2008 SimulatedBlock::freeBat(){
2009   if(NewVarRef != 0){
2010     free(NewVarRef);
2011     NewVarRef = 0;
2012   }
2013 }
2014 
2015 const NewVARIABLE *
getBat(Uint16 blockNo,Uint32 instanceNo)2016 SimulatedBlock::getBat(Uint16 blockNo, Uint32 instanceNo){
2017   assert(blockNo == blockToMain(blockNo));
2018   SimulatedBlock * sb = globalData.getBlock(blockNo);
2019   if (sb != 0 && instanceNo != 0)
2020     sb = sb->getInstance(instanceNo);
2021   if(sb == 0)
2022     return 0;
2023   return sb->NewVarRef;
2024 }
2025 
2026 Uint16
getBatSize(Uint16 blockNo,Uint32 instanceNo)2027 SimulatedBlock::getBatSize(Uint16 blockNo, Uint32 instanceNo){
2028   assert(blockNo == blockToMain(blockNo));
2029   SimulatedBlock * sb = globalData.getBlock(blockNo);
2030   if (sb != 0 && instanceNo != 0)
2031     sb = sb->getInstance(instanceNo);
2032   if(sb == 0)
2033     return 0;
2034   return sb->theBATSize;
2035 }
2036 
allocRecord(const char * type,size_t s,size_t n,bool clear,Uint32 paramId)2037 void* SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, Uint32 paramId)
2038 {
2039   return allocRecordAligned(type, s, n, 0, 0, clear, paramId);
2040 }
2041 
2042 void*
allocRecordAligned(const char * type,size_t s,size_t n,void ** unaligned_buffer,Uint32 align,bool clear,Uint32 paramId)2043 SimulatedBlock::allocRecordAligned(const char * type, size_t s, size_t n, void **unaligned_buffer, Uint32 align, bool clear, Uint32 paramId)
2044 {
2045 
2046   void * p = NULL;
2047   Uint32 over_alloc = unaligned_buffer ? (align - 1) : 0;
2048   size_t size = n*s + over_alloc;
2049   Uint64 real_size = (Uint64)((Uint64)n)*((Uint64)s) + over_alloc;
2050   refresh_watch_dog(9);
2051   if (real_size > 0){
2052 #if defined(VM_TRACE_MEM) || defined(VM_TRACE) || defined(ERROR_INSERT)
2053     g_eventLogger->info("%s::allocRecord(%s, %zu, %zu) = %llu bytes",
2054 	                getBlockName(number()),
2055 	                type,
2056 	                s,
2057 	                n,
2058 	                real_size);
2059 #endif
2060     if( real_size == (Uint64)size )
2061       p = ndbd_malloc_watched(size, get_watch_dog());
2062     if (p == NULL){
2063       char buf1[255];
2064       char buf2[255];
2065       struct ndb_mgm_param_info param_info;
2066       size_t size = sizeof(ndb_mgm_param_info);
2067 
2068       if(0 != paramId && 0 == ndb_mgm_get_db_parameter_info(paramId, &param_info, &size)) {
2069         BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for parameter %s",
2070 	         getBlockName(number()), param_info.m_name);
2071       } else {
2072         BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for %s",
2073 	         getBlockName(number()), type);
2074       }
2075       BaseString::snprintf(buf2, sizeof(buf2), "Requested: %ux%u = %llu bytes",
2076 	       (Uint32)s, (Uint32)n, (Uint64)real_size);
2077       ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
2078     }
2079 
2080     if(clear){
2081       char * ptr = (char*)p;
2082       const Uint32 chunk = 128 * 1024;
2083       while(size > chunk){
2084 	refresh_watch_dog(9);
2085 	memset(ptr, 0, chunk);
2086 	ptr += chunk;
2087 	size -= chunk;
2088       }
2089       refresh_watch_dog(9);
2090       memset(ptr, 0, size);
2091     }
2092     if (unaligned_buffer)
2093     {
2094       *unaligned_buffer = p;
2095       p = (void *)(((UintPtr)p + over_alloc) & ~(UintPtr)(over_alloc));
2096 #ifdef VM_TRACE
2097       g_eventLogger->info("'%s' (%u) %llu %llu, alignment correction %u bytes",
2098                           type, align, (Uint64)p, (Uint64)p+n*s,
2099                           (Uint32)((UintPtr)p - (UintPtr)*unaligned_buffer));
2100 #endif
2101     }
2102   }
2103   return p;
2104 }
2105 
2106 void
deallocRecord(void ** ptr,const char * type,size_t s,size_t n)2107 SimulatedBlock::deallocRecord(void ** ptr,
2108 			      const char * type, size_t s, size_t n){
2109   (void)type;
2110 
2111   if(* ptr != 0){
2112       ndbd_free(* ptr, n*s);
2113     * ptr = 0;
2114   }
2115 }
2116 
2117 int
sortchunks(const void * _e0,const void * _e1)2118 SimulatedBlock::sortchunks(const void * _e0, const void * _e1)
2119 {
2120   const AllocChunk *p0 = (const AllocChunk*)_e0;
2121   const AllocChunk *p1 = (const AllocChunk*)_e1;
2122 
2123   if (p0->ptrI > p1->ptrI)
2124     return 1;
2125   if (p0->ptrI < p1->ptrI)
2126     return -1;
2127   return 0;
2128 }
2129 
2130 Uint32
allocChunks(AllocChunk dst[],Uint32 arraysize,Uint32 rg,Uint32 pages,Uint32 paramId)2131 SimulatedBlock::allocChunks(AllocChunk dst[],
2132                             Uint32 arraysize,
2133                             Uint32 rg,
2134                             Uint32 pages,
2135                             Uint32 paramId)
2136 {
2137   const Uint32 save = pages; // For fail
2138   Uint32 i = 0;
2139   for (; i<arraysize && pages > 0; i++)
2140   {
2141     Uint32 cnt = pages;
2142     m_ctx.m_mm.alloc_pages(rg, &dst[i].ptrI, &cnt, 1);
2143     if (unlikely(cnt == 0))
2144       goto fail;
2145     pages -= cnt;
2146     dst[i].cnt = cnt;
2147   }
2148   if (unlikely(pages != 0))
2149     goto fail;
2150 
2151   qsort(dst, i, sizeof(dst[0]), sortchunks);
2152   return i;
2153 
2154 fail:
2155   char buf1[255];
2156   char buf2[255];
2157   struct ndb_mgm_param_info param_info;
2158   size_t size = sizeof(ndb_mgm_param_info);
2159 
2160   if (ndb_mgm_get_db_parameter_info(paramId, &param_info, &size) != 0)
2161   {
2162     ndbassert(false);
2163     param_info.m_name = "<unknown>";
2164   }
2165 
2166   BaseString::snprintf(buf1, sizeof(buf1),
2167                        "%s could not allocate memory for parameter %s",
2168                        getBlockName(number()), param_info.m_name);
2169   BaseString::snprintf(buf2, sizeof(buf2), "Requested: %llu bytes",
2170                        Uint64(save) * sizeof(GlobalPage));
2171   ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
2172   return 0;
2173 }
2174 
2175 void
refresh_watch_dog(Uint32 place)2176 SimulatedBlock::refresh_watch_dog(Uint32 place)
2177 {
2178 #ifdef NDBD_MULTITHREADED
2179   (*m_watchDogCounter) = place;
2180 #else
2181   globalData.incrementWatchDogCounter(place);
2182 #endif
2183 }
2184 
2185 volatile Uint32*
get_watch_dog()2186 SimulatedBlock::get_watch_dog()
2187 {
2188 #ifdef NDBD_MULTITHREADED
2189   return m_watchDogCounter;
2190 #else
2191   return globalData.getWatchDogPtr();
2192 #endif
2193 }
2194 
2195 void
update_watch_dog_timer(Uint32 interval)2196 SimulatedBlock::update_watch_dog_timer(Uint32 interval)
2197 {
2198   extern EmulatorData globalEmulatorData;
2199   globalEmulatorData.theWatchDog->setCheckInterval(interval);
2200 }
2201 
2202 void
progError(int line,int err_code,const char * extra,const char * check) const2203 SimulatedBlock::progError(int line, int err_code, const char* extra,
2204                           const char* check) const {
2205   jamNoBlock();
2206 
2207   const char *aBlockName = getBlockName(number(), "VM Kernel");
2208 
2209   // Pack status of interesting config variables
2210   // so that we can print them in error.log
2211   int magicStatus =
2212     (m_ctx.m_config.stopOnError()<<1) +
2213     (m_ctx.m_config.getInitialStart()<<2);
2214 
2215   /* Add line number and failed expression to block name */
2216   char buf[500];
2217   /*Add the check to the log message only if default value of ""
2218     is over-written. */
2219   if(native_strcasecmp(check,"") == 0)
2220     BaseString::snprintf(&buf[0], 100, "%s (Line: %d) 0x%.8x",
2221         aBlockName, line, magicStatus);
2222   else
2223     BaseString::snprintf(&buf[0], sizeof(buf),
2224         "%s (Line: %d) 0x%.8x Check %.400s failed", aBlockName,
2225         line, magicStatus, check);
2226 
2227   ErrorReporter::handleError(err_code, extra, buf);
2228 
2229 }
2230 
2231 #define MAX_EVENT_REP_SIZE_BYTES (MAX_EVENT_REP_SIZE_WORDS * 4)
2232 void
infoEvent(const char * msg,...) const2233 SimulatedBlock::infoEvent(const char * msg, ...) const
2234 {
2235   if(msg == 0)
2236     return;
2237 
2238   SignalT<25> signalT;
2239   signalT.theData[0] = NDB_LE_InfoEvent;
2240   Uint32 buf_str[MAX_EVENT_REP_SIZE_WORDS];
2241   char * buf = (char *)&buf_str[1];
2242 
2243   buf_str[0] = signalT.theData[0];
2244   va_list ap;
2245   va_start(ap, msg);
2246   BaseString::vsnprintf(buf, MAX_EVENT_REP_SIZE_BYTES - 5, msg, ap);
2247   va_end(ap);
2248 
2249   size_t len = strlen(buf) + 1;
2250   if(len >= (MAX_EVENT_REP_SIZE_BYTES - 5))
2251   {
2252     len = MAX_EVENT_REP_SIZE_BYTES - 4;
2253     buf[MAX_EVENT_REP_SIZE_BYTES - 5] = 0;
2254   }
2255 
2256   SegmentedSectionPtr segptr;
2257   Uint32 len_words = 1 + ((len + 3) / 4);
2258   bool ok = import(segptr,
2259                    &buf_str[0],
2260                    len_words);
2261   signalT.theData[1] = segptr.i;
2262   if (!ok)
2263   {
2264     return;
2265   }
2266   /**
2267    * Init and put it into the job buffer
2268    */
2269   memset(&signalT.header, 0, sizeof(SignalHeader));
2270 
2271   const Signal * signal = globalScheduler.getVMSignals();
2272   Uint32 tTrace = signal->header.theTrace;
2273   Uint32 tSignalId = signal->header.theSignalId;
2274 
2275   signalT.header.theVerId_signalNumber   = GSN_EVENT_REP;
2276   signalT.header.theReceiversBlockNumber = CMVMI;
2277   signalT.header.theSendersBlockRef      = reference();
2278   signalT.header.theTrace                = tTrace;
2279   signalT.header.theSignalId             = tSignalId;
2280   signalT.header.theLength               = 1;
2281   signalT.header.m_noOfSections          = 1;
2282 #ifdef NDBD_MULTITHREADED
2283   sendlocal(m_threadId,
2284             &signalT.header, signalT.theData, signalT.theData + 1);
2285 #else
2286   globalScheduler.execute(&signalT.header, JBB, signalT.theData,
2287                           signalT.theData + 1);
2288 #endif
2289 }
2290 
2291 void
warningEvent(const char * msg,...)2292 SimulatedBlock::warningEvent(const char * msg, ...)
2293 {
2294   if(msg == 0)
2295     return;
2296 
2297   SignalT<25> signalT;
2298   signalT.theData[0] = NDB_LE_WarningEvent;
2299   Uint32 buf_str[MAX_EVENT_REP_SIZE_WORDS];
2300   char * buf = (char *)&buf_str[1];
2301   memset(&buf_str[0], 0, 4);
2302 
2303   va_list ap;
2304   va_start(ap, msg);
2305   BaseString::vsnprintf(buf, MAX_EVENT_REP_SIZE_BYTES - 5, msg, ap);
2306   va_end(ap);
2307 
2308   size_t len = strlen(buf) + 1;
2309   if(len >= (MAX_EVENT_REP_SIZE_BYTES - 5))
2310   {
2311     len = MAX_EVENT_REP_SIZE_BYTES - 4;
2312     buf[MAX_EVENT_REP_SIZE_BYTES - 5] = 0;
2313   }
2314   SegmentedSectionPtr segptr;
2315   Uint32 len_words = 1 + ((len + 3) / 4);
2316   bool ok = import(segptr,
2317                    &buf_str[0],
2318                    len_words);
2319   signalT.theData[1] = segptr.i;
2320   if (!ok)
2321   {
2322     return;
2323   }
2324 
2325   /**
2326    * Init and put it into the job buffer
2327    */
2328   memset(&signalT.header, 0, sizeof(SignalHeader));
2329 
2330   const Signal * signal = globalScheduler.getVMSignals();
2331   Uint32 tTrace = signal->header.theTrace;
2332   Uint32 tSignalId = signal->header.theSignalId;
2333 
2334   signalT.header.theVerId_signalNumber   = GSN_EVENT_REP;
2335   signalT.header.theReceiversBlockNumber = CMVMI;
2336   signalT.header.theSendersBlockRef      = reference();
2337   signalT.header.theTrace                = tTrace;
2338   signalT.header.theSignalId             = tSignalId;
2339   signalT.header.theLength               = 1;
2340   signalT.header.m_noOfSections          = 1;
2341 
2342 #ifdef NDBD_MULTITHREADED
2343   sendlocal(m_threadId,
2344             &signalT.header, signalT.theData, signalT.theData + 1);
2345 #else
2346   globalScheduler.execute(&signalT.header, JBB, signalT.theData,
2347                           signalT.theData + 1);
2348 #endif
2349 }
2350 
2351 void
execNODE_STATE_REP(Signal * signal)2352 SimulatedBlock::execNODE_STATE_REP(Signal* signal){
2353   const NodeStateRep * const  rep = (NodeStateRep *)&signal->theData[0];
2354 
2355   this->theNodeState = rep->nodeState;
2356 }
2357 
2358 void
execCHANGE_NODE_STATE_REQ(Signal * signal)2359 SimulatedBlock::execCHANGE_NODE_STATE_REQ(Signal* signal){
2360   const ChangeNodeStateReq * const  req =
2361     (ChangeNodeStateReq *)&signal->theData[0];
2362 
2363   this->theNodeState = req->nodeState;
2364   const Uint32 senderData = req->senderData;
2365   const BlockReference senderRef = req->senderRef;
2366 
2367   /**
2368    * Pack return signal
2369    */
2370   ChangeNodeStateConf * const  conf =
2371     (ChangeNodeStateConf *)&signal->theData[0];
2372 
2373   conf->senderData = senderData;
2374 
2375   sendSignal(senderRef, GSN_CHANGE_NODE_STATE_CONF, signal,
2376 	     ChangeNodeStateConf::SignalLength, JBB);
2377 }
2378 
2379 void
execNDB_TAMPER(Signal * signal)2380 SimulatedBlock::execNDB_TAMPER(Signal * signal){
2381   if (signal->getLength() == 1)
2382   {
2383     SET_ERROR_INSERT_VALUE(signal->theData[0]);
2384   }
2385   else
2386   {
2387     SET_ERROR_INSERT_VALUE2(signal->theData[0], signal->theData[1]);
2388   }
2389 }
2390 
2391 void
execSIGNAL_DROPPED_REP(Signal * signal)2392 SimulatedBlock::execSIGNAL_DROPPED_REP(Signal * signal){
2393   /* Note no need for fragmented signal handling as we are
2394    * going to crash this node
2395    */
2396   char msg[64];
2397   const SignalDroppedRep * const rep = (SignalDroppedRep *)&signal->theData[0];
2398   BaseString::snprintf(msg, sizeof(msg), "%s GSN: %u (%u,%u)", getBlockName(number()),
2399 	   rep->originalGsn, rep->originalLength,rep->originalSectionCount);
2400   ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
2401 			     msg,
2402 			     __FILE__,
2403 			     NST_ErrorHandler);
2404 }
2405 
2406 void
execCONTINUE_FRAGMENTED(Signal * signal)2407 SimulatedBlock::execCONTINUE_FRAGMENTED(Signal * signal){
2408   jamEntry();
2409 
2410   ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
2411   ndbrequire(signal->getSendersBlockRef() == reference()); /* Paranoia */
2412 
2413   switch (sig->type)
2414   {
2415   case ContinueFragmented::CONTINUE_SENDING :
2416   {
2417     jam();
2418     Ptr<FragmentSendInfo> fragPtr;
2419 
2420     c_segmentedFragmentSendList.first(fragPtr);
2421     for(; !fragPtr.isNull();){
2422       jam();
2423       Ptr<FragmentSendInfo> copyPtr = fragPtr;
2424       c_segmentedFragmentSendList.next(fragPtr);
2425 
2426       sendNextSegmentedFragment(signal, * copyPtr.p);
2427       if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
2428         jam();
2429         if(copyPtr.p->m_callback.m_callbackFunction != 0) {
2430           jam();
2431           execute(signal, copyPtr.p->m_callback, 0);
2432         }//if
2433         c_segmentedFragmentSendList.release(copyPtr);
2434       }
2435     }
2436 
2437     c_linearFragmentSendList.first(fragPtr);
2438     for(; !fragPtr.isNull();){
2439       jam();
2440       Ptr<FragmentSendInfo> copyPtr = fragPtr;
2441       c_linearFragmentSendList.next(fragPtr);
2442 
2443       sendNextLinearFragment(signal, * copyPtr.p);
2444       if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
2445         jam();
2446         if(copyPtr.p->m_callback.m_callbackFunction != 0) {
2447           jam();
2448           execute(signal, copyPtr.p->m_callback, 0);
2449         }//if
2450         c_linearFragmentSendList.release(copyPtr);
2451       }
2452     }
2453 
2454     if(c_segmentedFragmentSendList.isEmpty() &&
2455        c_linearFragmentSendList.isEmpty()){
2456       jam();
2457       c_fragSenderRunning = false;
2458       return;
2459     }
2460 
2461     sig->type = ContinueFragmented::CONTINUE_SENDING;
2462     sig->line = __LINE__;
2463     sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
2464     break;
2465   }
2466   case ContinueFragmented::CONTINUE_CLEANUP:
2467   {
2468     jam();
2469 
2470     const Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
2471     /* Check length of signal */
2472     ndbassert(signal->getLength() ==
2473               ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
2474               callbackWords);
2475 
2476     Callback cb;
2477     memcpy(&cb, &sig->cleanup.callbackStart, callbackWords << 2);
2478 
2479     doNodeFailureCleanup(signal,
2480                          sig->cleanup.failedNodeId,
2481                          sig->cleanup.resource,
2482                          sig->cleanup.cursor,
2483                          sig->cleanup.elementsCleaned,
2484                          cb);
2485     break;
2486   }
2487   default:
2488     ndbabort();
2489   }
2490 }
2491 
2492 void
execSTOP_FOR_CRASH(Signal * signal)2493 SimulatedBlock::execSTOP_FOR_CRASH(Signal* signal)
2494 {
2495 #ifdef NDBD_MULTITHREADED
2496   mt_execSTOP_FOR_CRASH();
2497 #endif
2498 }
2499 
2500 void
execNODE_START_REP(Signal * signal)2501 SimulatedBlock::execNODE_START_REP(Signal* signal)
2502 {
2503 }
2504 
2505 void
execAPI_START_REP(Signal * signal)2506 SimulatedBlock::execAPI_START_REP(Signal* signal)
2507 {
2508 }
2509 
2510 void
execSEND_PACKED(Signal * signal)2511 SimulatedBlock::execSEND_PACKED(Signal* signal)
2512 {
2513 }
2514 
2515 ATTRIBUTE_NOINLINE
2516 void
handle_execute_error(GlobalSignalNumber gsn)2517 SimulatedBlock::handle_execute_error(GlobalSignalNumber gsn)
2518 {
2519   /**
2520    * This method only called if an error has occurred
2521    */
2522   char errorMsg[255];
2523   if (!(gsn <= MAX_GSN)) {
2524     BaseString::snprintf(errorMsg, 255, "Illegal signal received (GSN %d too high)", gsn);
2525     ERROR_SET(fatal, NDBD_EXIT_PRGERR, errorMsg, errorMsg);
2526   }
2527   if (!(theExecArray[gsn] != 0)) {
2528     BaseString::snprintf(errorMsg, 255, "Illegal signal received (GSN %d not added)", gsn);
2529     ERROR_SET(fatal, NDBD_EXIT_PRGERR, errorMsg, errorMsg);
2530   }
2531   ndbabort();
2532 }
2533 
2534 // MT LQH callback CONF via signal
2535 
2536 const SimulatedBlock::CallbackEntry&
getCallbackEntry(Uint32 ci)2537 SimulatedBlock::getCallbackEntry(Uint32 ci)
2538 {
2539   ndbrequire(m_callbackTableAddr != 0);
2540   const CallbackTable& ct = *m_callbackTableAddr;
2541   ndbrequire(ci < ct.m_count);
2542   return ct.m_entry[ci];
2543 }
2544 
2545 void
sendCallbackConf(Signal * signal,Uint32 fullBlockNo,CallbackPtr & cptr,Uint32 senderData,Uint32 callbackInfo,Uint32 returnCode)2546 SimulatedBlock::sendCallbackConf(Signal* signal, Uint32 fullBlockNo,
2547                                  CallbackPtr& cptr,
2548                                  Uint32 senderData, Uint32 callbackInfo,
2549                                  Uint32 returnCode)
2550 {
2551   Uint32 blockNo = blockToMain(fullBlockNo);
2552   Uint32 instanceNo = blockToInstance(fullBlockNo);
2553   SimulatedBlock* b = globalData.getBlock(blockNo, instanceNo);
2554   ndbrequire(b != 0);
2555 
2556   const CallbackEntry& ce = b->getCallbackEntry(cptr.m_callbackIndex);
2557 
2558   if (!isNdbMtLqh()) {
2559     Callback c;
2560     c.m_callbackFunction = ce.m_function;
2561     c.m_callbackData = cptr.m_callbackData;
2562     b->execute(signal, c, returnCode);
2563 
2564     if (ce.m_flags & CALLBACK_ACK) {
2565       jam();
2566       CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend();
2567       ack->senderData = senderData;
2568       ack->callbackInfo = callbackInfo;
2569       EXECUTE_DIRECT(number(), GSN_CALLBACK_ACK,
2570                      signal, CallbackAck::SignalLength);
2571     }
2572   } else {
2573     CallbackConf* conf = (CallbackConf*)signal->getDataPtrSend();
2574     conf->senderData = senderData;
2575     conf->senderRef = reference();
2576     conf->callbackIndex = cptr.m_callbackIndex;
2577     conf->callbackData = cptr.m_callbackData;
2578     conf->callbackInfo = callbackInfo;
2579     conf->returnCode = returnCode;
2580 
2581     if (ce.m_flags & CALLBACK_DIRECT) {
2582       jam();
2583       EXECUTE_DIRECT_MT(blockNo, GSN_CALLBACK_CONF,
2584                         signal, CallbackConf::SignalLength, instanceNo);
2585     } else {
2586       jam();
2587       BlockReference ref = numberToRef(fullBlockNo, getOwnNodeId());
2588       sendSignal(ref, GSN_CALLBACK_CONF,
2589                  signal, CallbackConf::SignalLength, JBB);
2590     }
2591   }
2592   cptr.m_callbackIndex = ZNIL;
2593 }
2594 
2595 void
execCALLBACK_CONF(Signal * signal)2596 SimulatedBlock::execCALLBACK_CONF(Signal* signal)
2597 {
2598   const CallbackConf* conf = (const CallbackConf*)signal->getDataPtr();
2599 
2600   Uint32 senderData = conf->senderData;
2601   Uint32 senderRef = conf->senderRef;
2602   Uint32 callbackIndex = conf->callbackIndex;
2603   Uint32 callbackData = conf->callbackData;
2604   Uint32 callbackInfo = conf->callbackInfo;
2605   Uint32 returnCode = conf->returnCode;
2606 
2607   ndbrequire(m_callbackTableAddr != 0);
2608   const CallbackEntry& ce = getCallbackEntry(callbackIndex);
2609   CallbackFunction function = ce.m_function;
2610 
2611   Callback callback;
2612   callback.m_callbackFunction = function;
2613   callback.m_callbackData = callbackData;
2614   execute(signal, callback, returnCode);
2615 
2616   if (ce.m_flags & CALLBACK_ACK) {
2617     jam();
2618     CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend();
2619     ack->senderData = senderData;
2620     ack->callbackInfo = callbackInfo;
2621     sendSignal(senderRef, GSN_CALLBACK_ACK,
2622                signal, CallbackAck::SignalLength, JBB);
2623   }
2624 }
2625 
2626 #ifdef VM_TRACE_TIME
2627 void
clearTimes()2628 SimulatedBlock::clearTimes() {
2629   for(Uint32 i = 0; i <= MAX_GSN; i++){
2630     m_timeTrace[i].cnt = 0;
2631     m_timeTrace[i].sum = 0;
2632     m_timeTrace[i].sub = 0;
2633   }
2634 }
2635 
2636 void
printTimes(FILE * output)2637 SimulatedBlock::printTimes(FILE * output){
2638   fprintf(output, "-- %s --\n", getBlockName(number()));
2639   Uint64 sum = 0;
2640   for(Uint32 i = 0; i <= MAX_GSN; i++){
2641     Uint32 n = m_timeTrace[i].cnt;
2642     if(n != 0){
2643       double dn = n;
2644 
2645       double avg = m_timeTrace[i].sum;
2646       double avg2 = avg - m_timeTrace[i].sub;
2647 
2648       avg /= dn;
2649       avg2 /= dn;
2650 
2651       fprintf(output,
2652 	      //name ; cnt ; loc ; acc
2653 	      "%s ; #%d ; %dus ; %dus ; %dms\n",
2654 	      getSignalName(i), n, (Uint32)avg, (Uint32)avg2,
2655 	      (Uint32)((m_timeTrace[i].sum - m_timeTrace[i].sub + 500)/ 1000));
2656 
2657       sum += (m_timeTrace[i].sum - m_timeTrace[i].sub);
2658     }
2659   }
2660   sum = (sum + 500)/ 1000;
2661   fprintf(output, "-- %s : %u --\n", getBlockName(number()), (Uint32)sum);
2662   fprintf(output, "\n");
2663   fflush(output);
2664 }
2665 
2666 #endif
2667 
FragmentInfo(Uint32 fragId,Uint32 sender)2668 SimulatedBlock::FragmentInfo::FragmentInfo(Uint32 fragId, Uint32 sender){
2669   m_fragmentId = fragId;
2670   m_senderRef = sender;
2671   m_sectionPtrI[0] = RNIL;
2672   m_sectionPtrI[1] = RNIL;
2673   m_sectionPtrI[2] = RNIL;
2674 }
2675 
FragmentSendInfo()2676 SimulatedBlock::FragmentSendInfo::FragmentSendInfo()
2677 {
2678 }
2679 
2680 bool
assembleFragments(Signal * signal)2681 SimulatedBlock::assembleFragments(Signal * signal){
2682   Uint32 sigLen = signal->length() - 1;
2683   Uint32 fragId = signal->theData[sigLen];
2684   Uint32 fragInfo = signal->header.m_fragmentInfo;
2685   Uint32 senderRef = signal->getSendersBlockRef();
2686 
2687   Uint32 *sectionPtr = signal->m_sectionPtrI;
2688 
2689   if(fragInfo == 0){
2690     return true;
2691   }
2692 
2693   const Uint32 secs = signal->header.m_noOfSections;
2694   const Uint32 * const secNos = &signal->theData[sigLen - secs];
2695 
2696   if(fragInfo == 1){
2697     /**
2698      * First in train
2699      */
2700     Ptr<FragmentInfo> fragPtr;
2701     if(!c_fragmentInfoHash.seize(fragPtr)){
2702       ndbabort();
2703       return false;
2704     }
2705 
2706     new (fragPtr.p)FragmentInfo(fragId, senderRef);
2707     c_fragmentInfoHash.add(fragPtr);
2708 
2709     for(Uint32 i = 0; i<secs; i++){
2710       Uint32 sectionNo = secNos[i];
2711       ndbassert(sectionNo < 3);
2712       fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtr[i];
2713     }
2714 
2715     ndbassert(! fragPtr.p->isDropped() );
2716 
2717     /**
2718      * Don't release allocated segments
2719      */
2720     signal->header.m_fragmentInfo = 0;
2721     signal->header.m_noOfSections = 0;
2722     return false;
2723   }
2724 
2725   FragmentInfo key(fragId, senderRef);
2726   Ptr<FragmentInfo> fragPtr;
2727   if(c_fragmentInfoHash.find(fragPtr, key)){
2728 
2729     /**
2730      * FragInfo == 2 or 3
2731      */
2732     if ( likely(! fragPtr.p->isDropped()) )
2733     {
2734       Uint32 i;
2735       for(i = 0; i<secs; i++){
2736         Uint32 sectionNo = secNos[i];
2737         ndbassert(sectionNo < 3);
2738         Uint32 sectionPtrI = sectionPtr[i];
2739         if(fragPtr.p->m_sectionPtrI[sectionNo] != RNIL){
2740           linkSegments(fragPtr.p->m_sectionPtrI[sectionNo], sectionPtrI);
2741         } else {
2742           fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtrI;
2743         }
2744       }
2745 
2746       /**
2747        * fragInfo = 2
2748        */
2749       if(fragInfo == 2){
2750         signal->header.m_fragmentInfo = 0;
2751         signal->header.m_noOfSections = 0;
2752         return false;
2753       }
2754 
2755       /**
2756        * fragInfo = 3
2757        */
2758       for(i = 0; i<3; i++){
2759         Uint32 ptrI = fragPtr.p->m_sectionPtrI[i];
2760         if(ptrI != RNIL){
2761           signal->m_sectionPtrI[i] = ptrI;
2762         } else {
2763           break;
2764         }
2765       }
2766 
2767       signal->setLength(sigLen - secs);
2768       signal->header.m_noOfSections = i;
2769       signal->header.m_fragmentInfo = 0;
2770 
2771       c_fragmentInfoHash.release(fragPtr);
2772       return true;
2773     }
2774     else
2775     {
2776       /* This fragmented signal has already had at least 1 fragment
2777        * dropped.  We must release the received segments.
2778        */
2779       for (Uint32 i=0; i < secs; i++)
2780         releaseSection( sectionPtr[i] );
2781 
2782       signal->header.m_fragmentInfo = 0;
2783       signal->header.m_noOfSections = 0;
2784 
2785       /* FragInfo == 2
2786        * More fragments to come, keep waiting
2787        */
2788       if (fragInfo == 2)
2789         return false;
2790 
2791       /* FragInfo == 3
2792        * That was the last fragment.
2793        * We're now ready for handling the dropped signal.
2794        */
2795       SignalDroppedRep * rep = (SignalDroppedRep*)signal->theData;
2796       Uint32 gsn = signal->header.theVerId_signalNumber;
2797       Uint32 len = signal->header.theLength;
2798       Uint32 newLen= (len > 22 ? 22 : len);
2799       memmove(rep->originalData, signal->theData, (4 * newLen));
2800       rep->originalGsn = gsn;
2801       rep->originalLength = len;
2802       rep->originalSectionCount = 0;
2803       signal->header.theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
2804       signal->header.theLength = newLen + 3;
2805       signal->header.m_noOfSections = 0;
2806       signal->header.m_fragmentInfo = 3;
2807 
2808 
2809       /**
2810        * NOTE: Don't use EXECUTE_DIRECT as it
2811        *       sets sendersBlockRef to reference()
2812        */
2813       /* Perform dropped signal handling, in this thread, now */
2814       jamBuffer()->markEndOfSigExec();
2815       executeFunction(GSN_SIGNAL_DROPPED_REP, signal);
2816 
2817       /* return false to caller - they should not process the signal */
2818       return false;
2819     } // else (isDropped())
2820   }
2821 
2822   /**
2823    * Unable to find fragment
2824    */
2825   ndbabort();
2826   return false;
2827 }
2828 
2829 bool
assembleDroppedFragments(Signal * signal)2830 SimulatedBlock::assembleDroppedFragments(Signal* signal)
2831 {
2832   /* This method is called at the start of a  SIGNAL_DROPPED_REP
2833    * handler when there is a chance that the dropped signal could
2834    * be part of a fragmented signal.
2835    * If the dropped signal was a fragmented signal, this
2836    * needs to be handled specially to ensure that fragments
2837    * of the signal are correctly dropped to avoid segment
2838    * leaks etc.
2839    * There are a number of cases :
2840    *   1) First fragment dropped  (FragInfo=1)
2841    *      All remaining fragments must be dropped when they
2842    *      arrive.  The Signal dropped report handler must be
2843    *      executed when the last fragment has arrived.
2844    *   2) Middle fragment dropped  (FragInfo=2)
2845    *      Any existing stored segments must be released.
2846    *      All remaining fragments must be dropped when they
2847    *      arrive.
2848    *   3) Last fragment dropped  (FragInfo=3)
2849    *      Any existing stored segments must be released.
2850    *      Signal Dropped handling can occur, so return true.
2851    *
2852    * To indicate that a fragment has been dropped for a signal,
2853    * all the section I Values in the fragment's hash entry are
2854    * set to RNIL.
2855    * Signal Dropped Report handling is performed when the last
2856    * fragment arrives.  If the last fragment is not dropped
2857    * by the transporter layer then normal fragment assembly
2858    * arranges for dropped signal handling to occur.
2859    */
2860   Uint32 sigLen = signal->length() - 1;
2861   Uint32 fragId = signal->theData[sigLen];
2862   Uint32 fragInfo = signal->header.m_fragmentInfo;
2863   Uint32 senderRef = signal->getSendersBlockRef();
2864 
2865   if(fragInfo == 0){
2866     return true;
2867   }
2868 
2869   /* This method is for handling SIGNAL_DROPPED_REP only */
2870   ndbrequire(signal->header.theVerId_signalNumber == GSN_SIGNAL_DROPPED_REP);
2871   ndbrequire(signal->header.m_noOfSections == 0);
2872 
2873   if(fragInfo == 1){
2874     /**
2875      * First in train
2876      */
2877     Ptr<FragmentInfo> fragPtr;
2878     if(!c_fragmentInfoHash.seize(fragPtr)){
2879       ndbabort();
2880       return false;
2881     }
2882 
2883     new (fragPtr.p)FragmentInfo(fragId, senderRef);
2884     c_fragmentInfoHash.add(fragPtr);
2885 
2886     /* Mark entry in hash as belonging to dropped signal so subsequent
2887      * fragments can also be dropped
2888      */
2889     fragPtr.p->m_sectionPtrI[0]= RNIL;
2890     fragPtr.p->m_sectionPtrI[1]= RNIL;
2891     fragPtr.p->m_sectionPtrI[2]= RNIL;
2892 
2893     /* Wait for last fragment before SignalDroppedRep handling */
2894     signal->header.m_fragmentInfo = 0;
2895     return false;
2896   }
2897 
2898   FragmentInfo key(fragId, senderRef);
2899   Ptr<FragmentInfo> fragPtr;
2900   if(c_fragmentInfoHash.find(fragPtr, key)){
2901 
2902     /**
2903      * FragInfo == 2 or 3
2904      */
2905     if (! fragPtr.p->isDropped() )
2906     {
2907       /* Fragmented Signal not already marked as dropped
2908        * Need to free stored segments
2909        */
2910       releaseSection(fragPtr.p->m_sectionPtrI[0]);
2911       releaseSection(fragPtr.p->m_sectionPtrI[1]);
2912       releaseSection(fragPtr.p->m_sectionPtrI[2]);
2913 
2914       /* Mark as dropped now */
2915       fragPtr.p->m_sectionPtrI[0]= RNIL;
2916       fragPtr.p->m_sectionPtrI[1]= RNIL;
2917       fragPtr.p->m_sectionPtrI[2]= RNIL;
2918 
2919       ndbassert( fragPtr.p->isDropped() );
2920     }
2921 
2922     /**
2923      * fragInfo = 2
2924      *   Still waiting for final fragments.
2925      *   Return false to caller.
2926      */
2927     if(fragInfo == 2){
2928       signal->header.m_fragmentInfo = 0;
2929       return false;
2930     }
2931 
2932     /**
2933      * fragInfo = 3
2934      *   All fragments received, remove entry
2935      *   from hash and return to caller for
2936      *   dropped signal handling.
2937      */
2938     signal->header.m_fragmentInfo = 0;
2939 
2940     c_fragmentInfoHash.release(fragPtr);
2941     return true;
2942   }
2943 
2944   /**
2945    * Unable to find fragment
2946    */
2947   ndbabort();
2948   return false;
2949 }
2950 
2951 /**
2952  * doCleanupFragInfo
2953  * Iterate over block's Fragment assembly hash, looking
2954  * for in-assembly fragments from the failed node
2955  * Release these
2956  * Returns after each scanned bucket to avoid consuming
2957  * too much time.
2958  *
2959  * Parameters
2960  *   failedNodeId    : Node id of failed node
2961  *   cursor          : Hash bucket to start iteration from
2962  *   rtUnitsUsed     : Total rt units used
2963  *   elementsCleaned : Number of elements cleaned
2964  *
2965  * Updates
2966  *   cursor          : Hash bucket to continue iteration from
2967  *   rtUnitsUsed     : += units used
2968  *   elementsCleaned : += elements cleaned
2969  *
2970  * Returns
2971  *   true  if all FragInfo structs cleaned up
2972  *   false if more to do
2973  */
2974 bool
doCleanupFragInfo(Uint32 failedNodeId,Uint32 & cursor,Uint32 & rtUnitsUsed,Uint32 & elementsCleaned)2975 SimulatedBlock::doCleanupFragInfo(Uint32 failedNodeId,
2976                                   Uint32& cursor,
2977                                   Uint32& rtUnitsUsed,
2978                                   Uint32& elementsCleaned)
2979 {
2980   jam();
2981   FragmentInfo_hash::Iterator iter;
2982 
2983   c_fragmentInfoHash.next(cursor, iter);
2984 
2985   const Uint32 startBucket = iter.bucket;
2986 
2987   while (!iter.isNull() &&
2988          (iter.bucket == startBucket))
2989   {
2990     jam();
2991 
2992     Ptr<FragmentInfo> curr = iter.curr;
2993     c_fragmentInfoHash.next(iter);
2994 
2995     FragmentInfo* fragInfo = curr.p;
2996 
2997     if (refToNode(fragInfo->m_senderRef) == failedNodeId)
2998     {
2999       jam();
3000       /* We were assembling a fragmented signal from the
3001        * failed node, discard the partially assembled
3002        * sections and free the FragmentInfo hash entry
3003        */
3004       for(Uint32 s = 0; s<3; s++)
3005       {
3006         if (fragInfo->m_sectionPtrI[s] != RNIL)
3007         {
3008           jam();
3009           SegmentedSectionPtr ssptr;
3010           getSection(ssptr, fragInfo->m_sectionPtrI[s]);
3011           release(ssptr);
3012         }
3013       }
3014 
3015       /* Release FragmentInfo hash element */
3016       c_fragmentInfoHash.release(curr);
3017 
3018       elementsCleaned++;
3019       rtUnitsUsed+=3;
3020     }
3021 
3022     rtUnitsUsed++;
3023   } // while
3024 
3025   cursor = iter.bucket;
3026   return iter.isNull();
3027 }
3028 
3029 bool
doCleanupFragSend(Uint32 failedNodeId,Uint32 & cursor,Uint32 & rtUnitsUsed,Uint32 & elementsCleaned)3030 SimulatedBlock::doCleanupFragSend(Uint32 failedNodeId,
3031                                   Uint32& cursor,
3032                                   Uint32& rtUnitsUsed,
3033                                   Uint32& elementsCleaned)
3034 {
3035   jam();
3036 
3037   Ptr<FragmentSendInfo> fragPtr;
3038   const Uint32 NumSendLists = 2;
3039   ndbrequire(cursor < NumSendLists);
3040 
3041   FragmentSendInfo_list* fragSendLists[ NumSendLists ] =
3042     { &c_segmentedFragmentSendList,
3043       &c_linearFragmentSendList };
3044 
3045   FragmentSendInfo_list* list = fragSendLists[ cursor ];
3046 
3047   list->first(fragPtr);
3048   for(; !fragPtr.isNull();){
3049     jam();
3050     Ptr<FragmentSendInfo> copyPtr = fragPtr;
3051     list->next(fragPtr);
3052     rtUnitsUsed++;
3053 
3054     NodeReceiverGroup& rg = copyPtr.p->m_nodeReceiverGroup;
3055 
3056     if (rg.m_nodes.get(failedNodeId))
3057     {
3058       jam();
3059       /* Fragmented signal is being sent to node */
3060       rg.m_nodes.clear(failedNodeId);
3061 
3062       if (rg.m_nodes.isclear())
3063       {
3064         jam();
3065         /* No other nodes in receiver group - send
3066          * is cancelled
3067          * Will be cleaned up in the usual CONTINUE_FRAGMENTED
3068          * handling code.
3069          */
3070         copyPtr.p->m_status = FragmentSendInfo::SendCancelled;
3071       }
3072       elementsCleaned++;
3073     }
3074   }
3075 
3076   /* Next time we'll do the next list */
3077   cursor++;
3078 
3079   return (cursor == NumSendLists);
3080 }
3081 
3082 
3083 Uint32
doNodeFailureCleanup(Signal * signal,Uint32 failedNodeId,Uint32 resource,Uint32 cursor,Uint32 elementsCleaned,Callback & cb)3084 SimulatedBlock::doNodeFailureCleanup(Signal* signal,
3085                                      Uint32 failedNodeId,
3086                                      Uint32 resource,
3087                                      Uint32 cursor,
3088                                      Uint32 elementsCleaned,
3089                                      Callback& cb)
3090 {
3091   jam();
3092   const bool userCallback = (cb.m_callbackFunction != 0);
3093   const Uint32 maxRtUnits = userCallback ?
3094 #ifdef VM_TRACE
3095     2 :
3096 #else
3097     16 :
3098 #endif
3099     ~0; /* Must complete all processing in this call */
3100 
3101   Uint32 rtUnitsUsed = 0;
3102 
3103   /* Loop over resources, cleaning them up */
3104   do
3105   {
3106     bool resourceDone= false;
3107     switch(resource) {
3108     case ContinueFragmented::RES_FRAGSEND:
3109     {
3110       jam();
3111       resourceDone = doCleanupFragSend(failedNodeId, cursor,
3112                                        rtUnitsUsed, elementsCleaned);
3113       break;
3114     }
3115     case ContinueFragmented::RES_FRAGINFO:
3116     {
3117       jam();
3118       resourceDone = doCleanupFragInfo(failedNodeId, cursor,
3119                                        rtUnitsUsed, elementsCleaned);
3120       break;
3121     }
3122     case ContinueFragmented::RES_LAST:
3123     {
3124       jam();
3125       /* Node failure processing complete, execute user callback if provided */
3126       if (userCallback)
3127         execute(signal, cb, elementsCleaned);
3128 
3129       return elementsCleaned;
3130     }
3131     default:
3132       ndbabort();
3133     }
3134 
3135     /* Did we complete cleaning up this resource? */
3136     if (resourceDone)
3137     {
3138       resource++;
3139       cursor= 0;
3140     }
3141 
3142   } while (rtUnitsUsed <= maxRtUnits);
3143 
3144   jam();
3145 
3146   /* Not yet completed failure handling.
3147    * Must have exhausted RT units.
3148    * Update cursor and re-invoke
3149    */
3150   ndbassert(userCallback);
3151 
3152   /* Send signal to continue processing */
3153 
3154   ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3155   sig->type = ContinueFragmented::CONTINUE_CLEANUP;
3156   sig->cleanup.failedNodeId = failedNodeId;
3157   sig->cleanup.resource = resource;
3158   sig->cleanup.cursor = cursor;
3159   sig->cleanup.elementsCleaned= elementsCleaned;
3160   Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
3161   Uint32 sigLen = ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
3162     callbackWords;
3163   ndbassert(sigLen <= 25); // Should be STATIC_ASSERT
3164   memcpy(&sig->cleanup.callbackStart, &cb, callbackWords << 2);
3165 
3166   sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, sigLen, JBB);
3167 
3168   return elementsCleaned;
3169 }
3170 
3171 Uint32
simBlockNodeFailure(Signal * signal,Uint32 failedNodeId,Callback & cb)3172 SimulatedBlock::simBlockNodeFailure(Signal* signal,
3173                                     Uint32 failedNodeId,
3174                                     Callback& cb)
3175 {
3176   jam();
3177   return doNodeFailureCleanup(signal, failedNodeId, 0, 0, 0, cb);
3178 }
3179 
3180 Uint32
debugPrintFragmentCounts()3181 SimulatedBlock::debugPrintFragmentCounts()
3182 {
3183   const char* blockName = getBlockName(theNumber);
3184   FragmentInfo_hash::Iterator iter;
3185   Uint32 fragmentInfoCount = 0;
3186   c_fragmentInfoHash.first(iter);
3187 
3188   while(!iter.isNull())
3189   {
3190     fragmentInfoCount++;
3191     c_fragmentInfoHash.next(iter);
3192   }
3193 
3194   Ptr<FragmentSendInfo> ptr;
3195   Uint32 linSendInfoCount = 0;
3196 
3197   c_linearFragmentSendList.first(ptr);
3198 
3199   while (!ptr.isNull())
3200   {
3201     linSendInfoCount++;
3202     c_linearFragmentSendList.next(ptr);
3203   }
3204 
3205   Uint32 segSendInfoCount = 0;
3206   c_segmentedFragmentSendList.first(ptr);
3207 
3208   while (!ptr.isNull())
3209   {
3210     segSendInfoCount++;
3211     c_segmentedFragmentSendList.next(ptr);
3212   }
3213 
3214   ndbout_c("%s : Fragment assembly hash entry count : %d",
3215            blockName,
3216            fragmentInfoCount);
3217 
3218   ndbout_c("%s : Linear fragment send list size : %d",
3219            blockName,
3220            linSendInfoCount);
3221 
3222   ndbout_c("%s : Segmented fragment send list size : %d",
3223            blockName,
3224            segSendInfoCount);
3225 
3226   return fragmentInfoCount +
3227     linSendInfoCount +
3228     segSendInfoCount;
3229 }
3230 
3231 
3232 bool
sendFirstFragment(FragmentSendInfo & info,NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,bool noRelease,Uint32 messageSize)3233 SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
3234 				  NodeReceiverGroup rg,
3235 				  GlobalSignalNumber gsn,
3236 				  Signal* signal,
3237 				  Uint32 length,
3238 				  JobBufferLevel jbuf,
3239 				  SectionHandle* sections,
3240                                   bool noRelease,
3241                                   Uint32 messageSize) {
3242 
3243   Uint32 noSections = sections->m_cnt;
3244   SegmentedSectionPtr * ptr = sections->m_ptr;
3245 
3246   info.m_sectionPtr[0].m_segmented.i = RNIL;
3247   info.m_sectionPtr[1].m_segmented.i = RNIL;
3248   info.m_sectionPtr[2].m_segmented.i = RNIL;
3249 
3250   Uint32 totalSize = 0;
3251   switch(noSections){
3252   case 3:
3253     info.m_sectionPtr[2].m_segmented.i = ptr[2].i;
3254     info.m_sectionPtr[2].m_segmented.p = ptr[2].p;
3255     totalSize += ptr[2].sz;
3256     // Fall through
3257   case 2:
3258     info.m_sectionPtr[1].m_segmented.i = ptr[1].i;
3259     info.m_sectionPtr[1].m_segmented.p = ptr[1].p;
3260     totalSize += ptr[1].sz;
3261     // Fall through
3262   case 1:
3263     info.m_sectionPtr[0].m_segmented.i = ptr[0].i;
3264     info.m_sectionPtr[0].m_segmented.p = ptr[0].p;
3265     totalSize += ptr[0].sz;
3266   }
3267 
3268   if(totalSize <= messageSize + SectionSegment::DataLength){
3269     /**
3270      * Send signal directly
3271      */
3272     if (noRelease)
3273       sendSignalNoRelease(rg, gsn, signal, length, jbuf, sections);
3274     else
3275       sendSignal(rg, gsn, signal, length, jbuf, sections);
3276 
3277     info.m_status = FragmentSendInfo::SendComplete;
3278     return true;
3279   }
3280 
3281   /**
3282    * Setup info object
3283    */
3284   info.m_status = FragmentSendInfo::SendNotComplete;
3285   info.m_prio = (Uint8)jbuf;
3286   info.m_gsn = gsn;
3287   info.m_fragInfo = 1;
3288   info.m_flags = 0;
3289   info.m_messageSize = messageSize;
3290   info.m_fragmentId = c_fragmentIdCounter++;
3291   info.m_nodeReceiverGroup = rg;
3292   info.m_callback.m_callbackFunction = 0;
3293 
3294   if (noRelease)
3295   {
3296     /* Record info that we are not releasing segments */
3297     info.m_flags|= FragmentSendInfo::SendNoReleaseSeg;
3298   }
3299   else
3300   {
3301     /**
3302      * Clear sections in caller's handle.  Actual send
3303      * will consume them
3304      */
3305     sections->m_cnt = 0;
3306   }
3307 
3308   /* Store main signal data in a segment for sending later */
3309   Ptr<SectionSegment> tmp;
3310   if(!import(tmp, &signal->theData[0], length))
3311   {
3312     handle_out_of_longsignal_memory(0);
3313     return false;
3314   }
3315   info.m_theDataSection.p = &tmp.p->theData[0];
3316   info.m_theDataSection.sz = length;
3317   tmp.p->theData[length] = tmp.i;
3318 
3319   sendNextSegmentedFragment(signal, info);
3320 
3321   if(c_fragmentIdCounter == 0){
3322     /**
3323      * Fragment id 0 is invalid
3324      */
3325     c_fragmentIdCounter = 1;
3326   }
3327 
3328   return true;
3329 }
3330 
3331 #if 0
3332 #define lsout(x) x
3333 #else
3334 #define lsout(x)
3335 #endif
3336 
3337 void
sendNextSegmentedFragment(Signal * signal,FragmentSendInfo & info)3338 SimulatedBlock::sendNextSegmentedFragment(Signal* signal,
3339 					  FragmentSendInfo & info){
3340 
3341   if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
3342   {
3343     /* Send was cancelled - all dest. nodes have failed
3344      * since send was started
3345      */
3346     if (0 == (info.m_flags & FragmentSendInfo::SendNoReleaseSeg))
3347     {
3348       /*
3349        * Free any sections still to be sent
3350        */
3351       SectionHandle handle(this);
3352       for (Uint32 s = 0; s < 3; s++)
3353       {
3354         Uint32 sectionI = info.m_sectionPtr[s].m_segmented.i;
3355         if (sectionI != RNIL)
3356         {
3357           getSection(handle.m_ptr[handle.m_cnt], sectionI);
3358           info.m_sectionPtr[s].m_segmented.i = RNIL;
3359           info.m_sectionPtr[s].m_segmented.p = NULL;
3360           handle.m_cnt++;
3361         }
3362       }
3363 
3364       releaseSections(handle);
3365     }
3366 
3367     /* Free inline signal data storage section */
3368     Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
3369     g_sectionSegmentPool.release(SB_SP_REL_ARG inlineDataI);
3370 
3371     info.m_status = FragmentSendInfo::SendComplete;
3372     return;
3373   }
3374 
3375   /**
3376    * Setup main signal data from stored copy
3377    */
3378   const Uint32 sigLen = info.m_theDataSection.sz;
3379   memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
3380 
3381   Uint32 sz = 0;
3382   Uint32 maxSz = info.m_messageSize;
3383 
3384   Int32 secNo = 2;
3385   Uint32 secCount = 0;
3386   Uint32 * secNos = &signal->theData[sigLen];
3387 
3388   SectionHandle sections(this);
3389   SegmentedSectionPtr *ptr = sections.m_ptr;
3390 
3391   bool split= false;
3392   Uint32 splitSectionStartI= RNIL;
3393   SectionSegment* splitSectionStartP= NULL;
3394   Uint32 splitSectionLastSegment= RNIL;
3395   Uint32 splitSectionSz= 0;
3396 
3397   enum { Unknown = 0, Full = 1 } loop = Unknown;
3398   for(; secNo >= 0 && secCount < 3; secNo--){
3399     Uint32 ptrI = info.m_sectionPtr[secNo].m_segmented.i;
3400     if(ptrI == RNIL)
3401       continue;
3402 
3403     info.m_sectionPtr[secNo].m_segmented.i = RNIL;
3404 
3405     SectionSegment * ptrP = info.m_sectionPtr[secNo].m_segmented.p;
3406     const Uint32 size = ptrP->m_sz;
3407 
3408     ptr[secCount].i = ptrI;
3409     ptr[secCount].p = ptrP;
3410     ptr[secCount].sz = size;
3411     secNos[secCount] = secNo;
3412     secCount++;
3413 
3414     const Uint32 sizeLeft = maxSz - sz;
3415     if(size <= sizeLeft){
3416       /**
3417        * The section fits
3418        */
3419       sz += size;
3420       lsout(ndbout_c("section %d saved as %d", secNo, secCount-1));
3421       continue;
3422     }
3423 
3424     const Uint32 overflow = size - sizeLeft; // > 0
3425     if(overflow <= SectionSegment::DataLength){
3426       /**
3427        * Only one segment left to send
3428        *   send even if sizeLeft <= size
3429        */
3430       lsout(ndbout_c("section %d saved as %d but full over: %d",
3431 		     secNo, secCount-1, overflow));
3432       secNo--;
3433       break;
3434     }
3435 
3436     // size >= 61
3437     if(sizeLeft < SectionSegment::DataLength){
3438       /**
3439        * Less than one segment left (space)
3440        *   dont bother sending
3441        */
3442       secCount--;
3443       info.m_sectionPtr[secNo].m_segmented.i = ptrI;
3444       loop = Full;
3445       lsout(ndbout_c("section %d not saved", secNo));
3446       break;
3447     }
3448 
3449     /**
3450      * Split list
3451      * 1) Find place to split
3452      * 2) Rewrite header (the part that will be sent)
3453      * 3) Write new header (for remaining part)
3454      * 4) Store new header on FragmentSendInfo - record
3455      */
3456     // size >= 61 && sizeLeft >= 60
3457     Uint32 sum = SectionSegment::DataLength;
3458     Uint32 prevPtrI = ptrI;
3459     ptrI = ptrP->m_nextSegment;
3460     const Uint32 fill = sizeLeft - SectionSegment::DataLength;
3461     while (sum <= fill)
3462     {
3463       prevPtrI = ptrI;
3464       ptrP = g_sectionSegmentPool.getPtr(ptrI);
3465       ptrI = ptrP->m_nextSegment;
3466       sum += SectionSegment::DataLength;
3467     }
3468 
3469     Uint32 prev = secCount - 1;
3470     /**
3471      * Record details of the section pre-split
3472      * This allows the split to be 'healed' afterwards in the
3473      * no release case.
3474      */
3475     split= true;
3476     splitSectionStartI= ptr[prev].i;
3477     splitSectionStartP= ptr[prev].p;
3478     splitSectionLastSegment= splitSectionStartP->m_lastSegment;
3479     splitSectionSz= splitSectionStartP->m_sz;
3480 
3481     /**
3482      * Rewrite header w.r.t size and last
3483      * This is what will be sent in this fragment.
3484      */
3485     splitSectionStartP->m_lastSegment = prevPtrI;
3486     splitSectionStartP->m_sz = sum;
3487     ptr[prev].sz = sum;
3488 
3489     /**
3490      * Write "new" list header
3491      * This is what remains to be sent in this section
3492      */
3493     ptrP = g_sectionSegmentPool.getPtr(ptrI);
3494     ptrP->m_lastSegment = splitSectionLastSegment;
3495     ptrP->m_sz = size - sum;
3496 
3497     /**
3498      * And store it on info-record
3499      */
3500     info.m_sectionPtr[secNo].m_segmented.i = ptrI;
3501     info.m_sectionPtr[secNo].m_segmented.p = ptrP;
3502 
3503     loop = Full;
3504     lsout(ndbout_c("section %d split into %d", secNo, prev));
3505     break;
3506   }
3507 
3508   lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d",
3509 		 loop, secNo, secCount, sz));
3510 
3511   /**
3512    * Store fragment id
3513    */
3514   secNos[secCount] = info.m_fragmentId;
3515 
3516   Uint32 fragInfo = info.m_fragInfo;
3517   info.m_fragInfo = 2;
3518   switch(loop){
3519   case Unknown:
3520     if(secNo >= 0){
3521       lsout(ndbout_c("Unknown - Full"));
3522       /**
3523        * Not finished
3524        */
3525       break;
3526     }
3527     // Fall through
3528     lsout(ndbout_c("Unknown - Done"));
3529     info.m_status = FragmentSendInfo::SendComplete;
3530     ndbassert(fragInfo == 2);
3531     fragInfo = 3;
3532   case Full:
3533     break;
3534   }
3535 
3536   signal->header.m_fragmentInfo = fragInfo;
3537   signal->header.m_noOfSections = 0;
3538   sections.m_cnt = secCount;
3539 
3540   if (info.m_flags & FragmentSendInfo::SendNoReleaseSeg)
3541   {
3542     sendSignalNoRelease(info.m_nodeReceiverGroup,
3543                         info.m_gsn,
3544                         signal,
3545                         sigLen + secCount + 1,
3546                         (JobBufferLevel)info.m_prio,
3547                         &sections);
3548     /* NoRelease leaves SectionHandle populated, we'll
3549      * clear it here.  The actual sections themselves
3550      * remain allocated.
3551      */
3552     sections.m_cnt = 0;
3553 
3554     if (split)
3555     {
3556       /* There was a split section, which required us to modify the
3557        * segment list.
3558        * Now restore the split section's segment list back to
3559        * its previous state
3560        * (Only really required for first segment, but we do
3561        *  it for all of them, to be a good citizen)
3562        */
3563       ndbrequire( splitSectionStartI != RNIL );
3564       ndbrequire( splitSectionStartP != NULL );
3565       ndbrequire( splitSectionLastSegment != RNIL );
3566 
3567       splitSectionStartP->m_lastSegment= splitSectionLastSegment;
3568       splitSectionStartP->m_sz= splitSectionSz;
3569 
3570       /* Check our handiwork */
3571       assert(verifySection(splitSectionStartI));
3572     }
3573   }
3574   else
3575   {
3576     /* Normal, release sections case */
3577     sendSignal(info.m_nodeReceiverGroup,
3578                info.m_gsn,
3579                signal,
3580                sigLen + secCount + 1,
3581                (JobBufferLevel)info.m_prio,
3582                &sections);
3583   }
3584 
3585   if(fragInfo == 3){
3586     /**
3587      * This is the last signal
3588      * Release saved 'main signal' words segment
3589      */
3590     g_sectionSegmentPool.release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]);
3591   }
3592 }
3593 
3594 bool
sendFirstFragment(FragmentSendInfo & info,NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Uint32 messageSize)3595 SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
3596 				  NodeReceiverGroup rg,
3597 				  GlobalSignalNumber gsn,
3598 				  Signal* signal,
3599 				  Uint32 length,
3600 				  JobBufferLevel jbuf,
3601 				  LinearSectionPtr ptr[3],
3602 				  Uint32 noOfSections,
3603 				  Uint32 messageSize){
3604 
3605 ndbrequire(signal->header.m_noOfSections == 0);
3606   check_sections(signal, signal->header.m_noOfSections, noOfSections);
3607 
3608   info.m_sectionPtr[0].m_linear.p = NULL;
3609   info.m_sectionPtr[1].m_linear.p = NULL;
3610   info.m_sectionPtr[2].m_linear.p = NULL;
3611 
3612   Uint32 totalSize = 0;
3613   switch(noOfSections){
3614   case 3:
3615     info.m_sectionPtr[2].m_linear = ptr[2];
3616     totalSize += ptr[2].sz;
3617     // Fall through
3618   case 2:
3619     info.m_sectionPtr[1].m_linear = ptr[1];
3620     totalSize += ptr[1].sz;
3621     // Fall through
3622   case 1:
3623     info.m_sectionPtr[0].m_linear = ptr[0];
3624     totalSize += ptr[0].sz;
3625   }
3626 
3627   if(totalSize <= messageSize + SectionSegment::DataLength){
3628     /**
3629      * Send signal directly
3630      */
3631     sendSignal(rg, gsn, signal, length, jbuf, ptr, noOfSections);
3632     info.m_status = FragmentSendInfo::SendComplete;
3633 
3634     /**
3635      * Indicate to sendLinearSignalFragment
3636      *   that we'r already done
3637      */
3638     return true;
3639   }
3640 
3641   /**
3642    * Setup info object
3643    */
3644   info.m_status = FragmentSendInfo::SendNotComplete;
3645   info.m_prio = (Uint8)jbuf;
3646   info.m_gsn = gsn;
3647   info.m_messageSize = messageSize;
3648   info.m_fragInfo = 1;
3649   info.m_flags = 0;
3650   info.m_fragmentId = c_fragmentIdCounter++;
3651   info.m_nodeReceiverGroup = rg;
3652   info.m_callback.m_callbackFunction = 0;
3653 
3654   Ptr<SectionSegment> tmp;
3655   if(unlikely(!import(tmp, &signal->theData[0], length)))
3656   {
3657     handle_out_of_longsignal_memory(0);
3658     return false;
3659   }
3660 
3661   info.m_theDataSection.p = &tmp.p->theData[0];
3662   info.m_theDataSection.sz = length;
3663   tmp.p->theData[length] = tmp.i;
3664 
3665   sendNextLinearFragment(signal, info);
3666 
3667   if(c_fragmentIdCounter == 0){
3668     /**
3669      * Fragment id 0 is invalid
3670      */
3671     c_fragmentIdCounter = 1;
3672   }
3673 
3674   return true;
3675 }
3676 
3677 void
sendNextLinearFragment(Signal * signal,FragmentSendInfo & info)3678 SimulatedBlock::sendNextLinearFragment(Signal* signal,
3679 				       FragmentSendInfo & info){
3680 
3681   if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
3682   {
3683     /* Send was cancelled - all dest. nodes have failed
3684      * since send was started
3685      */
3686     /* Free inline signal data storage section */
3687     Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
3688     g_sectionSegmentPool.release(SB_SP_REL_ARG inlineDataI);
3689 
3690     info.m_status = FragmentSendInfo::SendComplete;
3691     return;
3692   }
3693 
3694   /**
3695    * Store "theData"
3696    */
3697   const Uint32 sigLen = info.m_theDataSection.sz;
3698   memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
3699 
3700   Uint32 sz = 0;
3701   Uint32 maxSz = info.m_messageSize;
3702 
3703   Int32 secNo = 2;
3704   Uint32 secCount = 0;
3705   Uint32 * secNos = &signal->theData[sigLen];
3706   LinearSectionPtr signalPtr[3];
3707 
3708   enum { Unknown = 0, Full = 2 } loop = Unknown;
3709   for(; secNo >= 0 && secCount < 3; secNo--){
3710     Uint32 * ptrP = info.m_sectionPtr[secNo].m_linear.p;
3711     if(ptrP == NULL)
3712       continue;
3713 
3714     info.m_sectionPtr[secNo].m_linear.p = NULL;
3715     const Uint32 size = info.m_sectionPtr[secNo].m_linear.sz;
3716 
3717     signalPtr[secCount].p = ptrP;
3718     signalPtr[secCount].sz = size;
3719     secNos[secCount] = secNo;
3720     secCount++;
3721 
3722     const Uint32 sizeLeft = maxSz - sz;
3723     if(size <= sizeLeft){
3724       /**
3725        * The section fits
3726        */
3727       sz += size;
3728       lsout(ndbout_c("section %d saved as %d", secNo, secCount-1));
3729       continue;
3730     }
3731 
3732     const Uint32 overflow = size - sizeLeft; // > 0
3733     if(overflow <= SectionSegment::DataLength){
3734       /**
3735        * Only one segment left to send
3736        *   send even if sizeLeft <= size
3737        */
3738       lsout(ndbout_c("section %d saved as %d but full over: %d",
3739 		     secNo, secCount-1, overflow));
3740       secNo--;
3741       break;
3742     }
3743 
3744     // size >= 61
3745     if(sizeLeft < SectionSegment::DataLength){
3746       /**
3747        * Less than one segment left (space)
3748        *   dont bother sending
3749        */
3750       secCount--;
3751       info.m_sectionPtr[secNo].m_linear.p = ptrP;
3752       loop = Full;
3753       lsout(ndbout_c("section %d not saved", secNo));
3754       break;
3755     }
3756 
3757     /**
3758      * Split list
3759      * 1) Find place to split
3760      * 2) Rewrite header (the part that will be sent)
3761      * 3) Write new header (for remaining part)
3762      * 4) Store new header on FragmentSendInfo - record
3763      */
3764     Uint32 sum = sizeLeft;
3765     sum /= SectionSegment::DataLength;
3766     sum *= SectionSegment::DataLength;
3767 
3768     /**
3769      * Rewrite header w.r.t size
3770      */
3771     Uint32 prev = secCount - 1;
3772     signalPtr[prev].sz = sum;
3773 
3774     /**
3775      * Write/store "new" header
3776      */
3777     info.m_sectionPtr[secNo].m_linear.p = ptrP + sum;
3778     info.m_sectionPtr[secNo].m_linear.sz = size - sum;
3779 
3780     loop = Full;
3781     lsout(ndbout_c("section %d split into %d", secNo, prev));
3782     break;
3783   }
3784 
3785   lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d",
3786 		 loop, secNo, secCount, sz));
3787 
3788   /**
3789    * Store fragment id
3790    */
3791   secNos[secCount] = info.m_fragmentId;
3792 
3793   Uint32 fragInfo = info.m_fragInfo;
3794   info.m_fragInfo = 2;
3795   switch(loop){
3796   case Unknown:
3797     if(secNo >= 0){
3798       lsout(ndbout_c("Unknown - Full"));
3799       /**
3800        * Not finished
3801        */
3802       break;
3803     }
3804     // Fall through
3805     lsout(ndbout_c("Unknown - Done"));
3806     info.m_status = FragmentSendInfo::SendComplete;
3807     ndbassert(fragInfo == 2);
3808     fragInfo = 3;
3809   case Full:
3810     break;
3811   }
3812 
3813   signal->header.m_noOfSections = 0;
3814   signal->header.m_fragmentInfo = fragInfo;
3815 
3816   sendSignal(info.m_nodeReceiverGroup,
3817 	     info.m_gsn,
3818 	     signal,
3819 	     sigLen + secCount + 1,
3820 	     (JobBufferLevel)info.m_prio,
3821 	     signalPtr,
3822 	     secCount);
3823 
3824   if(fragInfo == 3){
3825     /**
3826      * This is the last signal
3827      */
3828     g_sectionSegmentPool.release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]);
3829   }
3830 }
3831 
3832 void
sendFragmentedSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,Callback & c,Uint32 messageSize)3833 SimulatedBlock::sendFragmentedSignal(BlockReference ref,
3834 				     GlobalSignalNumber gsn,
3835 				     Signal* signal,
3836 				     Uint32 length,
3837 				     JobBufferLevel jbuf,
3838 				     SectionHandle* sections,
3839 				     Callback & c,
3840 				     Uint32 messageSize){
3841   bool res = true;
3842   Ptr<FragmentSendInfo> tmp;
3843   res = c_segmentedFragmentSendList.seizeFirst(tmp);
3844   ndbrequire(res);
3845 
3846   res = sendFirstFragment(* tmp.p,
3847 			  NodeReceiverGroup(ref),
3848 			  gsn,
3849 			  signal,
3850 			  length,
3851 			  jbuf,
3852 			  sections,
3853                           false, // Release sections on send
3854 			  messageSize);
3855   ndbrequire(res);
3856 
3857   if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3858     c_segmentedFragmentSendList.release(tmp);
3859     if(c.m_callbackFunction != 0)
3860       execute(signal, c, 0);
3861     return;
3862   }
3863   tmp.p->m_callback = c;
3864 
3865   if(!c_fragSenderRunning)
3866   {
3867     SaveSignal<2> save(signal);
3868     c_fragSenderRunning = true;
3869     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3870     sig->type = ContinueFragmented::CONTINUE_SENDING;
3871     sig->line = __LINE__;
3872     sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3873   }
3874 }
3875 
3876 void
sendFragmentedSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,Callback & c,Uint32 messageSize)3877 SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
3878 				     GlobalSignalNumber gsn,
3879 				     Signal* signal,
3880 				     Uint32 length,
3881 				     JobBufferLevel jbuf,
3882 				     SectionHandle * sections,
3883 				     Callback & c,
3884 				     Uint32 messageSize){
3885   bool res = true;
3886   Ptr<FragmentSendInfo> tmp;
3887   res = c_segmentedFragmentSendList.seizeFirst(tmp);
3888   ndbrequire(res);
3889 
3890   res = sendFirstFragment(* tmp.p,
3891 			  rg,
3892 			  gsn,
3893 			  signal,
3894 			  length,
3895 			  jbuf,
3896 			  sections,
3897 			  false, // Release sections on send
3898                           messageSize);
3899   ndbrequire(res);
3900 
3901   if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3902     c_segmentedFragmentSendList.release(tmp);
3903     if(c.m_callbackFunction != 0)
3904       execute(signal, c, 0);
3905     return;
3906   }
3907   tmp.p->m_callback = c;
3908 
3909   if(!c_fragSenderRunning)
3910   {
3911     SaveSignal<2> save(signal);
3912     c_fragSenderRunning = true;
3913     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3914     sig->type = ContinueFragmented::CONTINUE_SENDING;
3915     sig->line = __LINE__;
3916     sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3917   }
3918 }
3919 
3920 SimulatedBlock::Callback SimulatedBlock::TheEmptyCallback = {0, 0};
3921 void
TheNULLCallbackFunction(class Signal *,Uint32,Uint32)3922 SimulatedBlock::TheNULLCallbackFunction(class Signal*, Uint32, Uint32)
3923 { abort(); /* should never be called */ }
3924 SimulatedBlock::Callback SimulatedBlock::TheNULLCallback =
3925 { &SimulatedBlock::TheNULLCallbackFunction, 0 };
3926 
3927 void
sendFragmentedSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Callback & c,Uint32 messageSize)3928 SimulatedBlock::sendFragmentedSignal(BlockReference ref,
3929 				     GlobalSignalNumber gsn,
3930 				     Signal* signal,
3931 				     Uint32 length,
3932 				     JobBufferLevel jbuf,
3933 				     LinearSectionPtr ptr[3],
3934 				     Uint32 noOfSections,
3935 				     Callback & c,
3936 				     Uint32 messageSize){
3937   bool res = true;
3938   Ptr<FragmentSendInfo> tmp;
3939   res = c_linearFragmentSendList.seizeFirst(tmp);
3940   ndbrequire(res);
3941 
3942   res = sendFirstFragment(* tmp.p,
3943 			  NodeReceiverGroup(ref),
3944 			  gsn,
3945 			  signal,
3946 			  length,
3947 			  jbuf,
3948 			  ptr,
3949 			  noOfSections,
3950 			  messageSize);
3951   ndbrequire(res);
3952 
3953   if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3954     c_linearFragmentSendList.release(tmp);
3955     if(c.m_callbackFunction != 0)
3956       execute(signal, c, 0);
3957     return;
3958   }
3959   tmp.p->m_callback = c;
3960 
3961   if(!c_fragSenderRunning)
3962   {
3963     SaveSignal<2> save(signal);
3964     c_fragSenderRunning = true;
3965     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3966     sig->type = ContinueFragmented::CONTINUE_SENDING;
3967     sig->line = __LINE__;
3968     sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3969   }
3970 }
3971 
3972 void
sendFragmentedSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Callback & c,Uint32 messageSize)3973 SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
3974 				     GlobalSignalNumber gsn,
3975 				     Signal* signal,
3976 				     Uint32 length,
3977 				     JobBufferLevel jbuf,
3978 				     LinearSectionPtr ptr[3],
3979 				     Uint32 noOfSections,
3980 				     Callback & c,
3981 				     Uint32 messageSize){
3982   bool res = true;
3983   Ptr<FragmentSendInfo> tmp;
3984   res = c_linearFragmentSendList.seizeFirst(tmp);
3985   ndbrequire(res);
3986 
3987   res = sendFirstFragment(* tmp.p,
3988 			  rg,
3989 			  gsn,
3990 			  signal,
3991 			  length,
3992 			  jbuf,
3993 			  ptr,
3994 			  noOfSections,
3995 			  messageSize);
3996   ndbrequire(res);
3997 
3998   if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3999     c_linearFragmentSendList.release(tmp);
4000     if(c.m_callbackFunction != 0)
4001       execute(signal, c, 0);
4002     return;
4003   }
4004   tmp.p->m_callback = c;
4005 
4006   if(!c_fragSenderRunning)
4007   {
4008     SaveSignal<2> save(signal);
4009     c_fragSenderRunning = true;
4010     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
4011     sig->type = ContinueFragmented::CONTINUE_SENDING;
4012     sig->line = __LINE__;
4013     sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
4014   }
4015 }
4016 
4017 void
sendBatchedFragmentedSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,bool noRelease,Callback & c,Uint32 messageSize)4018 SimulatedBlock::sendBatchedFragmentedSignal(BlockReference ref,
4019                                             GlobalSignalNumber gsn,
4020                                             Signal* signal,
4021                                             Uint32 length,
4022                                             JobBufferLevel jbuf,
4023                                             SectionHandle* sections,
4024                                             bool noRelease,
4025                                             Callback & c,
4026                                             Uint32 messageSize)
4027 {
4028   jam();
4029   bool res = true;
4030   FragmentSendInfo fragSendInfo;
4031 
4032   const Uint32 noOfSections = sections->m_cnt;
4033   SegmentedSectionPtr * const ptr = sections->m_ptr;
4034   const Uint32 totalSize =
4035     (noOfSections >= 1 ? ptr[0].sz : 0) +
4036     (noOfSections >= 2 ? ptr[1].sz : 0) +
4037     (noOfSections >= 3 ? ptr[2].sz : 0);
4038 
4039   res = sendFirstFragment(fragSendInfo,
4040                           NodeReceiverGroup(ref),
4041                           gsn,
4042                           signal,
4043                           length,
4044                           jbuf,
4045                           sections,
4046                           noRelease,
4047                           messageSize);
4048   ndbrequire(res);
4049 
4050   Uint32 guard = totalSize / messageSize + 1 + noOfSections;
4051 
4052   while (guard > 0 && fragSendInfo.m_status != FragmentSendInfo::SendComplete)
4053   {
4054     jam();
4055     guard--;
4056     // Send remaining fragments
4057     sendNextSegmentedFragment(signal, fragSendInfo);
4058   }
4059 
4060   ndbrequire(fragSendInfo.m_status == FragmentSendInfo::SendComplete);
4061 
4062   if (c.m_callbackFunction != nullptr)
4063   {
4064     jam();
4065     execute(signal, c, 0);
4066   }
4067   return;
4068 }
4069 
4070 void
sendBatchedFragmentedSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,bool noRelease,Callback & c,Uint32 messageSize)4071 SimulatedBlock::sendBatchedFragmentedSignal(NodeReceiverGroup rg,
4072                                             GlobalSignalNumber gsn,
4073                                             Signal* signal,
4074                                             Uint32 length,
4075                                             JobBufferLevel jbuf,
4076                                             SectionHandle * sections,
4077                                             bool noRelease,
4078                                             Callback & c,
4079                                             Uint32 messageSize)
4080 {
4081   jam();
4082   bool res = true;
4083   FragmentSendInfo fragSendInfo;
4084 
4085   const Uint32 noOfSections = sections->m_cnt;
4086   SegmentedSectionPtr * const ptr = sections->m_ptr;
4087   const Uint32 totalSize =
4088     (noOfSections >= 1 ? ptr[0].sz : 0) +
4089     (noOfSections >= 2 ? ptr[1].sz : 0) +
4090     (noOfSections >= 3 ? ptr[2].sz : 0);
4091 
4092   res = sendFirstFragment(fragSendInfo,
4093                           rg,
4094                           gsn,
4095                           signal,
4096                           length,
4097                           jbuf,
4098                           sections,
4099                           noRelease,
4100                           messageSize);
4101   ndbrequire(res);
4102 
4103   Uint32 guard = totalSize / messageSize + 1 + noOfSections;
4104 
4105   while (guard > 0 && fragSendInfo.m_status != FragmentSendInfo::SendComplete)
4106   {
4107     jam();
4108     guard--;
4109     // Send remaining fragments
4110     sendNextSegmentedFragment(signal, fragSendInfo);
4111   }
4112 
4113   ndbrequire(fragSendInfo.m_status == FragmentSendInfo::SendComplete);
4114 
4115   if (c.m_callbackFunction != nullptr)
4116   {
4117     jam();
4118     execute(signal, c, 0);
4119   }
4120   return;
4121 }
4122 
4123 void
sendBatchedFragmentedSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Callback & c,Uint32 messageSize)4124 SimulatedBlock::sendBatchedFragmentedSignal(BlockReference ref,
4125                                             GlobalSignalNumber gsn,
4126                                             Signal* signal,
4127                                             Uint32 length,
4128                                             JobBufferLevel jbuf,
4129                                             LinearSectionPtr ptr[3],
4130                                             Uint32 noOfSections,
4131                                             Callback & c,
4132                                             Uint32 messageSize)
4133 {
4134   jam();
4135   bool res = true;
4136   FragmentSendInfo fragSendInfo;
4137 
4138   res = sendFirstFragment(fragSendInfo,
4139                           NodeReceiverGroup(ref),
4140                           gsn,
4141                           signal,
4142                           length,
4143                           jbuf,
4144                           ptr,
4145                           noOfSections,
4146                           messageSize);
4147   ndbrequire(res);
4148 
4149   const Uint32 totalSize =
4150     (noOfSections >= 1 ? ptr[0].sz : 0) +
4151     (noOfSections >= 2 ? ptr[1].sz : 0) +
4152     (noOfSections >= 3 ? ptr[2].sz : 0);
4153 
4154   Uint32 guard = totalSize / messageSize + 1 + noOfSections;
4155 
4156   while (guard > 0 && fragSendInfo.m_status != FragmentSendInfo::SendComplete)
4157   {
4158     jam();
4159     guard--;
4160     // Send remaining fragments
4161     sendNextLinearFragment(signal, fragSendInfo);
4162   }
4163 
4164   ndbrequire(fragSendInfo.m_status == FragmentSendInfo::SendComplete);
4165 
4166   if (c.m_callbackFunction != nullptr)
4167   {
4168     execute(signal, c, 0);
4169   }
4170   return;
4171 }
4172 
4173 void
sendBatchedFragmentedSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Callback & c,Uint32 messageSize)4174 SimulatedBlock::sendBatchedFragmentedSignal(NodeReceiverGroup rg,
4175                                             GlobalSignalNumber gsn,
4176                                             Signal* signal,
4177                                             Uint32 length,
4178                                             JobBufferLevel jbuf,
4179                                             LinearSectionPtr ptr[3],
4180                                             Uint32 noOfSections,
4181                                             Callback & c,
4182                                             Uint32 messageSize)
4183 {
4184   jam();
4185   bool res = true;
4186   FragmentSendInfo fragSendInfo;
4187 
4188   res = sendFirstFragment(fragSendInfo,
4189                           rg,
4190                           gsn,
4191                           signal,
4192                           length,
4193                           jbuf,
4194                           ptr,
4195                           noOfSections,
4196                           messageSize);
4197   ndbrequire(res);
4198 
4199   const Uint32 totalSize =
4200     (noOfSections >= 1 ? ptr[0].sz : 0) +
4201     (noOfSections >= 2 ? ptr[1].sz : 0) +
4202     (noOfSections >= 3 ? ptr[2].sz : 0);
4203 
4204   Uint32 guard = totalSize / messageSize + 1 + noOfSections;
4205 
4206   while (guard > 0 && fragSendInfo.m_status != FragmentSendInfo::SendComplete)
4207   {
4208     jam();
4209     guard--;
4210     // Send remaining fragments
4211     sendNextLinearFragment(signal, fragSendInfo);
4212   }
4213 
4214   ndbrequire(fragSendInfo.m_status == FragmentSendInfo::SendComplete);
4215 
4216   if (c.m_callbackFunction != nullptr)
4217   {
4218     execute(signal, c, 0);
4219   }
4220   return;
4221 }
4222 
4223 
4224 NodeInfo &
setNodeInfo(NodeId nodeId)4225 SimulatedBlock::setNodeInfo(NodeId nodeId) {
4226   ndbrequire(nodeId > 0 && nodeId < MAX_NODES);
4227   return globalData.m_nodeInfo[nodeId];
4228 }
4229 
4230 bool
isMultiThreaded()4231 SimulatedBlock::isMultiThreaded()
4232 {
4233 #ifdef NDBD_MULTITHREADED
4234   return true;
4235 #else
4236   return false;
4237 #endif
4238 }
4239 
4240 
4241 void
execUTIL_CREATE_LOCK_REF(Signal * signal)4242 SimulatedBlock::execUTIL_CREATE_LOCK_REF(Signal* signal){
4243   jamEntry();
4244   c_mutexMgr.execUTIL_CREATE_LOCK_REF(signal);
4245 }
4246 
execUTIL_CREATE_LOCK_CONF(Signal * signal)4247 void SimulatedBlock::execUTIL_CREATE_LOCK_CONF(Signal* signal){
4248   jamEntry();
4249   c_mutexMgr.execUTIL_CREATE_LOCK_CONF(signal);
4250 }
4251 
execUTIL_DESTORY_LOCK_REF(Signal * signal)4252 void SimulatedBlock::execUTIL_DESTORY_LOCK_REF(Signal* signal){
4253   jamEntry();
4254   c_mutexMgr.execUTIL_DESTORY_LOCK_REF(signal);
4255 }
4256 
execUTIL_DESTORY_LOCK_CONF(Signal * signal)4257 void SimulatedBlock::execUTIL_DESTORY_LOCK_CONF(Signal* signal){
4258   jamEntry();
4259   c_mutexMgr.execUTIL_DESTORY_LOCK_CONF(signal);
4260 }
4261 
execUTIL_LOCK_REF(Signal * signal)4262 void SimulatedBlock::execUTIL_LOCK_REF(Signal* signal){
4263   jamEntry();
4264   c_mutexMgr.execUTIL_LOCK_REF(signal);
4265 }
4266 
execUTIL_LOCK_CONF(Signal * signal)4267 void SimulatedBlock::execUTIL_LOCK_CONF(Signal* signal){
4268   jamEntry();
4269   c_mutexMgr.execUTIL_LOCK_CONF(signal);
4270 }
4271 
execUTIL_UNLOCK_REF(Signal * signal)4272 void SimulatedBlock::execUTIL_UNLOCK_REF(Signal* signal){
4273   jamEntry();
4274   c_mutexMgr.execUTIL_UNLOCK_REF(signal);
4275 }
4276 
execUTIL_UNLOCK_CONF(Signal * signal)4277 void SimulatedBlock::execUTIL_UNLOCK_CONF(Signal* signal){
4278   jamEntry();
4279   c_mutexMgr.execUTIL_UNLOCK_CONF(signal);
4280 }
4281 
4282 void
ignoreMutexUnlockCallback(Signal * signal,Uint32 ptrI,Uint32 retVal)4283 SimulatedBlock::ignoreMutexUnlockCallback(Signal* signal,
4284 					  Uint32 ptrI, Uint32 retVal){
4285   c_mutexMgr.release(ptrI);
4286 }
4287 
4288 void
fsRefError(Signal * signal,Uint32 line,const char * msg)4289 SimulatedBlock::fsRefError(Signal* signal, Uint32 line, const char *msg)
4290 {
4291   const FsRef *fsRef = (FsRef*)signal->getDataPtr();
4292   Uint32 errorCode = fsRef->errorCode;
4293   Uint32 osErrorCode = fsRef->osErrorCode;
4294   char msg2[100];
4295 
4296   sprintf(msg2, "%s: %s. OS errno: %u", getBlockName(number()), msg, osErrorCode);
4297 
4298   progError(line, errorCode, msg2);
4299 }
4300 
4301 void
execFSWRITEREF(Signal * signal)4302 SimulatedBlock::execFSWRITEREF(Signal* signal)
4303 {
4304   fsRefError(signal, __LINE__, "File system write failed");
4305 }
4306 
4307 void
execFSREADREF(Signal * signal)4308 SimulatedBlock::execFSREADREF(Signal* signal)
4309 {
4310   fsRefError(signal, __LINE__, "File system read failed");
4311 }
4312 
4313 void
execFSCLOSEREF(Signal * signal)4314 SimulatedBlock::execFSCLOSEREF(Signal* signal)
4315 {
4316   fsRefError(signal, __LINE__, "File system close failed");
4317 }
4318 
4319 void
execFSOPENREF(Signal * signal)4320 SimulatedBlock::execFSOPENREF(Signal* signal)
4321 {
4322   fsRefError(signal, __LINE__, "File system open failed");
4323 }
4324 
4325 void
execFSREMOVEREF(Signal * signal)4326 SimulatedBlock::execFSREMOVEREF(Signal* signal)
4327 {
4328   fsRefError(signal, __LINE__, "File system remove failed");
4329 }
4330 
4331 void
execFSSYNCREF(Signal * signal)4332 SimulatedBlock::execFSSYNCREF(Signal* signal)
4333 {
4334   fsRefError(signal, __LINE__, "File system sync failed");
4335 }
4336 
4337 void
execFSAPPENDREF(Signal * signal)4338 SimulatedBlock::execFSAPPENDREF(Signal* signal)
4339 {
4340   fsRefError(signal, __LINE__, "File system append failed");
4341 }
4342 
4343 #if defined(USE_INIT_GLOBAL_VARIABLES)
4344 void
disable_global_variables()4345 SimulatedBlock::disable_global_variables()
4346 {
4347 #ifdef NDBD_MULTITHREADED
4348   mt_disable_global_variables(m_threadId);
4349 #endif
4350 }
4351 
4352 void
enable_global_variables()4353 SimulatedBlock::enable_global_variables()
4354 {
4355 #ifdef NDBD_MULTITHREADED
4356   mt_enable_global_variables(m_threadId);
4357 #endif
4358 }
4359 
4360 void
init_global_ptrs(void ** tmp,size_t cnt)4361 SimulatedBlock::init_global_ptrs(void ** tmp, size_t cnt)
4362 {
4363 #ifdef NDBD_MULTITHREADED
4364   mt_init_global_variables_ptr_instances(m_threadId, tmp, cnt);
4365 #endif
4366 }
4367 
4368 void
init_global_uint32_ptrs(void ** tmp,size_t cnt)4369 SimulatedBlock::init_global_uint32_ptrs(void ** tmp, size_t cnt)
4370 {
4371 #ifdef NDBD_MULTITHREADED
4372   mt_init_global_variables_uint32_ptr_instances(m_threadId, tmp, cnt);
4373 #endif
4374 }
4375 
4376 void
init_global_uint32(void ** tmp,size_t cnt)4377 SimulatedBlock::init_global_uint32(void ** tmp, size_t cnt)
4378 {
4379 #ifdef NDBD_MULTITHREADED
4380   mt_init_global_variables_uint32_instances(m_threadId, tmp, cnt);
4381 #endif
4382 }
4383 #endif
4384 
4385 int
cmp_key(Uint32 tab,const Uint32 * s1,const Uint32 * s2) const4386 SimulatedBlock::cmp_key(Uint32 tab, const Uint32 *s1, const Uint32 *s2) const
4387 {
4388   const KeyDescriptor * desc = g_key_descriptor_pool.getPtr(tab);
4389   const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
4390 
4391   for (Uint32 i = 0; i < noOfKeyAttr; i++)
4392   {
4393     const KeyDescriptor::KeyAttr& keyAttr = desc->keyAttr[i];
4394     const Uint32 attrDesc = keyAttr.attributeDescriptor;
4395     const Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(attrDesc);
4396 
4397     const int res = cmp_attr(attrDesc, keyAttr.charsetInfo,
4398 			     s1, srcBytes, s2, srcBytes);
4399     if (res != 0)
4400       return res;
4401 
4402     if (i+1 < noOfKeyAttr) //Optimization; skip if last keyAttr
4403     {
4404       const Uint32 typeId = AttributeDescriptor::getType(attrDesc);
4405       Uint32 lb, len;
4406       ndbrequire(NdbSqlUtil::get_var_length(typeId, s1, srcBytes, lb, len));
4407       s1 += ((len+lb+3) >> 2);
4408 
4409       ndbrequire(NdbSqlUtil::get_var_length(typeId, s2, srcBytes, lb, len));
4410       s2 += ((len+lb+3) >> 2);
4411     }
4412   }
4413   // Fall through: Compared equal
4414   return 0;
4415 }
4416 
4417 int
cmp_attr(Uint32 attrDesc,const CHARSET_INFO * cs,const Uint32 * s1,Uint32 s1Len,const Uint32 * s2,Uint32 s2Len) const4418 SimulatedBlock::cmp_attr(Uint32 attrDesc, const CHARSET_INFO* cs,
4419 			 const Uint32 *s1, Uint32 s1Len,
4420 			 const Uint32 *s2, Uint32 s2Len) const
4421 {
4422   const Uint32 typeId = AttributeDescriptor::getType(attrDesc);
4423   NdbSqlUtil::Cmp *cmp = NdbSqlUtil::getType(typeId).m_cmp;
4424   return (*cmp)(cs, s1, s1Len, s2, s1Len);
4425 }
4426 
4427 
4428 Uint32
xfrm_key_hash(Uint32 tab,const Uint32 * src,Uint32 * dst,Uint32 dstSize,Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const4429 SimulatedBlock::xfrm_key_hash(
4430                          Uint32 tab, const Uint32* src,
4431 			 Uint32 *dst, Uint32 dstSize,
4432 			 Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const
4433 {
4434   const KeyDescriptor * desc = g_key_descriptor_pool.getPtr(tab);
4435   const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
4436 
4437   Uint32 i = 0;
4438   Uint32 srcPos = 0;
4439   Uint32 dstPos = 0;
4440   while (i < noOfKeyAttr)
4441   {
4442     const KeyDescriptor::KeyAttr& keyAttr = desc->keyAttr[i];
4443     Uint32 dstWords =
4444       xfrm_attr_hash(keyAttr.attributeDescriptor, keyAttr.charsetInfo,
4445                 src, srcPos, dst, dstPos, dstSize);
4446     keyPartLen[i++] = dstWords;
4447     if (unlikely(dstWords == 0))
4448       return 0;
4449   }
4450 
4451   if (0)
4452   {
4453     for(Uint32 i = 0; i<dstPos; i++)
4454     {
4455       printf("%.8x ", dst[i]);
4456     }
4457     printf("\n");
4458   }
4459   return dstPos;
4460 }
4461 
4462 Uint32
xfrm_attr_hash(Uint32 attrDesc,const CHARSET_INFO * cs,const Uint32 * src,Uint32 & srcPos,Uint32 * dst,Uint32 & dstPos,Uint32 dstSize) const4463 SimulatedBlock::xfrm_attr_hash(
4464                           Uint32 attrDesc, const CHARSET_INFO* cs,
4465                           const Uint32* src, Uint32 & srcPos,
4466                           Uint32* dst, Uint32 & dstPos, Uint32 dstSize) const
4467 {
4468   Uint32 array =
4469     AttributeDescriptor::getArrayType(attrDesc);
4470   Uint32 srcBytes =
4471     AttributeDescriptor::getSizeInBytes(attrDesc);
4472 
4473   Uint32 srcWords = ~0;
4474   Uint32 dstWords = ~0;
4475   uchar* dstPtr = (uchar*)&dst[dstPos];
4476   const uchar* srcPtr = (const uchar*)&src[srcPos];
4477 
4478   if (cs == NULL)
4479   {
4480     jam();
4481     Uint32 len = 0;
4482     switch(array){
4483     case NDB_ARRAYTYPE_SHORT_VAR:
4484       len = 1 + srcPtr[0];
4485       break;
4486     case NDB_ARRAYTYPE_MEDIUM_VAR:
4487       len = 2 + srcPtr[0] + (srcPtr[1] << 8);
4488       break;
4489 #ifndef VM_TRACE
4490     default:
4491       abort();
4492 #endif
4493     case NDB_ARRAYTYPE_FIXED:
4494       len = srcBytes;
4495     }
4496     srcWords = (len + 3) >> 2;
4497     dstWords = srcWords;
4498     memcpy(dstPtr, srcPtr, dstWords << 2);
4499 
4500     if (0)
4501     {
4502       ndbout_c("srcPos: %d dstPos: %d len: %d srcWords: %d dstWords: %d",
4503                srcPos, dstPos, len, srcWords, dstWords);
4504 
4505       for(Uint32 i = 0; i<srcWords; i++)
4506         printf("%.8x ", src[srcPos + i]);
4507       printf("\n");
4508     }
4509   }
4510   else
4511   {
4512     jam();
4513     Uint32 typeId =
4514       AttributeDescriptor::getType(attrDesc);
4515     Uint32 lb, len;
4516     bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
4517     if (unlikely(!ok))
4518       return 0;
4519 
4520     // remLen: Remaining dst-buffer length
4521     // len:    Actual length of 'src'
4522     // defLen: Max defined length of src data
4523     const unsigned remLen = ((dstSize - dstPos) << 2);
4524     const unsigned defLen = srcBytes - lb;
4525     int n = NdbSqlUtil::strnxfrm_hash(cs, typeId,
4526                                  dstPtr, remLen,
4527                                  srcPtr + lb, len, defLen);
4528 
4529     if (unlikely(n == -1))
4530       return 0;
4531     while ((n & 3) != 0)
4532     {
4533       dstPtr[n++] = 0;
4534     }
4535     dstWords = (n >> 2);
4536     srcWords = (lb + len + 3) >> 2;
4537   }
4538 
4539   dstPos += dstWords;
4540   srcPos += srcWords;
4541   return dstWords;
4542 }
4543 
4544 Uint32
create_distr_key(Uint32 tableId,const Uint32 * src,Uint32 * dst,const Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const4545 SimulatedBlock::create_distr_key(Uint32 tableId,
4546 				 const Uint32 *src,
4547                                  Uint32* dst,
4548 				 const Uint32
4549 				 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const
4550 {
4551   const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
4552   const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
4553   Uint32 noOfDistrKeys = desc->noOfDistrKeys;
4554 
4555   Uint32 i = 0;
4556   Uint32 dstPos = 0;
4557 
4558   /* --Note that src and dst may be the same location-- */
4559 
4560   if(keyPartLen)
4561   {
4562     while (i < noOfKeyAttr && noOfDistrKeys)
4563     {
4564       Uint32 attr = desc->keyAttr[i].attributeDescriptor;
4565       Uint32 len = keyPartLen[i];
4566       if(AttributeDescriptor::getDKey(attr))
4567       {
4568 	noOfDistrKeys--;
4569 	memmove(dst+dstPos, src, len << 2);
4570 	dstPos += len;
4571       }
4572       src += len;
4573       i++;
4574     }
4575   }
4576   else
4577   {
4578     while (i < noOfKeyAttr && noOfDistrKeys)
4579     {
4580       Uint32 attr = desc->keyAttr[i].attributeDescriptor;
4581       Uint32 len = AttributeDescriptor::getSizeInWords(attr);
4582       ndbrequire(AttributeDescriptor::getArrayType(attr) == NDB_ARRAYTYPE_FIXED);
4583       if(AttributeDescriptor::getDKey(attr))
4584       {
4585 	noOfDistrKeys--;
4586 	memmove(dst+dstPos, src, len << 2);
4587 	dstPos += len;
4588       }
4589       src += len;
4590       i++;
4591     }
4592   }
4593   return dstPos;
4594 }
4595 
4596 CArray<KeyDescriptor> g_key_descriptor_pool;
4597 
4598 void
sendRoutedSignal(RoutePath path[],Uint32 pathcnt,Uint32 dst[],Uint32 dstcnt,Uint32 gsn,Signal * signal,Uint32 sigLen,JobBufferLevel prio,SectionHandle * userhandle)4599 SimulatedBlock::sendRoutedSignal(RoutePath path[], Uint32 pathcnt,
4600                                  Uint32 dst[],
4601                                  Uint32 dstcnt,
4602                                  Uint32 gsn,
4603                                  Signal * signal,
4604                                  Uint32 sigLen,
4605                                  JobBufferLevel prio,
4606                                  SectionHandle * userhandle)
4607 {
4608   ndbrequire(pathcnt > 0); // don't support (now) directly multi-cast
4609   pathcnt--; // first hop is made from here
4610 
4611 
4612   Uint32 len = LocalRouteOrd::StaticLen + (2 * pathcnt) + dstcnt;
4613   ndbrequire(len <= 25);
4614 
4615   SectionHandle handle(this, signal);
4616   if (userhandle)
4617   {
4618     jam();
4619     handle.m_cnt = userhandle->m_cnt;
4620     for (Uint32 i = 0; i<handle.m_cnt; i++)
4621       handle.m_ptr[i] = userhandle->m_ptr[i];
4622     userhandle->m_cnt = 0;
4623   }
4624 
4625   if (len + sigLen > 25)
4626   {
4627     jam();
4628 
4629     /**
4630      * we need to store theData in a section
4631      */
4632     ndbrequire(handle.m_cnt < 3);
4633     handle.m_ptr[2] = handle.m_ptr[1];
4634     handle.m_ptr[1] = handle.m_ptr[0];
4635     Ptr<SectionSegment> tmp;
4636     if (unlikely(! import(tmp, signal->theData, sigLen)))
4637     {
4638       handle_out_of_longsignal_memory(0);
4639     }
4640     handle.m_ptr[0].p = tmp.p;
4641     handle.m_ptr[0].i = tmp.i;
4642     handle.m_ptr[0].sz = sigLen;
4643     handle.m_cnt ++;
4644   }
4645   else
4646   {
4647     jam();
4648     memmove(signal->theData + len, signal->theData, 4 * sigLen);
4649     len += sigLen;
4650   }
4651 
4652   LocalRouteOrd * ord = (LocalRouteOrd*)signal->getDataPtrSend();
4653   ord->cnt = (pathcnt << 16) | (dstcnt);
4654   ord->gsn = gsn;
4655   ord->prio = Uint32(prio);
4656 
4657   Uint32 * dstptr = ord->path;
4658   for (Uint32 i = 1; i <= pathcnt; i++)
4659   {
4660     ndbrequire(refToNode(path[i].ref) == 0 ||
4661                refToNode(path[i].ref) == getOwnNodeId());
4662 
4663     * dstptr++ = path[i].ref;
4664     * dstptr++ = Uint32(path[i].prio);
4665   }
4666 
4667   for (Uint32 i = 0; i<dstcnt; i++)
4668   {
4669     ndbrequire(refToNode(dst[i]) == 0 ||
4670                refToNode(dst[i]) == getOwnNodeId());
4671 
4672     * dstptr++ = dst[i];
4673   }
4674 
4675   sendSignal(path[0].ref, GSN_LOCAL_ROUTE_ORD, signal, len,
4676              path[0].prio, &handle);
4677 }
4678 
4679 void
execLOCAL_ROUTE_ORD(Signal * signal)4680 SimulatedBlock::execLOCAL_ROUTE_ORD(Signal* signal)
4681 {
4682   jamEntry();
4683 
4684   if (!assembleFragments(signal))
4685   {
4686     jam();
4687     return;
4688   }
4689 
4690   if (ERROR_INSERTED(1001))
4691   {
4692     /**
4693      * This NDBCNTR error code 1001
4694      */
4695     jam();
4696     SectionHandle handle(this, signal);
4697     sendSignalWithDelay(reference(), GSN_LOCAL_ROUTE_ORD, signal, 200,
4698                         signal->getLength(), &handle);
4699     return;
4700   }
4701 
4702   LocalRouteOrd* ord = (LocalRouteOrd*)signal->getDataPtr();
4703   Uint32 pathcnt = ord->cnt >> 16;
4704   Uint32 dstcnt = ord->cnt & 0xFFFF;
4705   Uint32 sigLen = signal->getLength();
4706 
4707   if (pathcnt == 0)
4708   {
4709     /**
4710      * Send to final destination(s);
4711      */
4712     jam();
4713     Uint32 gsn = ord->gsn;
4714     Uint32 prio = ord->prio;
4715     memcpy(signal->theData+25, ord->path, 4*dstcnt);
4716     SectionHandle handle(this, signal);
4717     if (sigLen > LocalRouteOrd::StaticLen + dstcnt)
4718     {
4719       jam();
4720       /**
4721        * Data is at end of this...
4722        */
4723       memmove(signal->theData,
4724               signal->theData + LocalRouteOrd::StaticLen + dstcnt,
4725               4 * (sigLen - (LocalRouteOrd::StaticLen + dstcnt)));
4726       sigLen = sigLen - (LocalRouteOrd::StaticLen + dstcnt);
4727     }
4728     else
4729     {
4730       jam();
4731       /**
4732        * Put section 0 in signal->theData
4733        */
4734       sigLen = handle.m_ptr[0].sz;
4735       ndbrequire(sigLen <= 25);
4736       copy(signal->theData, handle.m_ptr[0]);
4737       release(handle.m_ptr[0]);
4738 
4739       for (Uint32 i = 0; i < handle.m_cnt - 1; i++)
4740         handle.m_ptr[i] = handle.m_ptr[i+1];
4741       handle.m_cnt--;
4742     }
4743 
4744     /*
4745      * The extra if-statement is as sendSignalNoRelease will copy sections
4746      *   which is not necessary is only sending to one destination
4747      */
4748     if (dstcnt > 1)
4749     {
4750       jam();
4751       for (Uint32 i = 0; i<dstcnt; i++)
4752       {
4753         jam();
4754         sendSignalNoRelease(signal->theData[25+i], gsn, signal, sigLen,
4755                             JobBufferLevel(prio), &handle);
4756       }
4757       releaseSections(handle);
4758     }
4759     else
4760     {
4761       jam();
4762       sendSignal(signal->theData[25+0], gsn, signal, sigLen,
4763                  JobBufferLevel(prio), &handle);
4764     }
4765   }
4766   else
4767   {
4768     /**
4769      * Reroute
4770      */
4771     jam();
4772     SectionHandle handle(this, signal);
4773     Uint32 ref = ord->path[0];
4774     Uint32 prio = ord->path[1];
4775     Uint32 len = sigLen - 2;
4776     ord->cnt = ((pathcnt - 1) << 16) | dstcnt;
4777     memmove(ord->path, ord->path+2, 4 * (len - LocalRouteOrd::StaticLen));
4778     sendSignal(ref, GSN_LOCAL_ROUTE_ORD, signal, len,
4779                JobBufferLevel(prio), &handle);
4780   }
4781 }
4782 
4783 
4784 #ifdef VM_TRACE
4785 bool
debugOutOn()4786 SimulatedBlock::debugOutOn()
4787 {
4788   return true;
4789   SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut;
4790   return
4791     globalData.testOn &&
4792     globalSignalLoggers.logMatch(number(), mask);
4793 }
4794 
4795 const char*
debugOutTag(char * buf,int line)4796 SimulatedBlock::debugOutTag(char *buf, int line)
4797 {
4798   char blockbuf[40];
4799   char instancebuf[40];
4800   char linebuf[40];
4801   char timebuf[40];
4802   sprintf(blockbuf, "%s", getBlockName(number(), "UNKNOWN"));
4803   instancebuf[0] = 0;
4804   if (instance() != 0)
4805     sprintf(instancebuf, "/%u", instance());
4806   sprintf(linebuf, " %d", line);
4807   timebuf[0] = 0;
4808 #ifdef VM_TRACE_TIME
4809   {
4810     Uint64 t = NdbTick_CurrentMillisecond();
4811     uint s = (t / 1000) % 3600;
4812     uint ms = t % 1000;
4813     sprintf(timebuf, " - %u.%03u -", s, ms);
4814   }
4815 #endif
4816   sprintf(buf, "%s%s%s%s ", blockbuf, instancebuf, linebuf, timebuf);
4817   return buf;
4818 }
4819 #endif
4820 
4821 #ifdef NDBD_MULTITHREADED
4822 // Leave synchronize_threads() undefined for ndbd where it should not be used.
4823 void
synchronize_threads(Signal * signal,const BlockThreadBitmask & threads,const Callback & cb,JobBufferLevel req_prio,JobBufferLevel conf_prio)4824 SimulatedBlock::synchronize_threads(Signal * signal,
4825                                     const BlockThreadBitmask& threads,
4826                                     const Callback & cb,
4827                                     JobBufferLevel req_prio,
4828                                     JobBufferLevel conf_prio)
4829 {
4830   jam();
4831 
4832   Ptr<SyncThreadRecord> ptr;
4833   ndbrequire(c_syncThreadPool.seize(ptr));
4834   ptr.p->m_threads = threads;
4835   ptr.p->m_cnt = 0;
4836   ptr.p->m_next = 0;
4837   ptr.p->m_callback = cb;
4838 
4839   const Uint32 cnt = threads.count();
4840   if (cnt == 0)
4841   {
4842     jam();
4843     Callback copy = cb;
4844     c_syncThreadPool.release(ptr);
4845     execute(signal, copy, 0);
4846     return;
4847   }
4848 
4849   signal->theData[0] = reference();
4850   signal->theData[1] = ptr.i;
4851   signal->theData[2] = Uint32(req_prio);
4852   signal->theData[3] = Uint32(conf_prio);
4853   ptr.p->m_next = ptr.p->m_threads.find_first();
4854   sendSYNC_THREAD_REQ(signal, ptr);
4855 }
4856 #endif
4857 
4858 void
synchronize_threads_for_blocks(Signal * signal,const Uint32 blocks[],const Callback & cb,JobBufferLevel req_prio,JobBufferLevel conf_prio)4859 SimulatedBlock::synchronize_threads_for_blocks(Signal * signal,
4860                                                const Uint32 blocks[],
4861                                                const Callback & cb,
4862                                                JobBufferLevel req_prio,
4863                                                JobBufferLevel conf_prio)
4864 {
4865 #ifndef NDBD_MULTITHREADED
4866   Callback copy = cb;
4867   execute(signal, copy, 0);
4868 #else
4869   jam();
4870 
4871   BlockThreadBitmask threads;
4872 
4873   mt_get_threads_for_blocks_no_proxy(blocks, threads);
4874 
4875   if (conf_prio == ILLEGAL_JB_LEVEL)
4876   {
4877     conf_prio = req_prio;
4878   }
4879   synchronize_threads(signal, threads, cb, req_prio, conf_prio);
4880 #endif
4881 }
4882 
4883 void
sendSYNC_THREAD_REQ(Signal * signal,Ptr<SyncThreadRecord> ptr)4884 SimulatedBlock::sendSYNC_THREAD_REQ(Signal* signal, Ptr<SyncThreadRecord> ptr)
4885 {
4886   JobBufferLevel req_prio = JobBufferLevel(signal->theData[2]);
4887   Uint32 instance = ptr.p->m_next;
4888   constexpr Uint32 MAX_FAN_OUT = 4;
4889   constexpr Uint32 MAX_INFLIGHT = 50;
4890   for (Uint32 fan_out = 0;
4891        ptr.p->m_cnt < MAX_INFLIGHT &&
4892        fan_out < MAX_FAN_OUT &&
4893        instance != BlockThreadBitmask::NotFound;
4894        fan_out++, instance = ptr.p->m_threads.find_next(instance + 1))
4895   {
4896     Uint32 ref = numberToRef(THRMAN, instance, 0);
4897     sendSignal(ref, GSN_SYNC_THREAD_REQ, signal, 4, req_prio);
4898     ptr.p->m_cnt++;
4899   }
4900   ptr.p->m_next = instance;
4901 }
4902 
4903 void
execSYNC_THREAD_REQ(Signal * signal)4904 SimulatedBlock::execSYNC_THREAD_REQ(Signal* signal)
4905 {
4906   jamEntry();
4907   Uint32 ref = signal->theData[0];
4908   Uint32 conf_prio = signal->theData[3];
4909   sendSignal(ref, GSN_SYNC_THREAD_CONF, signal, signal->getLength(),
4910              JobBufferLevel(conf_prio));
4911 }
4912 
4913 void
execSYNC_THREAD_CONF(Signal * signal)4914 SimulatedBlock::execSYNC_THREAD_CONF(Signal* signal)
4915 {
4916   jamEntry();
4917   Ptr<SyncThreadRecord> ptr;
4918   c_syncThreadPool.getPtr(ptr, signal->theData[1]);
4919 
4920   ndbrequire(ptr.p->m_cnt > 0);
4921   ptr.p->m_cnt--;
4922 
4923   sendSYNC_THREAD_REQ(signal, ptr);
4924 
4925   if (ptr.p->m_cnt > 0)
4926   {
4927     jam();
4928     return;
4929   }
4930 
4931   Callback copy = ptr.p->m_callback;
4932   c_syncThreadPool.release(ptr);
4933   execute(signal, copy, 0);
4934   return;
4935 }
4936 
4937 void
execSYNC_REQ(Signal * signal)4938 SimulatedBlock::execSYNC_REQ(Signal* signal)
4939 {
4940   jamEntry();
4941   Uint32 ref = signal->theData[0];
4942   Uint32 prio = signal->theData[2];
4943   sendSignal(ref, GSN_SYNC_CONF, signal, signal->getLength(),
4944              JobBufferLevel(prio));
4945 }
4946 
4947 void
synchronize_external_signals(Signal * signal,const Callback & cb)4948 SimulatedBlock::synchronize_external_signals(Signal* signal, const Callback& cb)
4949 {
4950 #ifndef NDBD_MULTITHREADED
4951   Callback copy = cb;
4952   execute(signal, copy, 0);
4953 #else
4954   jam();
4955 
4956   BlockThreadBitmask threads;
4957 
4958   const Uint32 my_thr_no = getThreadId();
4959   Uint32 cnt = mt_get_addressable_threads(my_thr_no, threads);
4960 
4961   // Assume current thread does not need synchronization
4962   if (threads.get(my_thr_no))
4963   {
4964     jam();
4965     threads.clear(my_thr_no);
4966     cnt--;
4967   }
4968 
4969   synchronize_threads(signal, threads, cb, JBB, JBA);
4970 #endif
4971 }
4972 
4973 void
synchronize_path(Signal * signal,const Uint32 blocks[],const Callback & cb,JobBufferLevel prio)4974 SimulatedBlock::synchronize_path(Signal * signal,
4975                                  const Uint32 blocks[],
4976                                  const Callback & cb,
4977                                  JobBufferLevel prio)
4978 {
4979   jam();
4980 
4981   // reuse SyncThreadRecord
4982   Ptr<SyncThreadRecord> ptr;
4983   ndbrequire(c_syncThreadPool.seize(ptr));
4984   ptr.p->m_cnt = 0; // with count of 0
4985   ptr.p->m_callback = cb;
4986 
4987   SyncPathReq* req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
4988   req->senderData = ptr.i;
4989   req->prio = Uint32(prio);
4990   req->count = 1;
4991   if (blocks[0] == 0)
4992   {
4993     jam();
4994     ndbabort(); // TODO
4995   }
4996   else
4997   {
4998     jam();
4999     Uint32 len = 0;
5000     for (; blocks[len+1] != 0; len++)
5001     {
5002       req->path[len] = blocks[len+1];
5003     }
5004     req->pathlen = 1 + len;
5005     req->path[len] = reference();
5006     sendSignal(numberToRef(blocks[0], getOwnNodeId()),
5007                GSN_SYNC_PATH_REQ, signal,
5008                SyncPathReq::SignalLength + (1 + len), prio);
5009   }
5010 }
5011 
5012 void
execSYNC_PATH_REQ(Signal * signal)5013 SimulatedBlock::execSYNC_PATH_REQ(Signal* signal)
5014 {
5015   jamEntry();
5016   SyncPathReq * req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
5017   if (req->pathlen == 1)
5018   {
5019     jam();
5020     SyncPathReq copy = *req;
5021     SyncPathConf* conf = CAST_PTR(SyncPathConf, signal->getDataPtrSend());
5022     conf->senderData = copy.senderData;
5023     conf->count = copy.count;
5024     sendSignal(copy.path[0], GSN_SYNC_PATH_CONF, signal,
5025                SyncPathConf::SignalLength, JobBufferLevel(copy.prio));
5026   }
5027   else
5028   {
5029     jam();
5030     Uint32 ref = numberToRef(req->path[0], getOwnNodeId());
5031     req->pathlen--;
5032     memmove(req->path, req->path + 1, 4 * req->pathlen);
5033     sendSignal(ref, GSN_SYNC_PATH_REQ, signal,
5034                SyncPathReq::SignalLength + (1 + req->pathlen),
5035                JobBufferLevel(req->prio));
5036   }
5037 }
5038 
5039 void
execSYNC_PATH_CONF(Signal * signal)5040 SimulatedBlock::execSYNC_PATH_CONF(Signal* signal)
5041 {
5042   jamEntry();
5043   SyncPathConf conf = * CAST_CONSTPTR(SyncPathConf, signal->getDataPtr());
5044   Ptr<SyncThreadRecord> ptr;
5045 
5046   c_syncThreadPool.getPtr(ptr, conf.senderData);
5047 
5048   if (ptr.p->m_cnt == 0)
5049   {
5050     jam();
5051     ptr.p->m_cnt = conf.count;
5052   }
5053 
5054   if (ptr.p->m_cnt == 1)
5055   {
5056     jam();
5057     Callback copy = ptr.p->m_callback;
5058     c_syncThreadPool.release(ptr);
5059     execute(signal, copy, 0);
5060     return;
5061   }
5062 
5063   ptr.p->m_cnt --;
5064 }
5065 
5066 
5067 bool
checkNodeFailSequence(Signal * signal)5068 SimulatedBlock::checkNodeFailSequence(Signal* signal)
5069 {
5070   Uint32 ref = signal->getSendersBlockRef();
5071 
5072   /**
5073    * Make sure that a signal being part of node-failure handling
5074    *   from a remote node, does not get to us before we got the NODE_FAILREP
5075    *   (this to avoid tricky state handling to some extent when receving
5076    *    signals from old nodes)
5077    *
5078    * To ensure this, we send the signal via the transporter for the remote
5079    * sender node via QMGR and NDBCNTR to DBDIH.  Although approximating
5080    * synchronization between all threads and transporters with a single hop to
5081    * DBLQH_REF to at least send via another thread for multi threaded data
5082    * node.
5083    *
5084    * The extra time should be negilable
5085    *
5086    * Note, make an exception for signals sent by our self
5087    *       as they are only sent as a consequence of NODE_FAILREP
5088    *
5089    * Also note that this function no longer guarantee that signal arrives to
5090    * its destination after corresponding NODE_FAILREP, as a complement caller
5091    * need some further logic delaying the processing of the signal until
5092    * NODE_FAILREP have been seen.
5093    */
5094   if (ref == reference() ||
5095       (refToNode(ref) == getOwnNodeId() &&
5096        refToMain(ref) == DBDIH))
5097   {
5098     jam();
5099     return true;
5100   }
5101 
5102   Uint32 trpman_ref;
5103   if (globalData.ndbMtReceiveThreads == 0)
5104   {
5105     jam();
5106     ndbrequire(!isNdbMt());
5107     trpman_ref = TRPMAN_REF;
5108   }
5109   else
5110   {
5111     jam();
5112     ndbrequire(isNdbMt());
5113     Uint32 sender_node = refToNode(ref);
5114     Uint32 inst = (get_recv_thread_idx(sender_node) + /* proxy */ 1);
5115     if (inst > NDBMT_MAX_BLOCK_INSTANCES)
5116     {
5117       jam();
5118       trpman_ref = TRPMAN_REF;
5119     }
5120     else
5121     {
5122       jam();
5123       trpman_ref = numberToRef(TRPMAN, inst, getOwnNodeId());
5124     }
5125   }
5126 
5127   RoutePath path[5];
5128   Uint32 path_idx = 0;
5129 
5130   /* Start at TRPMAN for sending node */
5131   path[path_idx].ref = trpman_ref;
5132   path[path_idx].prio = JBA;
5133   path_idx++;
5134 
5135   /* Follow COMMIT_FAILREQ to QMGR */
5136   path[path_idx].ref = QMGR_REF;
5137   path[path_idx].prio = JBB;
5138   path_idx++;
5139 
5140   /*
5141    * Should be sync_threads, but sends only to DBLQH_REF to at least send
5142    * to another thread than main thread (if using a multi threaded data node)
5143    */
5144   path[path_idx].ref = DBLQH_REF;
5145   path[path_idx].prio = JBB;
5146   path_idx++;
5147 
5148   /* Follow NODE_FAILREP to NDBCNT */
5149   path[path_idx].ref = NDBCNTR_REF;
5150   path[path_idx].prio = JBB;
5151   path_idx++;
5152 
5153   /* Follow NODE_FAILREP to DBDIH */
5154   path[path_idx].ref = DBDIH_REF;
5155   path[path_idx].prio = JBB;
5156   path_idx++;
5157 
5158   ndbrequire(path_idx <= NDB_ARRAY_SIZE(path));
5159 
5160   Uint32 dst[1];
5161   dst[0] = reference();
5162 
5163   SectionHandle handle(this, signal);
5164   Uint32 gsn = signal->header.theVerId_signalNumber;
5165   Uint32 len = signal->getLength();
5166 
5167   sendRoutedSignal(path, path_idx, dst, 1, gsn, signal, len, JBB, &handle);
5168   return false;
5169 }
5170 
5171 #ifdef ERROR_INSERT
5172 void
setDelayedPrepare()5173 SimulatedBlock::setDelayedPrepare()
5174 {
5175 #ifdef NDBD_MULTITHREADED
5176   mt_set_delayed_prepare(m_threadId);
5177 #else
5178   // ndbd todo
5179 #endif
5180 }
5181 #endif
5182 
5183 void
setup_wakeup()5184 SimulatedBlock::setup_wakeup()
5185 {
5186 #ifdef NDBD_MULTITHREADED
5187 #else
5188   globalTransporterRegistry.setup_wakeup_socket();
5189 #endif
5190 }
5191 
5192 void
wakeup()5193 SimulatedBlock::wakeup()
5194 {
5195 #ifdef NDBD_MULTITHREADED
5196   mt_wakeup(this);
5197 #else
5198   globalTransporterRegistry.wakeup();
5199 #endif
5200 }
5201 
5202 
5203 void
ndbinfo_send_row(Signal * signal,const DbinfoScanReq & req,const Ndbinfo::Row & row,Ndbinfo::Ratelimit & rl) const5204 SimulatedBlock::ndbinfo_send_row(Signal* signal,
5205                                  const DbinfoScanReq& req,
5206                                  const Ndbinfo::Row& row,
5207                                  Ndbinfo::Ratelimit& rl) const
5208 {
5209   // Check correct number of columns against table
5210   assert(row.columns() == Ndbinfo::getTable(req.tableId).columns());
5211 
5212   TransIdAI *tidai= (TransIdAI*)signal->getDataPtrSend();
5213   tidai->connectPtr= req.resultData;
5214   tidai->transId[0]= req.transId[0];
5215   tidai->transId[1]= req.transId[1];
5216 
5217   LinearSectionPtr ptr[3];
5218   ptr[0].p = row.getDataPtr();
5219   ptr[0].sz = row.getLength();
5220 
5221   rl.rows++;
5222   rl.bytes += row.getLength();
5223 
5224   sendSignal(req.resultRef, GSN_DBINFO_TRANSID_AI,
5225              signal, TransIdAI::HeaderLength, JBB, ptr, 1);
5226 }
5227 
5228 
5229 void
ndbinfo_send_scan_break(Signal * signal,DbinfoScanReq & req,const Ndbinfo::Ratelimit & rl,Uint32 data1,Uint32 data2,Uint32 data3,Uint32 data4) const5230 SimulatedBlock::ndbinfo_send_scan_break(Signal* signal,
5231                                        DbinfoScanReq& req,
5232                                        const Ndbinfo::Ratelimit& rl,
5233                                        Uint32 data1, Uint32 data2,
5234                                        Uint32 data3, Uint32 data4) const
5235 {
5236   DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend();
5237   const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
5238   MEMCOPY_NO_WORDS(conf, &req, signal_length);
5239 
5240   conf->returnedRows = rl.rows;
5241 
5242   // Update the cursor with current item number
5243   Ndbinfo::ScanCursor* cursor =
5244     (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf);
5245 
5246   cursor->data[0] = data1;
5247   cursor->data[1] = data2;
5248   cursor->data[2] = data3;
5249   cursor->data[3] = data4;
5250 
5251   // Increase number of rows and bytes sent to far
5252   cursor->totalRows += rl.rows;
5253   cursor->totalBytes += rl.bytes;
5254 
5255   Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, true);
5256 
5257   sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal,
5258              signal_length, JBB);
5259 }
5260 
5261 void
ndbinfo_send_scan_conf(Signal * signal,DbinfoScanReq & req,const Ndbinfo::Ratelimit & rl) const5262 SimulatedBlock::ndbinfo_send_scan_conf(Signal* signal,
5263                                        DbinfoScanReq& req,
5264                                        const Ndbinfo::Ratelimit& rl) const
5265 {
5266   DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend();
5267   const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
5268   Uint32 sender_ref = req.resultRef;
5269   MEMCOPY_NO_WORDS(conf, &req, signal_length);
5270 
5271   conf->returnedRows = rl.rows;
5272 
5273   if (req.cursor_sz)
5274   {
5275     jam();
5276     // Update the cursor with current item number
5277     Ndbinfo::ScanCursor* cursor =
5278       (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf);
5279 
5280     // Reset all data holders
5281     memset(cursor->data, 0, sizeof(cursor->data));
5282 
5283     // Increase number of rows and bytes sent to far
5284     cursor->totalRows += rl.rows;
5285     cursor->totalBytes += rl.bytes;
5286 
5287     Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, false);
5288 
5289     sender_ref = cursor->senderRef;
5290 
5291   }
5292   sendSignal(sender_ref, GSN_DBINFO_SCANCONF, signal,
5293              signal_length, JBB);
5294 }
5295 
init_elapsed_time(Signal * signal,NDB_TICKS & latestTIME_SIGNAL)5296 void SimulatedBlock::init_elapsed_time(Signal *signal,
5297                                        NDB_TICKS &latestTIME_SIGNAL)
5298 {
5299   const NDB_TICKS currentTime = NdbTick_getCurrentTicks();
5300   signal->theData[0] = Uint32(currentTime.getUint64() >> 32);
5301   signal->theData[1] = Uint32(currentTime.getUint64() & 0xFFFFFFFF);
5302   latestTIME_SIGNAL = currentTime;
5303   sendSignal(reference(), GSN_TIME_SIGNAL, signal, 2, JBB);
5304 }
5305 
sendTIME_SIGNAL(Signal * signal,const NDB_TICKS currentTime,Uint32 delay)5306 void SimulatedBlock::sendTIME_SIGNAL(Signal *signal,
5307                                      const NDB_TICKS currentTime,
5308                                      Uint32 delay)
5309 {
5310   signal->theData[0] = Uint32(currentTime.getUint64() >> 32);
5311   signal->theData[1] = Uint32(currentTime.getUint64() & 0xFFFFFFFF);
5312   sendSignalWithDelay(reference(), GSN_TIME_SIGNAL, signal, delay, 2);
5313 }
5314 
5315 /*
5316   This function is used to handle TIME_SIGNAL. This signal is intended to
5317   be used sort of like a drum beat. We should execute some timer calls
5318   every so often. However the OS can easily make the delayed signals to
5319   be delayed if the OS is occupied with other things. We will never report
5320   sleeps for longer than twice the expected delay. We rely on the delayed
5321   signal scheduler to ensure that we run time a bit faster for a while
5322   after long sleeps.
5323 
5324   This function will return the elapsed time since last time we called it.
5325 */
5326 Uint64
elapsed_time(Signal * signal,const NDB_TICKS currentTime,NDB_TICKS & latestTIME_SIGNAL,Uint32 expected_delay)5327 SimulatedBlock::elapsed_time(Signal *signal,
5328                              const NDB_TICKS currentTime,
5329                              NDB_TICKS &latestTIME_SIGNAL,
5330                              Uint32 expected_delay)
5331 {
5332   const Uint64 elapsed_time =
5333     NdbTick_Elapsed(latestTIME_SIGNAL, currentTime).milliSec();
5334   latestTIME_SIGNAL = currentTime;
5335 
5336   if (elapsed_time > Uint64(2 * expected_delay))
5337   {
5338     return Uint64(2 * expected_delay);
5339   }
5340   return elapsed_time;
5341 }
5342 
5343 #ifdef VM_TRACE
5344 void
assertOwnThread()5345 SimulatedBlock::assertOwnThread()
5346 {
5347 #ifdef NDBD_MULTITHREADED
5348   mt_assert_own_thread(this);
5349 #endif
5350 }
5351 
5352 #endif
5353 
5354 Uint32
get_recv_thread_idx(TrpId trp_id)5355 SimulatedBlock::get_recv_thread_idx(TrpId trp_id)
5356 {
5357 #ifdef NDBD_MULTITHREADED
5358   return mt_get_recv_thread_idx(trp_id);
5359 #else
5360   return 0;
5361 #endif
5362 }
5363 
5364 #ifndef NDBD_MULTITHREADED
5365 /**
5366  * Add a stub for this function since we have some code in ErrorReporter.cpp
5367  * that needs this function, it's only really needed for ndbmtd, so need an
5368  * empty function in ndbd.
5369  */
5370 void
prepare_to_crash(bool first_phase,bool error_insert_crash)5371 ErrorReporter::prepare_to_crash(bool first_phase, bool error_insert_crash)
5372 {
5373   (void)first_phase;
5374   (void)error_insert_crash;
5375 }
5376 #endif
5377 
5378 /**
5379  * Implementation of SegmentUtils
5380  * Here we forward the calls, but
5381  * in ndbmtd, add our thread cache
5382  * + lock function arguments.
5383  */
5384 
5385 SectionSegment*
getSegmentPtr(Uint32 iVal)5386 SimulatedBlock::getSegmentPtr(Uint32 iVal)
5387 {
5388   return g_sectionSegmentPool.getPtr(iVal);
5389 }
5390 
5391 bool
seizeSegment(Ptr<SectionSegment> & p)5392 SimulatedBlock::seizeSegment(Ptr<SectionSegment>& p)
5393 {
5394   return g_sectionSegmentPool.seize(SB_SP_REL_ARG p);
5395 }
5396 
5397 void
releaseSegment(Uint32 iVal)5398 SimulatedBlock::releaseSegment(Uint32 iVal)
5399 {
5400   g_sectionSegmentPool.release(SB_SP_REL_ARG iVal);
5401 }
5402 
5403 void
releaseSegmentList(Uint32 firstSegmentIVal)5404 SimulatedBlock::releaseSegmentList(Uint32 firstSegmentIVal)
5405 {
5406   ::releaseSection(SB_SP_ARG firstSegmentIVal);
5407 }
5408 
5409 #ifdef NDB_DEBUG_RES_OWNERSHIP
5410 void
lock_global_ssp()5411 SimulatedBlock::lock_global_ssp()
5412 {
5413 #ifdef NDBD_MULTITHREADED
5414   f_section_lock.lock();
5415 #endif
5416 }
5417 
5418 void
unlock_global_ssp()5419 SimulatedBlock::unlock_global_ssp()
5420 {
5421 #ifdef NDBD_MULTITHREADED
5422   f_section_lock.unlock();
5423 #endif
5424 }
5425 
5426 #endif
5427 
5428 
5429 
5430 /**
5431  * #undef is needed since this file is included by SimulatedBlock_nonmt.cpp
5432  * and SimulatedBlock_mt.cpp
5433  */
5434 #undef JAM_FILE_ID
5435