1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 /*
26   This file is used to build both the multithreaded and the singlethreaded
27   ndbd. It is built twice, included from either SimulatedBlock_mt.cpp (with
28   the macro NDBD_MULTITHREADED defined) or SimulatedBlock_nonmt.cpp (with the
29   macro not defined).
30 */
31 
32 #include <ndb_global.h>
33 
34 #include "SimulatedBlock.hpp"
35 #include <NdbOut.hpp>
36 #include <OutputStream.hpp>
37 #include <GlobalData.hpp>
38 #include <Emulator.hpp>
39 #include <WatchDog.hpp>
40 #include <ErrorHandlingMacros.hpp>
41 #include <TimeQueue.hpp>
42 #include <TransporterRegistry.hpp>
43 #include <SignalLoggerManager.hpp>
44 #include <FastScheduler.hpp>
45 #include "ndbd_malloc.hpp"
46 #include <signaldata/EventReport.hpp>
47 #include <signaldata/ContinueFragmented.hpp>
48 #include <signaldata/NodeStateSignalData.hpp>
49 #include <signaldata/FsRef.hpp>
50 #include <signaldata/SignalDroppedRep.hpp>
51 #include <signaldata/LocalRouteOrd.hpp>
52 #include <signaldata/TransIdAI.hpp>
53 #include <signaldata/Sync.hpp>
54 #include <DebuggerNames.hpp>
55 #include "LongSignal.hpp"
56 
57 #include <Properties.hpp>
58 #include "Configuration.hpp"
59 #include <AttributeDescriptor.hpp>
60 #include <NdbSqlUtil.hpp>
61 
62 #include "../blocks/dbdih/Dbdih.hpp"
63 #include <signaldata/CallbackSignal.hpp>
64 #include "LongSignalImpl.hpp"
65 
66 #include "KeyDescriptor.hpp"
67 
68 #include <EventLogger.hpp>
69 
70 #define JAM_FILE_ID 252
71 
72 extern EventLogger * g_eventLogger;
73 
74 //
75 // Constructor, Destructor
76 //
SimulatedBlock(BlockNumber blockNumber,struct Block_context & ctx,Uint32 instanceNumber)77 SimulatedBlock::SimulatedBlock(BlockNumber blockNumber,
78 			       struct Block_context & ctx,
79                                Uint32 instanceNumber)
80   : theNodeId(globalData.ownId),
81     theNumber(blockNumber),
82     theInstance(instanceNumber),
83     theReference(numberToRef(blockNumber, instanceNumber, globalData.ownId)),
84     theInstanceList(0),
85     theMainInstance(0),
86     m_ctx(ctx),
87     m_global_page_pool(globalData.m_global_page_pool),
88     m_shared_page_pool(globalData.m_shared_page_pool),
89     c_fragmentInfoHash(c_fragmentInfoPool),
90     c_linearFragmentSendList(c_fragmentSendPool),
91     c_segmentedFragmentSendList(c_fragmentSendPool),
92     c_mutexMgr(* this),
93     c_counterMgr(* this)
94 #ifdef VM_TRACE
95     ,debugOut(*new NdbOut(*new FileOutputStream(globalSignalLoggers.getOutputStream())))
96 #endif
97 #ifdef VM_TRACE_TIME
98     ,m_currentGsn(0)
99 #endif
100 {
101   m_threadId = 0;
102   m_watchDogCounter = NULL;
103   m_jamBuffer = (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
104   NewVarRef = 0;
105 
106   SimulatedBlock* mainBlock = globalData.getBlock(blockNumber);
107 
108   if (theInstance == 0) {
109     ndbrequire(mainBlock == 0);
110     mainBlock = this;
111     theMainInstance = mainBlock;
112     globalData.setBlock(blockNumber, mainBlock);
113     mainBlock->addInstance(this, theInstance);
114   } else {
115     ndbrequire(mainBlock != 0);
116     mainBlock->addInstance(this, theInstance);
117     theMainInstance = mainBlock;
118   }
119 
120   c_fragmentIdCounter = 1;
121   c_fragSenderRunning = false;
122 
123 #ifdef VM_TRACE_TIME
124   clearTimes();
125 #endif
126 
127   for(GlobalSignalNumber i = 0; i<=MAX_GSN; i++)
128     theExecArray[i] = 0;
129 
130   installSimulatedBlockFunctions();
131 
132   m_callbackTableAddr = 0;
133 
134   CLEAR_ERROR_INSERT_VALUE;
135 
136 #ifdef VM_TRACE
137   m_global_variables = new Ptr<void> * [1];
138   m_global_variables[0] = 0;
139   m_global_variables_save = 0;
140 #endif
141 }
142 
143 void
addInstance(SimulatedBlock * b,Uint32 theInstance)144 SimulatedBlock::addInstance(SimulatedBlock* b, Uint32 theInstance)
145 {
146   ndbrequire(theMainInstance == this);
147   ndbrequire(number() == b->number());
148   if (theInstanceList == 0)
149   {
150     theInstanceList = new SimulatedBlock* [MaxInstances];
151     ndbrequire(theInstanceList != 0);
152     for (Uint32 i = 0; i < MaxInstances; i++)
153       theInstanceList[i] = 0;
154   }
155   ndbrequire(theInstance < MaxInstances);
156   ndbrequire(theInstanceList[theInstance] == 0);
157   theInstanceList[theInstance] = b;
158 }
159 
160 void
initCommon()161 SimulatedBlock::initCommon()
162 {
163   Uint32 count = 10;
164   this->getParam("FragmentSendPool", &count);
165   c_fragmentSendPool.setSize(count);
166 
167   count = 10;
168   this->getParam("FragmentInfoPool", &count);
169   c_fragmentInfoPool.setSize(count);
170 
171   count = 10;
172   this->getParam("FragmentInfoHash", &count);
173   c_fragmentInfoHash.setSize(count);
174 
175   Uint32 def = 5;
176 #ifdef NDBD_MULTITHREADED
177   def += globalData.getBlockThreads();
178 #endif
179 
180   count = def;
181   this->getParam("ActiveMutexes", &count);
182   c_mutexMgr.setSize(count);
183 
184   count = def;
185   this->getParam("ActiveCounters", &count);
186   c_counterMgr.setSize(count);
187 
188   count = def;
189   this->getParam("ActiveThreadSync", &count);
190   c_syncThreadPool.setSize(count);
191 }
192 
~SimulatedBlock()193 SimulatedBlock::~SimulatedBlock()
194 {
195   freeBat();
196 #ifdef VM_TRACE_TIME
197   printTimes(stdout);
198 #endif
199 
200 #ifdef VM_TRACE
201   enable_global_variables();
202   delete [] m_global_variables;
203   m_global_variables = 0;
204 #endif
205 
206   if (theInstanceList != 0) {
207     Uint32 i;
208     for (i = 0; i < MaxInstances; i++)
209     {
210       if (theInstanceList[i] != this)
211       {
212         delete theInstanceList[i];
213       }
214     }
215     delete [] theInstanceList;
216   }
217   theInstanceList = 0;
218 }
219 
220 void
installSimulatedBlockFunctions()221 SimulatedBlock::installSimulatedBlockFunctions(){
222   ExecFunction * a = theExecArray;
223   a[GSN_NODE_STATE_REP] = &SimulatedBlock::execNODE_STATE_REP;
224   a[GSN_CHANGE_NODE_STATE_REQ] = &SimulatedBlock::execCHANGE_NODE_STATE_REQ;
225   a[GSN_NDB_TAMPER] = &SimulatedBlock::execNDB_TAMPER;
226   a[GSN_SIGNAL_DROPPED_REP] = &SimulatedBlock::execSIGNAL_DROPPED_REP;
227   a[GSN_CONTINUE_FRAGMENTED]= &SimulatedBlock::execCONTINUE_FRAGMENTED;
228   a[GSN_STOP_FOR_CRASH]= &SimulatedBlock::execSTOP_FOR_CRASH;
229   a[GSN_UTIL_CREATE_LOCK_REF]   = &SimulatedBlock::execUTIL_CREATE_LOCK_REF;
230   a[GSN_UTIL_CREATE_LOCK_CONF]  = &SimulatedBlock::execUTIL_CREATE_LOCK_CONF;
231   a[GSN_UTIL_DESTROY_LOCK_REF]  = &SimulatedBlock::execUTIL_DESTORY_LOCK_REF;
232   a[GSN_UTIL_DESTROY_LOCK_CONF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_CONF;
233   a[GSN_UTIL_LOCK_REF]    = &SimulatedBlock::execUTIL_LOCK_REF;
234   a[GSN_UTIL_LOCK_CONF]   = &SimulatedBlock::execUTIL_LOCK_CONF;
235   a[GSN_UTIL_UNLOCK_REF]  = &SimulatedBlock::execUTIL_UNLOCK_REF;
236   a[GSN_UTIL_UNLOCK_CONF] = &SimulatedBlock::execUTIL_UNLOCK_CONF;
237   a[GSN_FSOPENREF]    = &SimulatedBlock::execFSOPENREF;
238   a[GSN_FSCLOSEREF]   = &SimulatedBlock::execFSCLOSEREF;
239   a[GSN_FSWRITEREF]   = &SimulatedBlock::execFSWRITEREF;
240   a[GSN_FSREADREF]    = &SimulatedBlock::execFSREADREF;
241   a[GSN_FSREMOVEREF]  = &SimulatedBlock::execFSREMOVEREF;
242   a[GSN_FSSYNCREF]    = &SimulatedBlock::execFSSYNCREF;
243   a[GSN_FSAPPENDREF]  = &SimulatedBlock::execFSAPPENDREF;
244   a[GSN_NODE_START_REP] = &SimulatedBlock::execNODE_START_REP;
245   a[GSN_API_START_REP] = &SimulatedBlock::execAPI_START_REP;
246   a[GSN_SEND_PACKED] = &SimulatedBlock::execSEND_PACKED;
247   a[GSN_CALLBACK_CONF] = &SimulatedBlock::execCALLBACK_CONF;
248   a[GSN_SYNC_THREAD_REQ] = &SimulatedBlock::execSYNC_THREAD_REQ;
249   a[GSN_SYNC_THREAD_CONF] = &SimulatedBlock::execSYNC_THREAD_CONF;
250   a[GSN_LOCAL_ROUTE_ORD] = &SimulatedBlock::execLOCAL_ROUTE_ORD;
251   a[GSN_SYNC_REQ] = &SimulatedBlock::execSYNC_REQ;
252   a[GSN_SYNC_PATH_REQ] = &SimulatedBlock::execSYNC_PATH_REQ;
253   a[GSN_SYNC_PATH_CONF] = &SimulatedBlock::execSYNC_PATH_CONF;
254 }
255 
256 void
addRecSignalImpl(GlobalSignalNumber gsn,ExecFunction f,bool force)257 SimulatedBlock::addRecSignalImpl(GlobalSignalNumber gsn,
258 				 ExecFunction f, bool force){
259   if(gsn > MAX_GSN || (!force &&  theExecArray[gsn] != 0)){
260     char errorMsg[255];
261     BaseString::snprintf(errorMsg, 255,
262  	     "GSN %d(%d))", gsn, MAX_GSN);
263     ERROR_SET(fatal, NDBD_EXIT_ILLEGAL_SIGNAL, errorMsg, errorMsg);
264   }
265   theExecArray[gsn] = f;
266 }
267 
268 void
assignToThread(ThreadContext ctx)269 SimulatedBlock::assignToThread(ThreadContext ctx)
270 {
271   m_threadId = ctx.threadId;
272   m_jamBuffer = ctx.jamBuffer;
273   m_watchDogCounter = ctx.watchDogCounter;
274   m_sectionPoolCache = ctx.sectionPoolCache;
275 }
276 
277 Uint32
getInstanceKey(Uint32 tabId,Uint32 fragId)278 SimulatedBlock::getInstanceKey(Uint32 tabId, Uint32 fragId)
279 {
280   Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
281   Uint32 instanceKey = dbdih->dihGetInstanceKey(tabId, fragId);
282   return instanceKey;
283 }
284 
285 Uint32
getInstanceFromKey(Uint32 instanceKey)286 SimulatedBlock::getInstanceFromKey(Uint32 instanceKey)
287 {
288   Uint32 lqhWorkers = globalData.ndbMtLqhWorkers;
289   Uint32 instanceNo;
290   if (lqhWorkers == 0) {
291     instanceNo = 0;
292   } else {
293     assert(instanceKey != 0);
294     instanceNo = 1 + (instanceKey - 1) % lqhWorkers;
295   }
296   return instanceNo;
297 }
298 
299 void
signal_error(Uint32 gsn,Uint32 len,Uint32 recBlockNo,const char * filename,int lineno) const300 SimulatedBlock::signal_error(Uint32 gsn, Uint32 len, Uint32 recBlockNo,
301 			     const char* filename, int lineno) const
302 {
303   char objRef[255];
304   BaseString::snprintf(objRef, 255, "%s:%d", filename, lineno);
305   char probData[255];
306   BaseString::snprintf(probData, 255,
307 	   "Signal (GSN: %d, Length: %d, Rec Block No: %d)",
308 	   gsn, len, recBlockNo);
309 
310   ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
311 			     probData,
312 			     objRef);
313 }
314 
315 
316 extern class SectionSegmentPool g_sectionSegmentPool;
317 
318 void
handle_invalid_sections_in_send_signal(const Signal * signal) const319 SimulatedBlock::handle_invalid_sections_in_send_signal(const Signal* signal)
320 const
321 {
322   char errMsg[160];
323   BaseString::snprintf(errMsg, sizeof errMsg,
324                        "Unhandled sections in sendSignal for GSN %u (%s).",
325                        signal->header.theVerId_signalNumber,
326                        getSignalName(signal->header.theVerId_signalNumber));
327   // Print message and terminate.
328   ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
329                              errMsg,
330                              "");
331 }
332 
333 void
handle_lingering_sections_after_execute(const Signal * signal) const334 SimulatedBlock::handle_lingering_sections_after_execute(const Signal* signal)
335 const
336 {
337   char errMsg[160];
338   BaseString::snprintf(errMsg, sizeof errMsg,
339                       "Unhandled sections after execute for GSN %u (%s).",
340                       signal->header.theVerId_signalNumber,
341                       getSignalName(signal->header.theVerId_signalNumber));
342   // Print message and terminate.
343   ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
344                              errMsg,
345                              "");
346 }
347 
348 void
handle_invalid_fragmentInfo(Signal * signal) const349 SimulatedBlock::handle_invalid_fragmentInfo(Signal* signal) const
350 {
351 #if defined VM_TRACE || defined ERROR_INSERT
352   ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
353                              "Incorrect header->m_fragmentInfo in sendSignal()",
354                              "");
355 #else
356   signal->header.m_fragmentInfo = 0;
357   infoEvent("Incorrect header->m_fragmentInfo in sendSignal");
358 #endif
359 }
360 
361 void
handle_out_of_longsignal_memory(Signal * signal) const362 SimulatedBlock::handle_out_of_longsignal_memory(Signal * signal) const
363 {
364   ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
365 			     "Out of LongMessageBuffer in sendSignal",
366 			     "");
367 }
368 
369 void
handle_send_failed(SendStatus ss,Signal * signal) const370 SimulatedBlock::handle_send_failed(SendStatus ss, Signal * signal) const
371 {
372   switch(ss){
373   case SEND_BUFFER_FULL:
374     ErrorReporter::handleError(NDBD_EXIT_GENERIC,
375                                "Out of SendBufferMemory in sendSignal", "");
376     break;
377   case SEND_MESSAGE_TOO_BIG:
378     ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE,
379                                "Message to big in sendSignal", "");
380     break;
381   case SEND_UNKNOWN_NODE:
382     ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE,
383                                "Unknown node in sendSignal", "");
384     break;
385   case SEND_OK:
386   case SEND_BLOCKED:
387   case SEND_DISCONNECTED:
388     break;
389   }
390   ndbrequire(false);
391 }
392 
393 static void
linkSegments(Uint32 head,Uint32 tail)394 linkSegments(Uint32 head, Uint32 tail){
395 
396   Ptr<SectionSegment> headPtr;
397   g_sectionSegmentPool.getPtr(headPtr, head);
398 
399   Ptr<SectionSegment> tailPtr;
400   g_sectionSegmentPool.getPtr(tailPtr, tail);
401 
402   Ptr<SectionSegment> oldTailPtr;
403   g_sectionSegmentPool.getPtr(oldTailPtr, headPtr.p->m_lastSegment);
404 
405   /* Can only efficiently link segments if linking to the end of a
406    * multiple-of-segment-size sized chunk
407    */
408   if ((headPtr.p->m_sz % NDB_SECTION_SEGMENT_SZ) != 0)
409   {
410 #if defined VM_TRACE || defined ERROR_INSERT
411     ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
412                                "Bad head segment size",
413                                "");
414 #else
415     ndbout_c("linkSegments : Bad head segment size");
416 #endif
417   }
418 
419   headPtr.p->m_lastSegment = tailPtr.p->m_lastSegment;
420   headPtr.p->m_sz += tailPtr.p->m_sz;
421 
422   oldTailPtr.p->m_nextSegment = tailPtr.i;
423 }
424 
425 void
getSections(Uint32 secCount,SegmentedSectionPtr ptr[3])426 getSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){
427   Uint32 tSec0 = ptr[0].i;
428   Uint32 tSec1 = ptr[1].i;
429   Uint32 tSec2 = ptr[2].i;
430   SectionSegment * p;
431   switch(secCount){
432   case 3:
433     p = g_sectionSegmentPool.getPtr(tSec2);
434     ptr[2].p = p;
435     ptr[2].sz = p->m_sz;
436   case 2:
437     p = g_sectionSegmentPool.getPtr(tSec1);
438     ptr[1].p = p;
439     ptr[1].sz = p->m_sz;
440   case 1:
441     p = g_sectionSegmentPool.getPtr(tSec0);
442     ptr[0].p = p;
443     ptr[0].sz = p->m_sz;
444   case 0:
445     return;
446   }
447   char msg[40];
448   sprintf(msg, "secCount=%d", secCount);
449   ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
450 }
451 
452 void
getSection(SegmentedSectionPtr & ptr,Uint32 i)453 getSection(SegmentedSectionPtr & ptr, Uint32 i){
454   ptr.i = i;
455   SectionSegment * p = g_sectionSegmentPool.getPtr(i);
456   ptr.p = p;
457   ptr.sz = p->m_sz;
458 }
459 
getSectionSz(Uint32 id)460 Uint32 getSectionSz(Uint32 id)
461 {
462   return g_sectionSegmentPool.getPtr(id)->m_sz;
463 }
464 
getLastWordPtr(Uint32 id)465 Uint32* getLastWordPtr(Uint32 id)
466 {
467   SectionSegment* first= g_sectionSegmentPool.getPtr(id);
468   SectionSegment* last= g_sectionSegmentPool.getPtr(first->m_lastSegment);
469   Uint32 offset= (first->m_sz -1) % SectionSegment::DataLength;
470   return &last->theData[offset];
471 }
472 
473 #ifdef NDBD_MULTITHREADED
474 #define SB_SP_ARG *m_sectionPoolCache,
475 #define SB_SP_REL_ARG f_section_lock, *m_sectionPoolCache,
476 #else
477 #define SB_SP_ARG
478 #define SB_SP_REL_ARG
479 #endif
480 
481 static
482 void
releaseSections(SPC_ARG Uint32 secCount,SegmentedSectionPtr ptr[3])483 releaseSections(SPC_ARG Uint32 secCount, SegmentedSectionPtr ptr[3]){
484   Uint32 tSec0 = ptr[0].i;
485   Uint32 tSz0 = ptr[0].sz;
486   Uint32 tSec1 = ptr[1].i;
487   Uint32 tSz1 = ptr[1].sz;
488   Uint32 tSec2 = ptr[2].i;
489   Uint32 tSz2 = ptr[2].sz;
490   switch(secCount){
491   case 3:
492     g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
493                                      relSz(tSz2), tSec2,
494 				     ptr[2].p->m_lastSegment);
495   case 2:
496     g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
497                                      relSz(tSz1), tSec1,
498 				     ptr[1].p->m_lastSegment);
499   case 1:
500     g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
501                                      relSz(tSz0), tSec0,
502 				     ptr[0].p->m_lastSegment);
503   case 0:
504     return;
505   }
506   char msg[40];
507   sprintf(msg, "secCount=%d", secCount);
508   ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
509 }
510 
511 void
getSendBufferLevel(NodeId node,SB_LevelType & level)512 SimulatedBlock::getSendBufferLevel(NodeId node, SB_LevelType &level)
513 {
514 #ifdef NDBD_MULTITHREADED
515   mt_getSendBufferLevel(m_threadId, node, level);
516 #else
517   globalTransporterRegistry.getSendBufferLevel(node, level);
518 #endif
519 }
520 
521 Uint32
getSignalsInJBB()522 SimulatedBlock::getSignalsInJBB()
523 {
524   Uint32 num_signals;
525 #ifdef NDBD_MULTITHREADED
526   num_signals = mt_getSignalsInJBB(m_threadId);
527 #else
528   num_signals = globalScheduler.getBOccupancy();
529 #endif
530   return num_signals;
531 }
532 
533 void
sendSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer) const534 SimulatedBlock::sendSignal(BlockReference ref,
535 			   GlobalSignalNumber gsn,
536 			   Signal* signal,
537 			   Uint32 length,
538 			   JobBufferLevel jobBuffer) const {
539 
540   BlockReference sendBRef = reference();
541 
542   Uint32 recBlock = refToBlock(ref);
543   Uint32 recNode   = refToNode(ref);
544   Uint32 ourProcessor         = globalData.ownId;
545 
546   check_sections(signal, signal->header.m_noOfSections, 0);
547 
548   signal->header.theLength = length;
549   signal->header.theVerId_signalNumber = gsn;
550   signal->header.theReceiversBlockNumber = recBlock;
551   signal->header.m_noOfSections = 0;
552 
553   Uint32 tSignalId = signal->header.theSignalId;
554 
555   if ((length == 0) || length > 25 || (recBlock == 0)) {
556     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
557     return;
558   }//if
559 #ifdef VM_TRACE
560   if(globalData.testOn){
561     Uint16 proc =
562       (recNode == 0 ? globalData.ownId : recNode);
563     signal->header.theSendersBlockRef = sendBRef;
564     globalSignalLoggers.sendSignal(signal->header,
565 				   jobBuffer,
566 				   &signal->theData[0],
567 				   proc);
568   }
569 #endif
570 
571   if(recNode == ourProcessor || recNode == 0) {
572     signal->header.theSendersSignalId = tSignalId;
573     signal->header.theSendersBlockRef = sendBRef;
574 #ifdef NDBD_MULTITHREADED
575     if (jobBuffer == JBB)
576       sendlocal(m_threadId, &signal->header, signal->theData, NULL);
577     else
578       sendprioa(m_threadId, &signal->header, signal->theData, NULL);
579 #else
580     globalScheduler.execute(signal, jobBuffer, recBlock,
581 			    gsn);
582 #endif
583     return;
584   } else {
585     // send distributed Signal
586     SignalHeader sh;
587 
588     Uint32 tTrace = signal->getTrace();
589 
590     sh.theVerId_signalNumber   = gsn;
591     sh.theReceiversBlockNumber = recBlock;
592     sh.theSendersBlockRef      = refToBlock(sendBRef);
593     sh.theLength               = length;
594     sh.theTrace                = tTrace;
595     sh.theSignalId             = tSignalId;
596     sh.m_noOfSections          = 0;
597     sh.m_fragmentInfo          = 0;
598 
599 #ifdef TRACE_DISTRIBUTED
600     ndbout_c("send: %s(%d) to (%s, %d)",
601 	     getSignalName(gsn), gsn, getBlockName(recBlock),
602 	     recNode);
603 #endif
604 
605     SendStatus ss;
606 #ifdef NDBD_MULTITHREADED
607     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
608                         recNode, 0);
609 #else
610     ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
611                                                &signal->theData[0], recNode,
612                                                (LinearSectionPtr*)0);
613 #endif
614 
615     if (unlikely(! (ss == SEND_OK ||
616                     ss == SEND_BLOCKED ||
617                     ss == SEND_DISCONNECTED)))
618     {
619       handle_send_failed(ss, signal);
620     }
621   }
622   return;
623 }
624 
625 void
sendSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer) const626 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
627 			   GlobalSignalNumber gsn,
628 			   Signal* signal,
629 			   Uint32 length,
630 			   JobBufferLevel jobBuffer) const {
631 
632   Uint32 noOfSections = signal->header.m_noOfSections;
633   Uint32 tSignalId = signal->header.theSignalId;
634   Uint32 tTrace = signal->getTrace();
635 
636   Uint32 ourProcessor = globalData.ownId;
637   Uint32 recBlock = rg.m_block;
638 
639   signal->header.theLength = length;
640   signal->header.theVerId_signalNumber = gsn;
641   signal->header.theReceiversBlockNumber = recBlock;
642   signal->header.theSendersSignalId = tSignalId;
643   signal->header.theSendersBlockRef = reference();
644   signal->header.m_noOfSections = 0;
645 
646   check_sections(signal, noOfSections, 0);
647 
648   if ((length == 0) || (length > 25) || (recBlock == 0)) {
649     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
650     return;
651   }//if
652 
653   SignalHeader sh;
654 
655   sh.theVerId_signalNumber   = gsn;
656   sh.theReceiversBlockNumber = recBlock;
657   sh.theSendersBlockRef      = refToBlock(reference());
658   sh.theLength               = length;
659   sh.theTrace                = tTrace;
660   sh.theSignalId             = tSignalId;
661   sh.m_noOfSections          = 0;
662   sh.m_fragmentInfo          = 0;
663 
664   /**
665    * Check own node
666    */
667   if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
668 #ifdef VM_TRACE
669     if(globalData.testOn){
670       globalSignalLoggers.sendSignal(signal->header,
671 				     jobBuffer,
672 				     &signal->theData[0],
673 				     ourProcessor);
674     }
675 #endif
676 
677 #ifdef NDBD_MULTITHREADED
678     if (jobBuffer == JBB)
679       sendlocal(m_threadId, &signal->header, signal->theData, NULL);
680     else
681       sendprioa(m_threadId, &signal->header, signal->theData, NULL);
682 #else
683     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
684 #endif
685 
686     rg.m_nodes.clear((Uint32)0);
687     rg.m_nodes.clear(ourProcessor);
688   }
689 
690   /**
691    * Do the big loop
692    */
693   Uint32 recNode = 0;
694   while(!rg.m_nodes.isclear()){
695     recNode = rg.m_nodes.find(recNode + 1);
696     rg.m_nodes.clear(recNode);
697 #ifdef VM_TRACE
698     if(globalData.testOn){
699       globalSignalLoggers.sendSignal(signal->header,
700 				     jobBuffer,
701 				     &signal->theData[0],
702 				     recNode);
703     }
704 #endif
705 
706 #ifdef TRACE_DISTRIBUTED
707     ndbout_c("send: %s(%d) to (%s, %d)",
708 	     getSignalName(gsn), gsn, getBlockName(recBlock),
709 	     recNode);
710 #endif
711 
712     SendStatus ss;
713 #ifdef NDBD_MULTITHREADED
714     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
715                         recNode, 0);
716 #else
717     ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
718                                                &signal->theData[0], recNode,
719                                                (LinearSectionPtr*)0);
720 #endif
721 
722     if (unlikely(! (ss == SEND_OK ||
723                     ss == SEND_BLOCKED ||
724                     ss == SEND_DISCONNECTED)))
725     {
726       handle_send_failed(ss, signal);
727     }
728   }
729 
730   return;
731 }
732 
733 bool import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len);
734 
735 void
sendSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,LinearSectionPtr ptr[3],Uint32 noOfSections) const736 SimulatedBlock::sendSignal(BlockReference ref,
737 			   GlobalSignalNumber gsn,
738 			   Signal* signal,
739 			   Uint32 length,
740 			   JobBufferLevel jobBuffer,
741 			   LinearSectionPtr ptr[3],
742 			   Uint32 noOfSections) const {
743 
744   BlockReference sendBRef = reference();
745 
746   Uint32 recBlock = refToBlock(ref);
747   Uint32 recNode   = refToNode(ref);
748   Uint32 ourProcessor         = globalData.ownId;
749 
750   check_sections(signal, signal->header.m_noOfSections, noOfSections);
751 
752   signal->header.theLength = length;
753   signal->header.theVerId_signalNumber = gsn;
754   signal->header.theReceiversBlockNumber = recBlock;
755   signal->header.m_noOfSections = noOfSections;
756 
757   Uint32 tSignalId = signal->header.theSignalId;
758   Uint32 tFragInfo = signal->header.m_fragmentInfo;
759 
760   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
761     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
762     return;
763   }//if
764 #ifdef VM_TRACE
765   if(globalData.testOn){
766     Uint16 proc =
767       (recNode == 0 ? globalData.ownId : recNode);
768     signal->header.theSendersBlockRef = sendBRef;
769     globalSignalLoggers.sendSignal(signal->header,
770 				   jobBuffer,
771 				   &signal->theData[0],
772 				   proc,
773                                    ptr, noOfSections);
774   }
775 #endif
776 
777   if(recNode == ourProcessor || recNode == 0) {
778     signal->header.theSendersSignalId = tSignalId;
779     signal->header.theSendersBlockRef = sendBRef;
780 
781     /**
782      * We have to copy the data
783      */
784     bool ok = true;
785     Ptr<SectionSegment> segptr[3];
786     for(Uint32 i = 0; i<noOfSections; i++){
787       ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz);
788       signal->theData[length+i] = segptr[i].i;
789     }
790 
791     if (unlikely(! ok))
792     {
793       handle_out_of_longsignal_memory(signal);
794     }
795 
796 #ifdef NDBD_MULTITHREADED
797     if (jobBuffer == JBB)
798       sendlocal(m_threadId, &signal->header, signal->theData,
799                 signal->theData+length);
800     else
801       sendprioa(m_threadId, &signal->header, signal->theData,
802                 signal->theData+length);
803 #else
804     globalScheduler.execute(signal, jobBuffer, recBlock,
805 			    gsn);
806 #endif
807     signal->header.m_noOfSections = 0;
808     return;
809   } else {
810     // send distributed Signal
811     SignalHeader sh;
812 
813     Uint32 tTrace = signal->getTrace();
814     Uint32 noOfSections = signal->header.m_noOfSections;
815 
816     sh.theVerId_signalNumber   = gsn;
817     sh.theReceiversBlockNumber = recBlock;
818     sh.theSendersBlockRef      = refToBlock(sendBRef);
819     sh.theLength               = length;
820     sh.theTrace                = tTrace;
821     sh.theSignalId             = tSignalId;
822     sh.m_noOfSections          = noOfSections;
823     sh.m_fragmentInfo          = tFragInfo;
824 
825 #ifdef TRACE_DISTRIBUTED
826     ndbout_c("send: %s(%d) to (%s, %d)",
827 	     getSignalName(gsn), gsn, getBlockName(recBlock),
828 	     recNode);
829 #endif
830 
831     SendStatus ss;
832 #ifdef NDBD_MULTITHREADED
833     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
834                         recNode, ptr);
835 #else
836     ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
837                                                &signal->theData[0], recNode,
838                                                ptr);
839 #endif
840 
841     if (unlikely(! (ss == SEND_OK ||
842                     ss == SEND_BLOCKED ||
843                     ss == SEND_DISCONNECTED)))
844     {
845       handle_send_failed(ss, signal);
846     }
847   }
848 
849   signal->header.m_noOfSections = 0;
850   signal->header.m_fragmentInfo = 0;
851   return;
852 }
853 
854 void
sendSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,LinearSectionPtr ptr[3],Uint32 noOfSections) const855 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
856 			   GlobalSignalNumber gsn,
857 			   Signal* signal,
858 			   Uint32 length,
859 			   JobBufferLevel jobBuffer,
860 			   LinearSectionPtr ptr[3],
861 			   Uint32 noOfSections) const {
862 
863   Uint32 tSignalId = signal->header.theSignalId;
864   Uint32 tTrace    = signal->getTrace();
865   Uint32 tFragInfo = signal->header.m_fragmentInfo;
866 
867   Uint32 ourProcessor = globalData.ownId;
868   Uint32 recBlock = rg.m_block;
869 
870   check_sections(signal, signal->header.m_noOfSections, noOfSections);
871 
872   signal->header.theLength = length;
873   signal->header.theVerId_signalNumber = gsn;
874   signal->header.theReceiversBlockNumber = recBlock;
875   signal->header.theSendersSignalId = tSignalId;
876   signal->header.theSendersBlockRef = reference();
877   signal->header.m_noOfSections = noOfSections;
878 
879   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
880     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
881     return;
882   }//if
883 
884   SignalHeader sh;
885   sh.theVerId_signalNumber   = gsn;
886   sh.theReceiversBlockNumber = recBlock;
887   sh.theSendersBlockRef      = refToBlock(reference());
888   sh.theLength               = length;
889   sh.theTrace                = tTrace;
890   sh.theSignalId             = tSignalId;
891   sh.m_noOfSections          = noOfSections;
892   sh.m_fragmentInfo          = tFragInfo;
893 
894   /**
895    * Check own node
896    */
897   if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
898 #ifdef VM_TRACE
899     if(globalData.testOn){
900       globalSignalLoggers.sendSignal(signal->header,
901 				     jobBuffer,
902 				     &signal->theData[0],
903 				     ourProcessor,
904                                      ptr, noOfSections);
905     }
906 #endif
907     /**
908      * We have to copy the data
909      */
910     bool ok = true;
911     Ptr<SectionSegment> segptr[3];
912     for(Uint32 i = 0; i<noOfSections; i++){
913       ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz);
914       signal->theData[length+i] = segptr[i].i;
915     }
916 
917     if (unlikely(! ok))
918     {
919       handle_out_of_longsignal_memory(signal);
920     }
921 
922 #ifdef NDBD_MULTITHREADED
923     if (jobBuffer == JBB)
924       sendlocal(m_threadId, &signal->header, signal->theData,
925                 signal->theData + length);
926     else
927       sendprioa(m_threadId, &signal->header, signal->theData,
928                 signal->theData + length);
929 #else
930     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
931 #endif
932 
933     rg.m_nodes.clear((Uint32)0);
934     rg.m_nodes.clear(ourProcessor);
935   }
936 
937   /**
938    * Do the big loop
939    */
940   Uint32 recNode = 0;
941   while(!rg.m_nodes.isclear()){
942     recNode = rg.m_nodes.find(recNode + 1);
943     rg.m_nodes.clear(recNode);
944 
945 #ifdef VM_TRACE
946     if(globalData.testOn){
947       globalSignalLoggers.sendSignal(signal->header,
948 				     jobBuffer,
949 				     &signal->theData[0],
950 				     recNode,
951                                      ptr, noOfSections);
952     }
953 #endif
954 
955 #ifdef TRACE_DISTRIBUTED
956     ndbout_c("send: %s(%d) to (%s, %d)",
957 	     getSignalName(gsn), gsn, getBlockName(recBlock),
958 	     recNode);
959 #endif
960 
961     SendStatus ss;
962 #ifdef NDBD_MULTITHREADED
963     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
964                         recNode, ptr);
965 #else
966     ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
967                                                &signal->theData[0], recNode,
968                                                ptr);
969 #endif
970 
971     if (unlikely(! (ss == SEND_OK ||
972                     ss == SEND_BLOCKED ||
973                     ss == SEND_DISCONNECTED)))
974     {
975       handle_send_failed(ss, signal);
976     }
977   }
978 
979   signal->header.m_noOfSections = 0;
980   signal->header.m_fragmentInfo = 0;
981 
982   return;
983 }
984 
985 void
sendSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const986 SimulatedBlock::sendSignal(BlockReference ref,
987 			   GlobalSignalNumber gsn,
988 			   Signal* signal,
989 			   Uint32 length,
990 			   JobBufferLevel jobBuffer,
991 			   SectionHandle* sections) const {
992 
993   Uint32 noOfSections = sections->m_cnt;
994   BlockReference sendBRef = reference();
995 
996   Uint32 recBlock = refToBlock(ref);
997   Uint32 recNode   = refToNode(ref);
998   Uint32 ourProcessor         = globalData.ownId;
999 
1000   check_sections(signal, signal->header.m_noOfSections, noOfSections);
1001 
1002   signal->header.theLength = length;
1003   signal->header.theVerId_signalNumber = gsn;
1004   signal->header.theReceiversBlockNumber = recBlock;
1005   signal->header.m_noOfSections = noOfSections;
1006 
1007   Uint32 tSignalId = signal->header.theSignalId;
1008   Uint32 tFragInfo = signal->header.m_fragmentInfo;
1009 
1010   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1011     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1012     return;
1013   }//if
1014 #ifdef VM_TRACE
1015   if(globalData.testOn){
1016     Uint16 proc =
1017       (recNode == 0 ? globalData.ownId : recNode);
1018     signal->header.theSendersBlockRef = sendBRef;
1019     globalSignalLoggers.sendSignal(signal->header,
1020 				   jobBuffer,
1021 				   &signal->theData[0],
1022 				   proc,
1023                                    sections->m_ptr, noOfSections);
1024   }
1025 #endif
1026 
1027   if(recNode == ourProcessor || recNode == 0) {
1028     signal->header.theSendersSignalId = tSignalId;
1029     signal->header.theSendersBlockRef = sendBRef;
1030 
1031     /**
1032      * We have to copy the data
1033      */
1034     Uint32 * dst = signal->theData + length;
1035     * dst ++ = sections->m_ptr[0].i;
1036     * dst ++ = sections->m_ptr[1].i;
1037     * dst ++ = sections->m_ptr[2].i;
1038 
1039 #ifdef NDBD_MULTITHREADED
1040     if (jobBuffer == JBB)
1041       sendlocal(m_threadId, &signal->header, signal->theData,
1042                 signal->theData + length);
1043     else
1044       sendprioa(m_threadId, &signal->header, signal->theData,
1045                 signal->theData + length);
1046 #else
1047     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1048 #endif
1049   } else {
1050     // send distributed Signal
1051     SignalHeader sh;
1052 
1053     Uint32 tTrace = signal->getTrace();
1054 
1055     sh.theVerId_signalNumber   = gsn;
1056     sh.theReceiversBlockNumber = recBlock;
1057     sh.theSendersBlockRef      = refToBlock(sendBRef);
1058     sh.theLength               = length;
1059     sh.theTrace                = tTrace;
1060     sh.theSignalId             = tSignalId;
1061     sh.m_noOfSections          = noOfSections;
1062     sh.m_fragmentInfo          = tFragInfo;
1063 
1064 #ifdef TRACE_DISTRIBUTED
1065     ndbout_c("send: %s(%d) to (%s, %d)",
1066 	     getSignalName(gsn), gsn, getBlockName(recBlock),
1067 	     recNode);
1068 #endif
1069 
1070     SendStatus ss;
1071 #ifdef NDBD_MULTITHREADED
1072     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1073                         recNode, &g_sectionSegmentPool, sections->m_ptr);
1074 #else
1075     ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1076                                                &signal->theData[0], recNode,
1077                                                g_sectionSegmentPool,
1078                                                sections->m_ptr);
1079 #endif
1080 
1081     if (unlikely(! (ss == SEND_OK ||
1082                     ss == SEND_BLOCKED ||
1083                     ss == SEND_DISCONNECTED)))
1084     {
1085       handle_send_failed(ss, signal);
1086     }
1087 
1088     ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
1089   }
1090 
1091   signal->header.m_noOfSections = 0;
1092   signal->header.m_fragmentInfo = 0;
1093   sections->m_cnt = 0;
1094   return;
1095 }
1096 
1097 void
sendSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1098 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
1099 			   GlobalSignalNumber gsn,
1100 			   Signal* signal,
1101 			   Uint32 length,
1102 			   JobBufferLevel jobBuffer,
1103 			   SectionHandle * sections) const {
1104 
1105   Uint32 noOfSections = sections->m_cnt;
1106   Uint32 tSignalId = signal->header.theSignalId;
1107   Uint32 tTrace    = signal->getTrace();
1108   Uint32 tFragInfo = signal->header.m_fragmentInfo;
1109 
1110   Uint32 ourProcessor = globalData.ownId;
1111   Uint32 recBlock = rg.m_block;
1112 
1113   check_sections(signal, signal->header.m_noOfSections, noOfSections);
1114 
1115   signal->header.theLength = length;
1116   signal->header.theVerId_signalNumber = gsn;
1117   signal->header.theReceiversBlockNumber = recBlock;
1118   signal->header.theSendersSignalId = tSignalId;
1119   signal->header.theSendersBlockRef = reference();
1120   signal->header.m_noOfSections = noOfSections;
1121 
1122   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1123     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1124     return;
1125   }//if
1126 
1127   SignalHeader sh;
1128   sh.theVerId_signalNumber   = gsn;
1129   sh.theReceiversBlockNumber = recBlock;
1130   sh.theSendersBlockRef      = refToBlock(reference());
1131   sh.theLength               = length;
1132   sh.theTrace                = tTrace;
1133   sh.theSignalId             = tSignalId;
1134   sh.m_noOfSections          = noOfSections;
1135   sh.m_fragmentInfo          = tFragInfo;
1136 
1137   /**
1138    * Check own node
1139    */
1140   bool release = true;
1141   if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor))
1142   {
1143     release = false;
1144 #ifdef VM_TRACE
1145     if(globalData.testOn){
1146       globalSignalLoggers.sendSignal(signal->header,
1147 				     jobBuffer,
1148 				     &signal->theData[0],
1149 				     ourProcessor,
1150                                      sections->m_ptr, noOfSections);
1151     }
1152 #endif
1153     /**
1154      * We have to copy the data
1155      */
1156     Uint32 * dst = signal->theData + length;
1157     * dst ++ = sections->m_ptr[0].i;
1158     * dst ++ = sections->m_ptr[1].i;
1159     * dst ++ = sections->m_ptr[2].i;
1160 #ifdef NDBD_MULTITHREADED
1161     if (jobBuffer == JBB)
1162       sendlocal(m_threadId, &signal->header, signal->theData,
1163                 signal->theData + length);
1164     else
1165       sendprioa(m_threadId, &signal->header, signal->theData,
1166                 signal->theData + length);
1167 #else
1168     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1169 #endif
1170 
1171     rg.m_nodes.clear((Uint32)0);
1172     rg.m_nodes.clear(ourProcessor);
1173   }
1174 
1175   /**
1176    * Do the big loop
1177    */
1178   Uint32 recNode = 0;
1179   while(!rg.m_nodes.isclear()){
1180     recNode = rg.m_nodes.find(recNode + 1);
1181     rg.m_nodes.clear(recNode);
1182 
1183 #ifdef VM_TRACE
1184     if(globalData.testOn){
1185       globalSignalLoggers.sendSignal(signal->header,
1186 				     jobBuffer,
1187 				     &signal->theData[0],
1188 				     recNode,
1189                                      sections->m_ptr, noOfSections);
1190     }
1191 #endif
1192 
1193 #ifdef TRACE_DISTRIBUTED
1194     ndbout_c("send: %s(%d) to (%s, %d)",
1195 	     getSignalName(gsn), gsn, getBlockName(recBlock),
1196 	     recNode);
1197 #endif
1198 
1199     SendStatus ss;
1200 #ifdef NDBD_MULTITHREADED
1201     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1202                         recNode, &g_sectionSegmentPool, sections->m_ptr);
1203 #else
1204     ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1205                                                &signal->theData[0], recNode,
1206                                                g_sectionSegmentPool,
1207                                                sections->m_ptr);
1208 #endif
1209 
1210     if (unlikely(! (ss == SEND_OK ||
1211                     ss == SEND_BLOCKED ||
1212                     ss == SEND_DISCONNECTED)))
1213     {
1214       handle_send_failed(ss, signal);
1215     }
1216   }
1217 
1218   if (release)
1219   {
1220     ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
1221   }
1222 
1223   sections->m_cnt = 0;
1224   signal->header.m_noOfSections = 0;
1225   signal->header.m_fragmentInfo = 0;
1226 
1227   return;
1228 }
1229 
1230 void
sendSignalNoRelease(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1231 SimulatedBlock::sendSignalNoRelease(BlockReference ref,
1232                                     GlobalSignalNumber gsn,
1233                                     Signal* signal,
1234                                     Uint32 length,
1235                                     JobBufferLevel jobBuffer,
1236                                     SectionHandle* sections) const {
1237 
1238   /**
1239    * Implementation the same as sendSignal(), except that
1240    * the sections are duplicated when sending locally, and
1241    * not released
1242    */
1243 
1244   Uint32 noOfSections = sections->m_cnt;
1245   BlockReference sendBRef = reference();
1246 
1247   Uint32 recBlock = refToBlock(ref);
1248   Uint32 recNode   = refToNode(ref);
1249   Uint32 ourProcessor         = globalData.ownId;
1250 
1251   check_sections(signal, signal->header.m_noOfSections, noOfSections);
1252 
1253   signal->header.theLength = length;
1254   signal->header.theVerId_signalNumber = gsn;
1255   signal->header.theReceiversBlockNumber = recBlock;
1256   signal->header.m_noOfSections = noOfSections;
1257 
1258   Uint32 tSignalId = signal->header.theSignalId;
1259   Uint32 tFragInfo = signal->header.m_fragmentInfo;
1260 
1261   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1262     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1263     return;
1264   }//if
1265 #ifdef VM_TRACE
1266   if(globalData.testOn){
1267     Uint16 proc =
1268       (recNode == 0 ? globalData.ownId : recNode);
1269     signal->header.theSendersBlockRef = sendBRef;
1270     globalSignalLoggers.sendSignal(signal->header,
1271 				   jobBuffer,
1272 				   &signal->theData[0],
1273 				   proc,
1274                                    sections->m_ptr, noOfSections);
1275   }
1276 #endif
1277 
1278   if(recNode == ourProcessor || recNode == 0) {
1279     signal->header.theSendersSignalId = tSignalId;
1280     signal->header.theSendersBlockRef = sendBRef;
1281 
1282     Uint32 * dst = signal->theData + length;
1283 
1284     /* We need to copy the segmented section data into separate
1285      * sections when sending locally and keeping a copy ourselves
1286      */
1287     for (Uint32 sec=0; sec < noOfSections; sec++)
1288     {
1289       Uint32 secCopy;
1290       if (unlikely(! ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i)))
1291       {
1292         handle_out_of_longsignal_memory(signal);
1293         return;
1294       }
1295       * dst ++ = secCopy;
1296     }
1297 
1298 #ifdef NDBD_MULTITHREADED
1299     if (jobBuffer == JBB)
1300       sendlocal(m_threadId, &signal->header, signal->theData,
1301                 signal->theData + length);
1302     else
1303       sendprioa(m_threadId, &signal->header, signal->theData,
1304                 signal->theData + length);
1305 #else
1306     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1307 #endif
1308   } else {
1309     // send distributed Signal
1310     SignalHeader sh;
1311 
1312     Uint32 tTrace = signal->getTrace();
1313 
1314     sh.theVerId_signalNumber   = gsn;
1315     sh.theReceiversBlockNumber = recBlock;
1316     sh.theSendersBlockRef      = refToBlock(sendBRef);
1317     sh.theLength               = length;
1318     sh.theTrace                = tTrace;
1319     sh.theSignalId             = tSignalId;
1320     sh.m_noOfSections          = noOfSections;
1321     sh.m_fragmentInfo          = tFragInfo;
1322 
1323 #ifdef TRACE_DISTRIBUTED
1324     ndbout_c("send: %s(%d) to (%s, %d)",
1325 	     getSignalName(gsn), gsn, getBlockName(recBlock),
1326 	     recNode);
1327 #endif
1328 
1329     SendStatus ss;
1330 #ifdef NDBD_MULTITHREADED
1331     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1332                         recNode, &g_sectionSegmentPool, sections->m_ptr);
1333 #else
1334     ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1335                                                &signal->theData[0], recNode,
1336                                                g_sectionSegmentPool,
1337                                                sections->m_ptr);
1338 #endif
1339 
1340     if (unlikely(! (ss == SEND_OK ||
1341                     ss == SEND_BLOCKED ||
1342                     ss == SEND_DISCONNECTED)))
1343     {
1344       handle_send_failed(ss, signal);
1345     }
1346   }
1347 
1348   signal->header.m_noOfSections = 0;
1349   signal->header.m_fragmentInfo = 0;
1350   return;
1351 }
1352 
1353 void
sendSignalNoRelease(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1354 SimulatedBlock::sendSignalNoRelease(NodeReceiverGroup rg,
1355                                     GlobalSignalNumber gsn,
1356                                     Signal* signal,
1357                                     Uint32 length,
1358                                     JobBufferLevel jobBuffer,
1359                                     SectionHandle * sections) const {
1360   /**
1361    * Implementation the same as sendSignal(), except that
1362    * the sections are duplicated when sending locally, and
1363    * not released
1364    */
1365 
1366   Uint32 noOfSections = sections->m_cnt;
1367   Uint32 tSignalId = signal->header.theSignalId;
1368   Uint32 tTrace    = signal->getTrace();
1369   Uint32 tFragInfo = signal->header.m_fragmentInfo;
1370 
1371   Uint32 ourProcessor = globalData.ownId;
1372   Uint32 recBlock = rg.m_block;
1373 
1374   check_sections(signal, signal->header.m_noOfSections, noOfSections);
1375 
1376   signal->header.theLength = length;
1377   signal->header.theVerId_signalNumber = gsn;
1378   signal->header.theReceiversBlockNumber = recBlock;
1379   signal->header.theSendersSignalId = tSignalId;
1380   signal->header.theSendersBlockRef = reference();
1381   signal->header.m_noOfSections = noOfSections;
1382 
1383   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1384     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1385     return;
1386   }//if
1387 
1388   SignalHeader sh;
1389   sh.theVerId_signalNumber   = gsn;
1390   sh.theReceiversBlockNumber = recBlock;
1391   sh.theSendersBlockRef      = refToBlock(reference());
1392   sh.theLength               = length;
1393   sh.theTrace                = tTrace;
1394   sh.theSignalId             = tSignalId;
1395   sh.m_noOfSections          = noOfSections;
1396   sh.m_fragmentInfo          = tFragInfo;
1397 
1398   /**
1399    * Check own node
1400    */
1401   if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor))
1402   {
1403 #ifdef VM_TRACE
1404     if(globalData.testOn){
1405       globalSignalLoggers.sendSignal(signal->header,
1406 				     jobBuffer,
1407 				     &signal->theData[0],
1408 				     ourProcessor,
1409                                      sections->m_ptr, noOfSections);
1410     }
1411 #endif
1412 
1413     Uint32 * dst = signal->theData + length;
1414 
1415     /* We need to copy the segmented section data into separate
1416      * sections when sending locally and keeping a copy ourselves
1417      */
1418     for (Uint32 sec=0; sec < noOfSections; sec++)
1419     {
1420       Uint32 secCopy;
1421       if (unlikely(! ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i)))
1422       {
1423         handle_out_of_longsignal_memory(signal);
1424         return;
1425       }
1426       * dst ++ = secCopy;
1427     }
1428 
1429 #ifdef NDBD_MULTITHREADED
1430     if (jobBuffer == JBB)
1431       sendlocal(m_threadId, &signal->header, signal->theData,
1432                 signal->theData + length);
1433     else
1434       sendprioa(m_threadId, &signal->header, signal->theData,
1435                 signal->theData + length);
1436 #else
1437     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1438 #endif
1439 
1440     rg.m_nodes.clear((Uint32)0);
1441     rg.m_nodes.clear(ourProcessor);
1442   }
1443 
1444   /**
1445    * Do the big loop
1446    */
1447   Uint32 recNode = 0;
1448   while(!rg.m_nodes.isclear()){
1449     recNode = rg.m_nodes.find(recNode + 1);
1450     rg.m_nodes.clear(recNode);
1451 
1452 #ifdef VM_TRACE
1453     if(globalData.testOn){
1454       globalSignalLoggers.sendSignal(signal->header,
1455 				     jobBuffer,
1456 				     &signal->theData[0],
1457 				     recNode,
1458                                      sections->m_ptr, noOfSections);
1459     }
1460 #endif
1461 
1462 #ifdef TRACE_DISTRIBUTED
1463     ndbout_c("send: %s(%d) to (%s, %d)",
1464 	     getSignalName(gsn), gsn, getBlockName(recBlock),
1465 	     recNode);
1466 #endif
1467 
1468     SendStatus ss;
1469 #ifdef NDBD_MULTITHREADED
1470     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1471                         recNode, &g_sectionSegmentPool, sections->m_ptr);
1472 #else
1473     ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1474                                                &signal->theData[0], recNode,
1475                                                g_sectionSegmentPool,
1476                                                sections->m_ptr);
1477 #endif
1478 
1479     if (unlikely(! (ss == SEND_OK ||
1480                     ss == SEND_BLOCKED ||
1481                     ss == SEND_DISCONNECTED)))
1482     {
1483       handle_send_failed(ss, signal);
1484     }
1485   }
1486 
1487   signal->header.m_noOfSections = 0;
1488   signal->header.m_fragmentInfo = 0;
1489 
1490   return;
1491 }
1492 
1493 
1494 void
sendSignalWithDelay(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 delayInMilliSeconds,Uint32 length) const1495 SimulatedBlock::sendSignalWithDelay(BlockReference ref,
1496 				    GlobalSignalNumber gsn,
1497 				    Signal* signal,
1498 				    Uint32 delayInMilliSeconds,
1499 				    Uint32 length) const {
1500 
1501   BlockNumber bnr = refToBlock(ref);
1502 
1503   check_sections(signal, signal->header.m_noOfSections, 0);
1504 
1505   signal->header.theLength = length;
1506   signal->header.theSendersSignalId = signal->header.theSignalId;
1507   signal->header.theVerId_signalNumber = gsn;
1508   signal->header.theReceiversBlockNumber = bnr;
1509   signal->header.theSendersBlockRef = reference();
1510 
1511 #ifdef VM_TRACE
1512   {
1513     if(globalData.testOn){
1514       globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
1515 					      signal->header,
1516 					      0,
1517 					      &signal->theData[0],
1518 					      globalData.ownId);
1519     }
1520   }
1521 #endif
1522 
1523 #ifdef NDBD_MULTITHREADED
1524   senddelay(m_threadId, &signal->header, delayInMilliSeconds);
1525 #else
1526   globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
1527 #endif
1528 
1529   // befor 2nd parameter to globalTimeQueue.insert
1530   // (Priority)theSendSig[sigIndex].jobBuffer
1531 }
1532 
1533 void
sendSignalWithDelay(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 delayInMilliSeconds,Uint32 length,SectionHandle * sections) const1534 SimulatedBlock::sendSignalWithDelay(BlockReference ref,
1535 				    GlobalSignalNumber gsn,
1536 				    Signal* signal,
1537 				    Uint32 delayInMilliSeconds,
1538 				    Uint32 length,
1539 				    SectionHandle * sections) const {
1540 
1541   Uint32 noOfSections = sections->m_cnt;
1542   BlockNumber bnr = refToBlock(ref);
1543 
1544   BlockReference sendBRef = reference();
1545 
1546   if (bnr == 0) {
1547     bnr_error();
1548   }//if
1549 
1550   check_sections(signal, signal->header.m_noOfSections, noOfSections);
1551 
1552   signal->header.theLength = length;
1553   signal->header.theSendersSignalId = signal->header.theSignalId;
1554   signal->header.theSendersBlockRef = sendBRef;
1555   signal->header.theVerId_signalNumber = gsn;
1556   signal->header.theReceiversBlockNumber = bnr;
1557   signal->header.m_noOfSections = noOfSections;
1558 
1559   Uint32 * dst = signal->theData + length;
1560   * dst ++ = sections->m_ptr[0].i;
1561   * dst ++ = sections->m_ptr[1].i;
1562   * dst ++ = sections->m_ptr[2].i;
1563 
1564 #ifdef VM_TRACE
1565   {
1566     if(globalData.testOn){
1567       globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
1568 					      signal->header,
1569 					      0,
1570 					      &signal->theData[0],
1571 					      globalData.ownId);
1572     }
1573   }
1574 #endif
1575 
1576 #ifdef NDBD_MULTITHREADED
1577   senddelay(m_threadId, &signal->header, delayInMilliSeconds);
1578 #else
1579   globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
1580 #endif
1581 
1582   signal->header.m_noOfSections = 0;
1583   signal->header.m_fragmentInfo = 0;
1584   sections->m_cnt = 0;
1585 }
1586 
1587 void
release(SegmentedSectionPtr & ptr)1588 SimulatedBlock::release(SegmentedSectionPtr & ptr)
1589 {
1590   ::release(SB_SP_ARG ptr);
1591 }
1592 
1593 void
releaseSection(Uint32 firstSegmentIVal)1594 SimulatedBlock::releaseSection(Uint32 firstSegmentIVal)
1595 {
1596   ::releaseSection(SB_SP_ARG firstSegmentIVal);
1597 }
1598 
1599 void
releaseSections(SectionHandle & handle)1600 SimulatedBlock::releaseSections(SectionHandle& handle)
1601 {
1602   ::releaseSections(SB_SP_ARG handle.m_cnt, handle.m_ptr);
1603   handle.m_cnt = 0;
1604 }
1605 
1606 bool
appendToSection(Uint32 & firstSegmentIVal,const Uint32 * src,Uint32 len)1607 SimulatedBlock::appendToSection(Uint32& firstSegmentIVal, const Uint32* src, Uint32 len)
1608 {
1609   return ::appendToSection(SB_SP_ARG firstSegmentIVal, src, len);
1610 }
1611 
1612 bool
import(Ptr<SectionSegment> & first,const Uint32 * src,Uint32 len)1613 SimulatedBlock::import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len)
1614 {
1615   return ::import(SB_SP_ARG first, src, len);
1616 }
1617 
1618 bool
import(SegmentedSectionPtr & ptr,const Uint32 * src,Uint32 len)1619 SimulatedBlock::import(SegmentedSectionPtr& ptr, const Uint32* src, Uint32 len)
1620 {
1621   Ptr<SectionSegment> tmp;
1622   if (::import(SB_SP_ARG tmp, src, len))
1623   {
1624     ptr.i = tmp.i;
1625     ptr.p = tmp.p;
1626     ptr.sz = len;
1627     return true;
1628   }
1629   return false;
1630 }
1631 
1632 bool
import(SectionHandle * dst,LinearSectionPtr src[3],Uint32 cnt)1633 SimulatedBlock::import(SectionHandle * dst,
1634                        LinearSectionPtr src[3],
1635                        Uint32 cnt)
1636 {
1637   ndbassert(dst->m_cnt == 0);
1638   if (dst->m_cnt)
1639   {
1640     releaseSections(* dst);
1641   }
1642 
1643   for (Uint32 i = 0; i < cnt; i++)
1644   {
1645     if (unlikely(!import(dst->m_ptr[i], src[i].p, src[i].sz)))
1646     {
1647       if (i)
1648       {
1649         dst->m_cnt = i - 1;
1650         releaseSections(* dst);
1651         return false;
1652       }
1653     }
1654   }
1655   dst->m_cnt = cnt;
1656   return true;
1657 }
1658 
1659 
1660 bool
dupSection(Uint32 & copyFirstIVal,Uint32 srcFirstIVal)1661 SimulatedBlock::dupSection(Uint32& copyFirstIVal, Uint32 srcFirstIVal)
1662 {
1663   return ::dupSection(SB_SP_ARG copyFirstIVal, srcFirstIVal);
1664 }
1665 
1666 bool
writeToSection(Uint32 firstSegmentIVal,Uint32 offset,const Uint32 * src,Uint32 len)1667 SimulatedBlock::writeToSection(Uint32 firstSegmentIVal, Uint32 offset,
1668                                const Uint32* src, Uint32 len)
1669 {
1670   return ::writeToSection(firstSegmentIVal, offset, src, len);
1671 }
1672 
1673 class SectionSegmentPool&
getSectionSegmentPool()1674 SimulatedBlock::getSectionSegmentPool(){
1675   return g_sectionSegmentPool;
1676 }
1677 
1678 NewVARIABLE *
allocateBat(int batSize)1679 SimulatedBlock::allocateBat(int batSize){
1680   NewVARIABLE* bat = NewVarRef;
1681   bat = (NewVARIABLE*)realloc(bat, batSize * sizeof(NewVARIABLE));
1682   NewVarRef = bat;
1683   theBATSize = batSize;
1684   return bat;
1685 }
1686 
1687 void
freeBat()1688 SimulatedBlock::freeBat(){
1689   if(NewVarRef != 0){
1690     free(NewVarRef);
1691     NewVarRef = 0;
1692   }
1693 }
1694 
1695 const NewVARIABLE *
getBat(Uint16 blockNo,Uint32 instanceNo)1696 SimulatedBlock::getBat(Uint16 blockNo, Uint32 instanceNo){
1697   assert(blockNo == blockToMain(blockNo));
1698   SimulatedBlock * sb = globalData.getBlock(blockNo);
1699   if (sb != 0 && instanceNo != 0)
1700     sb = sb->getInstance(instanceNo);
1701   if(sb == 0)
1702     return 0;
1703   return sb->NewVarRef;
1704 }
1705 
1706 Uint16
getBatSize(Uint16 blockNo,Uint32 instanceNo)1707 SimulatedBlock::getBatSize(Uint16 blockNo, Uint32 instanceNo){
1708   assert(blockNo == blockToMain(blockNo));
1709   SimulatedBlock * sb = globalData.getBlock(blockNo);
1710   if (sb != 0 && instanceNo != 0)
1711     sb = sb->getInstance(instanceNo);
1712   if(sb == 0)
1713     return 0;
1714   return sb->theBATSize;
1715 }
1716 
allocRecord(const char * type,size_t s,size_t n,bool clear,Uint32 paramId)1717 void* SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, Uint32 paramId)
1718 {
1719   return allocRecordAligned(type, s, n, 0, 0, clear, paramId);
1720 }
1721 
1722 void*
allocRecordAligned(const char * type,size_t s,size_t n,void ** unaligned_buffer,Uint32 align,bool clear,Uint32 paramId)1723 SimulatedBlock::allocRecordAligned(const char * type, size_t s, size_t n, void **unaligned_buffer, Uint32 align, bool clear, Uint32 paramId)
1724 {
1725 
1726   void * p = NULL;
1727   Uint32 over_alloc = unaligned_buffer ? (align - 1) : 0;
1728   size_t size = n*s + over_alloc;
1729   Uint64 real_size = (Uint64)((Uint64)n)*((Uint64)s) + over_alloc;
1730   refresh_watch_dog(9);
1731   if (real_size > 0){
1732 #ifdef VM_TRACE_MEM
1733     ndbout_c("%s::allocRecord(%s, %u, %u) = %llu bytes",
1734 	     getBlockName(number()),
1735 	     type,
1736 	     s,
1737 	     n,
1738 	     real_size);
1739 #endif
1740     if( real_size == (Uint64)size )
1741       p = ndbd_malloc(size);
1742     if (p == NULL){
1743       char buf1[255];
1744       char buf2[255];
1745       struct ndb_mgm_param_info param_info;
1746       size_t size = sizeof(ndb_mgm_param_info);
1747 
1748       if(0 != paramId && 0 == ndb_mgm_get_db_parameter_info(paramId, &param_info, &size)) {
1749         BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for parameter %s",
1750 	         getBlockName(number()), param_info.m_name);
1751       } else {
1752         BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for %s",
1753 	         getBlockName(number()), type);
1754       }
1755       BaseString::snprintf(buf2, sizeof(buf2), "Requested: %ux%u = %llu bytes",
1756 	       (Uint32)s, (Uint32)n, (Uint64)real_size);
1757       ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
1758     }
1759 
1760     if(clear){
1761       char * ptr = (char*)p;
1762       const Uint32 chunk = 128 * 1024;
1763       while(size > chunk){
1764 	refresh_watch_dog(9);
1765 	memset(ptr, 0, chunk);
1766 	ptr += chunk;
1767 	size -= chunk;
1768       }
1769       refresh_watch_dog(9);
1770       memset(ptr, 0, size);
1771     }
1772     if (unaligned_buffer)
1773     {
1774       *unaligned_buffer = p;
1775       p = (void *)(((UintPtr)p + over_alloc) & ~(UintPtr)(over_alloc));
1776 #ifdef VM_TRACE
1777       g_eventLogger->info("'%s' (%u) %llu %llu, alignment correction %u bytes",
1778                           type, align, (Uint64)p, (Uint64)p+n*s,
1779                           (Uint32)((UintPtr)p - (UintPtr)*unaligned_buffer));
1780 #endif
1781     }
1782   }
1783   return p;
1784 }
1785 
1786 void
deallocRecord(void ** ptr,const char * type,size_t s,size_t n)1787 SimulatedBlock::deallocRecord(void ** ptr,
1788 			      const char * type, size_t s, size_t n){
1789   (void)type;
1790 
1791   if(* ptr != 0){
1792       ndbd_free(* ptr, n*s);
1793     * ptr = 0;
1794   }
1795 }
1796 
1797 int
sortchunks(const void * _e0,const void * _e1)1798 SimulatedBlock::sortchunks(const void * _e0, const void * _e1)
1799 {
1800   const AllocChunk *p0 = (const AllocChunk*)_e0;
1801   const AllocChunk *p1 = (const AllocChunk*)_e1;
1802 
1803   if (p0->ptrI > p1->ptrI)
1804     return 1;
1805   if (p0->ptrI < p1->ptrI)
1806     return -1;
1807   return 0;
1808 }
1809 
1810 Uint32
allocChunks(AllocChunk dst[],Uint32 arraysize,Uint32 rg,Uint32 pages,Uint32 paramId)1811 SimulatedBlock::allocChunks(AllocChunk dst[],
1812                             Uint32 arraysize,
1813                             Uint32 rg,
1814                             Uint32 pages,
1815                             Uint32 paramId)
1816 {
1817   const Uint32 save = pages; // For fail
1818   Uint32 i = 0;
1819   for (; i<arraysize && pages > 0; i++)
1820   {
1821     Uint32 cnt = pages;
1822     m_ctx.m_mm.alloc_pages(rg, &dst[i].ptrI, &cnt, 1);
1823     if (unlikely(cnt == 0))
1824       goto fail;
1825     pages -= cnt;
1826     dst[i].cnt = cnt;
1827   }
1828   if (unlikely(pages != 0))
1829     goto fail;
1830 
1831   qsort(dst, i, sizeof(dst[0]), sortchunks);
1832   return i;
1833 
1834 fail:
1835   char buf1[255];
1836   char buf2[255];
1837   struct ndb_mgm_param_info param_info;
1838   size_t size = sizeof(ndb_mgm_param_info);
1839 
1840   if (ndb_mgm_get_db_parameter_info(paramId, &param_info, &size) != 0)
1841   {
1842     ndbassert(false);
1843     param_info.m_name = "<unknown>";
1844   }
1845 
1846   BaseString::snprintf(buf1, sizeof(buf1),
1847                        "%s could not allocate memory for parameter %s",
1848                        getBlockName(number()), param_info.m_name);
1849   BaseString::snprintf(buf2, sizeof(buf2), "Requested: %llu bytes",
1850                        Uint64(save) * sizeof(GlobalPage));
1851   ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
1852   return 0;
1853 }
1854 
1855 void
refresh_watch_dog(Uint32 place)1856 SimulatedBlock::refresh_watch_dog(Uint32 place)
1857 {
1858 #ifdef NDBD_MULTITHREADED
1859   (*m_watchDogCounter) = place;
1860 #else
1861   globalData.incrementWatchDogCounter(place);
1862 #endif
1863 }
1864 
1865 void
update_watch_dog_timer(Uint32 interval)1866 SimulatedBlock::update_watch_dog_timer(Uint32 interval)
1867 {
1868   extern EmulatorData globalEmulatorData;
1869   globalEmulatorData.theWatchDog->setCheckInterval(interval);
1870 }
1871 
1872 void
progError(int line,int err_code,const char * extra) const1873 SimulatedBlock::progError(int line, int err_code, const char* extra) const {
1874   jamNoBlock();
1875 
1876   const char *aBlockName = getBlockName(number(), "VM Kernel");
1877 
1878   // Pack status of interesting config variables
1879   // so that we can print them in error.log
1880   int magicStatus =
1881     (m_ctx.m_config.stopOnError()<<1) +
1882     (m_ctx.m_config.getInitialStart()<<2);
1883 
1884   /* Add line number to block name */
1885   char buf[100];
1886   BaseString::snprintf(&buf[0], 100, "%s (Line: %d) 0x%.8x",
1887 	   aBlockName, line, magicStatus);
1888 
1889   ErrorReporter::handleError(err_code, extra, buf);
1890 
1891 }
1892 
1893 void
infoEvent(const char * msg,...) const1894 SimulatedBlock::infoEvent(const char * msg, ...) const {
1895   if(msg == 0)
1896     return;
1897 
1898   SignalT<25> signalT;
1899   signalT.theData[0] = NDB_LE_InfoEvent;
1900   char * buf = (char *)(signalT.theData+1);
1901 
1902   va_list ap;
1903   va_start(ap, msg);
1904   BaseString::vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4
1905   va_end(ap);
1906 
1907   size_t len = strlen(buf) + 1;
1908   if(len > 96){
1909     len = 96;
1910     buf[95] = 0;
1911   }
1912 
1913   /**
1914    * Init and put it into the job buffer
1915    */
1916   memset(&signalT.header, 0, sizeof(SignalHeader));
1917 
1918   const Signal * signal = globalScheduler.getVMSignals();
1919   Uint32 tTrace = signal->header.theTrace;
1920   Uint32 tSignalId = signal->header.theSignalId;
1921 
1922   signalT.header.theVerId_signalNumber   = GSN_EVENT_REP;
1923   signalT.header.theReceiversBlockNumber = CMVMI;
1924   signalT.header.theSendersBlockRef      = reference();
1925   signalT.header.theTrace                = tTrace;
1926   signalT.header.theSignalId             = tSignalId;
1927   signalT.header.theLength               = (Uint32)((len+3)/4)+1;
1928 
1929 #ifdef NDBD_MULTITHREADED
1930   sendlocal(m_threadId,
1931             &signalT.header, signalT.theData, signalT.m_sectionPtrI);
1932 #else
1933   globalScheduler.execute(&signalT.header, JBB, signalT.theData,
1934                           signalT.m_sectionPtrI);
1935 #endif
1936 }
1937 
1938 void
warningEvent(const char * msg,...) const1939 SimulatedBlock::warningEvent(const char * msg, ...) const {
1940   if(msg == 0)
1941     return;
1942 
1943   SignalT<25> signalT;
1944   signalT.theData[0] = NDB_LE_WarningEvent;
1945   char * buf = (char *)(signalT.theData+1);
1946 
1947   va_list ap;
1948   va_start(ap, msg);
1949   BaseString::vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4
1950   va_end(ap);
1951 
1952   size_t len = strlen(buf) + 1;
1953   if(len > 96){
1954     len = 96;
1955     buf[95] = 0;
1956   }
1957 
1958   /**
1959    * Init and put it into the job buffer
1960    */
1961   memset(&signalT.header, 0, sizeof(SignalHeader));
1962 
1963   const Signal * signal = globalScheduler.getVMSignals();
1964   Uint32 tTrace = signal->header.theTrace;
1965   Uint32 tSignalId = signal->header.theSignalId;
1966 
1967   signalT.header.theVerId_signalNumber   = GSN_EVENT_REP;
1968   signalT.header.theReceiversBlockNumber = CMVMI;
1969   signalT.header.theSendersBlockRef      = reference();
1970   signalT.header.theTrace                = tTrace;
1971   signalT.header.theSignalId             = tSignalId;
1972   signalT.header.theLength               = (Uint32)((len+3)/4)+1;
1973 
1974 #ifdef NDBD_MULTITHREADED
1975   sendlocal(m_threadId,
1976             &signalT.header, signalT.theData, signalT.m_sectionPtrI);
1977 #else
1978   globalScheduler.execute(&signalT.header, JBB, signalT.theData,
1979                           signalT.m_sectionPtrI);
1980 #endif
1981 }
1982 
1983 void
execNODE_STATE_REP(Signal * signal)1984 SimulatedBlock::execNODE_STATE_REP(Signal* signal){
1985   const NodeStateRep * const  rep = (NodeStateRep *)&signal->theData[0];
1986 
1987   this->theNodeState = rep->nodeState;
1988 }
1989 
1990 void
execCHANGE_NODE_STATE_REQ(Signal * signal)1991 SimulatedBlock::execCHANGE_NODE_STATE_REQ(Signal* signal){
1992   const ChangeNodeStateReq * const  req =
1993     (ChangeNodeStateReq *)&signal->theData[0];
1994 
1995   this->theNodeState = req->nodeState;
1996   const Uint32 senderData = req->senderData;
1997   const BlockReference senderRef = req->senderRef;
1998 
1999   /**
2000    * Pack return signal
2001    */
2002   ChangeNodeStateConf * const  conf =
2003     (ChangeNodeStateConf *)&signal->theData[0];
2004 
2005   conf->senderData = senderData;
2006 
2007   sendSignal(senderRef, GSN_CHANGE_NODE_STATE_CONF, signal,
2008 	     ChangeNodeStateConf::SignalLength, JBB);
2009 }
2010 
2011 void
execNDB_TAMPER(Signal * signal)2012 SimulatedBlock::execNDB_TAMPER(Signal * signal){
2013   if (signal->getLength() == 1)
2014   {
2015     SET_ERROR_INSERT_VALUE(signal->theData[0]);
2016   }
2017   else
2018   {
2019     SET_ERROR_INSERT_VALUE2(signal->theData[0], signal->theData[1]);
2020   }
2021 }
2022 
2023 void
execSIGNAL_DROPPED_REP(Signal * signal)2024 SimulatedBlock::execSIGNAL_DROPPED_REP(Signal * signal){
2025   /* Note no need for fragmented signal handling as we are
2026    * going to crash this node
2027    */
2028   char msg[64];
2029   const SignalDroppedRep * const rep = (SignalDroppedRep *)&signal->theData[0];
2030   BaseString::snprintf(msg, sizeof(msg), "%s GSN: %u (%u,%u)", getBlockName(number()),
2031 	   rep->originalGsn, rep->originalLength,rep->originalSectionCount);
2032   ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
2033 			     msg,
2034 			     __FILE__,
2035 			     NST_ErrorHandler);
2036 }
2037 
2038 void
execCONTINUE_FRAGMENTED(Signal * signal)2039 SimulatedBlock::execCONTINUE_FRAGMENTED(Signal * signal){
2040   jamEntry();
2041 
2042   ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
2043   ndbrequire(signal->getSendersBlockRef() == reference()); /* Paranoia */
2044 
2045   switch (sig->type)
2046   {
2047   case ContinueFragmented::CONTINUE_SENDING :
2048   {
2049     jam();
2050     Ptr<FragmentSendInfo> fragPtr;
2051 
2052     c_segmentedFragmentSendList.first(fragPtr);
2053     for(; !fragPtr.isNull();){
2054       jam();
2055       Ptr<FragmentSendInfo> copyPtr = fragPtr;
2056       c_segmentedFragmentSendList.next(fragPtr);
2057 
2058       sendNextSegmentedFragment(signal, * copyPtr.p);
2059       if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
2060         jam();
2061         if(copyPtr.p->m_callback.m_callbackFunction != 0) {
2062           jam();
2063           execute(signal, copyPtr.p->m_callback, 0);
2064         }//if
2065         c_segmentedFragmentSendList.release(copyPtr);
2066       }
2067     }
2068 
2069     c_linearFragmentSendList.first(fragPtr);
2070     for(; !fragPtr.isNull();){
2071       jam();
2072       Ptr<FragmentSendInfo> copyPtr = fragPtr;
2073       c_linearFragmentSendList.next(fragPtr);
2074 
2075       sendNextLinearFragment(signal, * copyPtr.p);
2076       if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
2077         jam();
2078         if(copyPtr.p->m_callback.m_callbackFunction != 0) {
2079           jam();
2080           execute(signal, copyPtr.p->m_callback, 0);
2081         }//if
2082         c_linearFragmentSendList.release(copyPtr);
2083       }
2084     }
2085 
2086     if(c_segmentedFragmentSendList.isEmpty() &&
2087        c_linearFragmentSendList.isEmpty()){
2088       jam();
2089       c_fragSenderRunning = false;
2090       return;
2091     }
2092 
2093     sig->type = ContinueFragmented::CONTINUE_SENDING;
2094     sig->line = __LINE__;
2095     sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
2096     break;
2097   }
2098   case ContinueFragmented::CONTINUE_CLEANUP:
2099   {
2100     jam();
2101 
2102     const Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
2103     /* Check length of signal */
2104     ndbassert(signal->getLength() ==
2105               ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
2106               callbackWords);
2107 
2108     Callback cb;
2109     memcpy(&cb, &sig->cleanup.callbackStart, callbackWords << 2);
2110 
2111     doNodeFailureCleanup(signal,
2112                          sig->cleanup.failedNodeId,
2113                          sig->cleanup.resource,
2114                          sig->cleanup.cursor,
2115                          sig->cleanup.elementsCleaned,
2116                          cb);
2117     break;
2118   }
2119   default:
2120     ndbrequire(false);
2121   }
2122 }
2123 
2124 void
execSTOP_FOR_CRASH(Signal * signal)2125 SimulatedBlock::execSTOP_FOR_CRASH(Signal* signal)
2126 {
2127 #ifdef NDBD_MULTITHREADED
2128   mt_execSTOP_FOR_CRASH();
2129 #endif
2130 }
2131 
2132 void
execNODE_START_REP(Signal * signal)2133 SimulatedBlock::execNODE_START_REP(Signal* signal)
2134 {
2135 }
2136 
2137 void
execAPI_START_REP(Signal * signal)2138 SimulatedBlock::execAPI_START_REP(Signal* signal)
2139 {
2140 }
2141 
2142 void
execSEND_PACKED(Signal * signal)2143 SimulatedBlock::execSEND_PACKED(Signal* signal)
2144 {
2145 }
2146 
2147 ATTRIBUTE_NOINLINE
2148 void
handle_execute_error(GlobalSignalNumber gsn)2149 SimulatedBlock::handle_execute_error(GlobalSignalNumber gsn)
2150 {
2151   /**
2152    * This method only called if an error has occurred
2153    */
2154   char errorMsg[255];
2155   if (!(gsn <= MAX_GSN)) {
2156     BaseString::snprintf(errorMsg, 255, "Illegal signal received (GSN %d too high)", gsn);
2157     ERROR_SET(fatal, NDBD_EXIT_PRGERR, errorMsg, errorMsg);
2158   }
2159   if (!(theExecArray[gsn] != 0)) {
2160     BaseString::snprintf(errorMsg, 255, "Illegal signal received (GSN %d not added)", gsn);
2161     ERROR_SET(fatal, NDBD_EXIT_PRGERR, errorMsg, errorMsg);
2162   }
2163   ndbrequire(false);
2164 }
2165 
2166 // MT LQH callback CONF via signal
2167 
2168 const SimulatedBlock::CallbackEntry&
getCallbackEntry(Uint32 ci)2169 SimulatedBlock::getCallbackEntry(Uint32 ci)
2170 {
2171   ndbrequire(m_callbackTableAddr != 0);
2172   const CallbackTable& ct = *m_callbackTableAddr;
2173   ndbrequire(ci < ct.m_count);
2174   return ct.m_entry[ci];
2175 }
2176 
2177 void
sendCallbackConf(Signal * signal,Uint32 fullBlockNo,CallbackPtr & cptr,Uint32 senderData,Uint32 callbackInfo,Uint32 returnCode)2178 SimulatedBlock::sendCallbackConf(Signal* signal, Uint32 fullBlockNo,
2179                                  CallbackPtr& cptr,
2180                                  Uint32 senderData, Uint32 callbackInfo,
2181                                  Uint32 returnCode)
2182 {
2183   Uint32 blockNo = blockToMain(fullBlockNo);
2184   Uint32 instanceNo = blockToInstance(fullBlockNo);
2185   SimulatedBlock* b = globalData.getBlock(blockNo, instanceNo);
2186   ndbrequire(b != 0);
2187 
2188   const CallbackEntry& ce = b->getCallbackEntry(cptr.m_callbackIndex);
2189 
2190   if (!isNdbMtLqh()) {
2191     Callback c;
2192     c.m_callbackFunction = ce.m_function;
2193     c.m_callbackData = cptr.m_callbackData;
2194     b->execute(signal, c, returnCode);
2195 
2196     if (ce.m_flags & CALLBACK_ACK) {
2197       jam();
2198       CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend();
2199       ack->senderData = senderData;
2200       ack->callbackInfo = callbackInfo;
2201       EXECUTE_DIRECT(number(), GSN_CALLBACK_ACK,
2202                      signal, CallbackAck::SignalLength);
2203     }
2204   } else {
2205     CallbackConf* conf = (CallbackConf*)signal->getDataPtrSend();
2206     conf->senderData = senderData;
2207     conf->senderRef = reference();
2208     conf->callbackIndex = cptr.m_callbackIndex;
2209     conf->callbackData = cptr.m_callbackData;
2210     conf->callbackInfo = callbackInfo;
2211     conf->returnCode = returnCode;
2212 
2213     if (ce.m_flags & CALLBACK_DIRECT) {
2214       jam();
2215       EXECUTE_DIRECT(blockNo, GSN_CALLBACK_CONF,
2216                      signal, CallbackConf::SignalLength, instanceNo);
2217     } else {
2218       jam();
2219       BlockReference ref = numberToRef(fullBlockNo, getOwnNodeId());
2220       sendSignal(ref, GSN_CALLBACK_CONF,
2221                  signal, CallbackConf::SignalLength, JBB);
2222     }
2223   }
2224   cptr.m_callbackIndex = ZNIL;
2225 }
2226 
2227 void
execCALLBACK_CONF(Signal * signal)2228 SimulatedBlock::execCALLBACK_CONF(Signal* signal)
2229 {
2230   const CallbackConf* conf = (const CallbackConf*)signal->getDataPtr();
2231 
2232   Uint32 senderData = conf->senderData;
2233   Uint32 senderRef = conf->senderRef;
2234   Uint32 callbackIndex = conf->callbackIndex;
2235   Uint32 callbackData = conf->callbackData;
2236   Uint32 callbackInfo = conf->callbackInfo;
2237   Uint32 returnCode = conf->returnCode;
2238 
2239   ndbrequire(m_callbackTableAddr != 0);
2240   const CallbackEntry& ce = getCallbackEntry(callbackIndex);
2241   CallbackFunction function = ce.m_function;
2242 
2243   Callback callback;
2244   callback.m_callbackFunction = function;
2245   callback.m_callbackData = callbackData;
2246   execute(signal, callback, returnCode);
2247 
2248   if (ce.m_flags & CALLBACK_ACK) {
2249     jam();
2250     CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend();
2251     ack->senderData = senderData;
2252     ack->callbackInfo = callbackInfo;
2253     sendSignal(senderRef, GSN_CALLBACK_ACK,
2254                signal, CallbackAck::SignalLength, JBB);
2255   }
2256 }
2257 
2258 #ifdef VM_TRACE_TIME
2259 void
clearTimes()2260 SimulatedBlock::clearTimes() {
2261   for(Uint32 i = 0; i <= MAX_GSN; i++){
2262     m_timeTrace[i].cnt = 0;
2263     m_timeTrace[i].sum = 0;
2264     m_timeTrace[i].sub = 0;
2265   }
2266 }
2267 
2268 void
printTimes(FILE * output)2269 SimulatedBlock::printTimes(FILE * output){
2270   fprintf(output, "-- %s --\n", getBlockName(number()));
2271   Uint64 sum = 0;
2272   for(Uint32 i = 0; i <= MAX_GSN; i++){
2273     Uint32 n = m_timeTrace[i].cnt;
2274     if(n != 0){
2275       double dn = n;
2276 
2277       double avg = m_timeTrace[i].sum;
2278       double avg2 = avg - m_timeTrace[i].sub;
2279 
2280       avg /= dn;
2281       avg2 /= dn;
2282 
2283       fprintf(output,
2284 	      //name ; cnt ; loc ; acc
2285 	      "%s ; #%d ; %dus ; %dus ; %dms\n",
2286 	      getSignalName(i), n, (Uint32)avg, (Uint32)avg2,
2287 	      (Uint32)((m_timeTrace[i].sum - m_timeTrace[i].sub + 500)/ 1000));
2288 
2289       sum += (m_timeTrace[i].sum - m_timeTrace[i].sub);
2290     }
2291   }
2292   sum = (sum + 500)/ 1000;
2293   fprintf(output, "-- %s : %u --\n", getBlockName(number()), (Uint32)sum);
2294   fprintf(output, "\n");
2295   fflush(output);
2296 }
2297 
2298 #endif
2299 
FragmentInfo(Uint32 fragId,Uint32 sender)2300 SimulatedBlock::FragmentInfo::FragmentInfo(Uint32 fragId, Uint32 sender){
2301   m_fragmentId = fragId;
2302   m_senderRef = sender;
2303   m_sectionPtrI[0] = RNIL;
2304   m_sectionPtrI[1] = RNIL;
2305   m_sectionPtrI[2] = RNIL;
2306 }
2307 
FragmentSendInfo()2308 SimulatedBlock::FragmentSendInfo::FragmentSendInfo()
2309 {
2310 }
2311 
2312 bool
assembleFragments(Signal * signal)2313 SimulatedBlock::assembleFragments(Signal * signal){
2314   Uint32 sigLen = signal->length() - 1;
2315   Uint32 fragId = signal->theData[sigLen];
2316   Uint32 fragInfo = signal->header.m_fragmentInfo;
2317   Uint32 senderRef = signal->getSendersBlockRef();
2318 
2319   Uint32 *sectionPtr = signal->m_sectionPtrI;
2320 
2321   if(fragInfo == 0){
2322     return true;
2323   }
2324 
2325   const Uint32 secs = signal->header.m_noOfSections;
2326   const Uint32 * const secNos = &signal->theData[sigLen - secs];
2327 
2328   if(fragInfo == 1){
2329     /**
2330      * First in train
2331      */
2332     Ptr<FragmentInfo> fragPtr;
2333     if(!c_fragmentInfoHash.seize(fragPtr)){
2334       ndbrequire(false);
2335       return false;
2336     }
2337 
2338     new (fragPtr.p)FragmentInfo(fragId, senderRef);
2339     c_fragmentInfoHash.add(fragPtr);
2340 
2341     for(Uint32 i = 0; i<secs; i++){
2342       Uint32 sectionNo = secNos[i];
2343       ndbassert(sectionNo < 3);
2344       fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtr[i];
2345     }
2346 
2347     ndbassert(! fragPtr.p->isDropped() );
2348 
2349     /**
2350      * Don't release allocated segments
2351      */
2352     signal->header.m_fragmentInfo = 0;
2353     signal->header.m_noOfSections = 0;
2354     return false;
2355   }
2356 
2357   FragmentInfo key(fragId, senderRef);
2358   Ptr<FragmentInfo> fragPtr;
2359   if(c_fragmentInfoHash.find(fragPtr, key)){
2360 
2361     /**
2362      * FragInfo == 2 or 3
2363      */
2364     if ( likely(! fragPtr.p->isDropped()) )
2365     {
2366       Uint32 i;
2367       for(i = 0; i<secs; i++){
2368         Uint32 sectionNo = secNos[i];
2369         ndbassert(sectionNo < 3);
2370         Uint32 sectionPtrI = sectionPtr[i];
2371         if(fragPtr.p->m_sectionPtrI[sectionNo] != RNIL){
2372           linkSegments(fragPtr.p->m_sectionPtrI[sectionNo], sectionPtrI);
2373         } else {
2374           fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtrI;
2375         }
2376       }
2377 
2378       /**
2379        * fragInfo = 2
2380        */
2381       if(fragInfo == 2){
2382         signal->header.m_fragmentInfo = 0;
2383         signal->header.m_noOfSections = 0;
2384         return false;
2385       }
2386 
2387       /**
2388        * fragInfo = 3
2389        */
2390       for(i = 0; i<3; i++){
2391         Uint32 ptrI = fragPtr.p->m_sectionPtrI[i];
2392         if(ptrI != RNIL){
2393           signal->m_sectionPtrI[i] = ptrI;
2394         } else {
2395           break;
2396         }
2397       }
2398 
2399       signal->setLength(sigLen - secs);
2400       signal->header.m_noOfSections = i;
2401       signal->header.m_fragmentInfo = 0;
2402 
2403       c_fragmentInfoHash.release(fragPtr);
2404       return true;
2405     }
2406     else
2407     {
2408       /* This fragmented signal has already had at least 1 fragment
2409        * dropped.  We must release the received segments.
2410        */
2411       for (Uint32 i=0; i < secs; i++)
2412         releaseSection( sectionPtr[i] );
2413 
2414       signal->header.m_fragmentInfo = 0;
2415       signal->header.m_noOfSections = 0;
2416 
2417       /* FragInfo == 2
2418        * More fragments to come, keep waiting
2419        */
2420       if (fragInfo == 2)
2421         return false;
2422 
2423       /* FragInfo == 3
2424        * That was the last fragment.
2425        * We're now ready for handling the dropped signal.
2426        */
2427       SignalDroppedRep * rep = (SignalDroppedRep*)signal->theData;
2428       Uint32 gsn = signal->header.theVerId_signalNumber;
2429       Uint32 len = signal->header.theLength;
2430       Uint32 newLen= (len > 22 ? 22 : len);
2431       memmove(rep->originalData, signal->theData, (4 * newLen));
2432       rep->originalGsn = gsn;
2433       rep->originalLength = len;
2434       rep->originalSectionCount = 0;
2435       signal->header.theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
2436       signal->header.theLength = newLen + 3;
2437       signal->header.m_noOfSections = 0;
2438       signal->header.m_fragmentInfo = 3;
2439 
2440 
2441       /**
2442        * NOTE: Don't use EXECUTE_DIRECT as it
2443        *       sets sendersBlockRef to reference()
2444        */
2445       /* Perform dropped signal handling, in this thread, now */
2446       jamBuffer()->markEndOfSigExec();
2447       executeFunction(GSN_SIGNAL_DROPPED_REP, signal);
2448 
2449       /* return false to caller - they should not process the signal */
2450       return false;
2451     } // else (isDropped())
2452   }
2453 
2454   /**
2455    * Unable to find fragment
2456    */
2457   ndbrequire(false);
2458   return false;
2459 }
2460 
2461 bool
assembleDroppedFragments(Signal * signal)2462 SimulatedBlock::assembleDroppedFragments(Signal* signal)
2463 {
2464   /* This method is called at the start of a  SIGNAL_DROPPED_REP
2465    * handler when there is a chance that the dropped signal could
2466    * be part of a fragmented signal.
2467    * If the dropped signal was a fragmented signal, this
2468    * needs to be handled specially to ensure that fragments
2469    * of the signal are correctly dropped to avoid segment
2470    * leaks etc.
2471    * There are a number of cases :
2472    *   1) First fragment dropped  (FragInfo=1)
2473    *      All remaining fragments must be dropped when they
2474    *      arrive.  The Signal dropped report handler must be
2475    *      executed when the last fragment has arrived.
2476    *   2) Middle fragment dropped  (FragInfo=2)
2477    *      Any existing stored segments must be released.
2478    *      All remaining fragments must be dropped when they
2479    *      arrive.
2480    *   3) Last fragment dropped  (FragInfo=3)
2481    *      Any existing stored segments must be released.
2482    *      Signal Dropped handling can occur, so return true.
2483    *
2484    * To indicate that a fragment has been dropped for a signal,
2485    * all the section I Values in the fragment's hash entry are
2486    * set to RNIL.
2487    * Signal Dropped Report handling is performed when the last
2488    * fragment arrives.  If the last fragment is not dropped
2489    * by the transporter layer then normal fragment assembly
2490    * arranges for dropped signal handling to occur.
2491    */
2492   Uint32 sigLen = signal->length() - 1;
2493   Uint32 fragId = signal->theData[sigLen];
2494   Uint32 fragInfo = signal->header.m_fragmentInfo;
2495   Uint32 senderRef = signal->getSendersBlockRef();
2496 
2497   if(fragInfo == 0){
2498     return true;
2499   }
2500 
2501   /* This method is for handling SIGNAL_DROPPED_REP only */
2502   ndbrequire(signal->header.theVerId_signalNumber == GSN_SIGNAL_DROPPED_REP);
2503   ndbrequire(signal->header.m_noOfSections == 0);
2504 
2505   if(fragInfo == 1){
2506     /**
2507      * First in train
2508      */
2509     Ptr<FragmentInfo> fragPtr;
2510     if(!c_fragmentInfoHash.seize(fragPtr)){
2511       ndbrequire(false);
2512       return false;
2513     }
2514 
2515     new (fragPtr.p)FragmentInfo(fragId, senderRef);
2516     c_fragmentInfoHash.add(fragPtr);
2517 
2518     /* Mark entry in hash as belonging to dropped signal so subsequent
2519      * fragments can also be dropped
2520      */
2521     fragPtr.p->m_sectionPtrI[0]= RNIL;
2522     fragPtr.p->m_sectionPtrI[1]= RNIL;
2523     fragPtr.p->m_sectionPtrI[2]= RNIL;
2524 
2525     /* Wait for last fragment before SignalDroppedRep handling */
2526     signal->header.m_fragmentInfo = 0;
2527     return false;
2528   }
2529 
2530   FragmentInfo key(fragId, senderRef);
2531   Ptr<FragmentInfo> fragPtr;
2532   if(c_fragmentInfoHash.find(fragPtr, key)){
2533 
2534     /**
2535      * FragInfo == 2 or 3
2536      */
2537     if (! fragPtr.p->isDropped() )
2538     {
2539       /* Fragmented Signal not already marked as dropped
2540        * Need to free stored segments
2541        */
2542       releaseSection(fragPtr.p->m_sectionPtrI[0]);
2543       releaseSection(fragPtr.p->m_sectionPtrI[1]);
2544       releaseSection(fragPtr.p->m_sectionPtrI[2]);
2545 
2546       /* Mark as dropped now */
2547       fragPtr.p->m_sectionPtrI[0]= RNIL;
2548       fragPtr.p->m_sectionPtrI[1]= RNIL;
2549       fragPtr.p->m_sectionPtrI[2]= RNIL;
2550 
2551       ndbassert( fragPtr.p->isDropped() );
2552     }
2553 
2554     /**
2555      * fragInfo = 2
2556      *   Still waiting for final fragments.
2557      *   Return false to caller.
2558      */
2559     if(fragInfo == 2){
2560       signal->header.m_fragmentInfo = 0;
2561       return false;
2562     }
2563 
2564     /**
2565      * fragInfo = 3
2566      *   All fragments received, remove entry
2567      *   from hash and return to caller for
2568      *   dropped signal handling.
2569      */
2570     signal->header.m_fragmentInfo = 0;
2571 
2572     c_fragmentInfoHash.release(fragPtr);
2573     return true;
2574   }
2575 
2576   /**
2577    * Unable to find fragment
2578    */
2579   ndbrequire(false);
2580   return false;
2581 }
2582 
2583 /**
2584  * doCleanupFragInfo
2585  * Iterate over block's Fragment assembly hash, looking
2586  * for in-assembly fragments from the failed node
2587  * Release these
2588  * Returns after each scanned bucket to avoid consuming
2589  * too much time.
2590  *
2591  * Parameters
2592  *   failedNodeId    : Node id of failed node
2593  *   cursor          : Hash bucket to start iteration from
2594  *   rtUnitsUsed     : Total rt units used
2595  *   elementsCleaned : Number of elements cleaned
2596  *
2597  * Updates
2598  *   cursor          : Hash bucket to continue iteration from
2599  *   rtUnitsUsed     : += units used
2600  *   elementsCleaned : += elements cleaned
2601  *
2602  * Returns
2603  *   true  if all FragInfo structs cleaned up
2604  *   false if more to do
2605  */
2606 bool
doCleanupFragInfo(Uint32 failedNodeId,Uint32 & cursor,Uint32 & rtUnitsUsed,Uint32 & elementsCleaned)2607 SimulatedBlock::doCleanupFragInfo(Uint32 failedNodeId,
2608                                   Uint32& cursor,
2609                                   Uint32& rtUnitsUsed,
2610                                   Uint32& elementsCleaned)
2611 {
2612   jam();
2613   DLHashTable<FragmentInfo>::Iterator iter;
2614 
2615   c_fragmentInfoHash.next(cursor, iter);
2616 
2617   const Uint32 startBucket = iter.bucket;
2618 
2619   while (!iter.isNull() &&
2620          (iter.bucket == startBucket))
2621   {
2622     jam();
2623 
2624     Ptr<FragmentInfo> curr = iter.curr;
2625     c_fragmentInfoHash.next(iter);
2626 
2627     FragmentInfo* fragInfo = curr.p;
2628 
2629     if (refToNode(fragInfo->m_senderRef) == failedNodeId)
2630     {
2631       jam();
2632       /* We were assembling a fragmented signal from the
2633        * failed node, discard the partially assembled
2634        * sections and free the FragmentInfo hash entry
2635        */
2636       for(Uint32 s = 0; s<3; s++)
2637       {
2638         if (fragInfo->m_sectionPtrI[s] != RNIL)
2639         {
2640           jam();
2641           SegmentedSectionPtr ssptr;
2642           getSection(ssptr, fragInfo->m_sectionPtrI[s]);
2643           release(ssptr);
2644         }
2645       }
2646 
2647       /* Release FragmentInfo hash element */
2648       c_fragmentInfoHash.release(curr);
2649 
2650       elementsCleaned++;
2651       rtUnitsUsed+=3;
2652     }
2653 
2654     rtUnitsUsed++;
2655   } // while
2656 
2657   cursor = iter.bucket;
2658   return iter.isNull();
2659 }
2660 
2661 bool
doCleanupFragSend(Uint32 failedNodeId,Uint32 & cursor,Uint32 & rtUnitsUsed,Uint32 & elementsCleaned)2662 SimulatedBlock::doCleanupFragSend(Uint32 failedNodeId,
2663                                   Uint32& cursor,
2664                                   Uint32& rtUnitsUsed,
2665                                   Uint32& elementsCleaned)
2666 {
2667   jam();
2668 
2669   Ptr<FragmentSendInfo> fragPtr;
2670   const Uint32 NumSendLists = 2;
2671   ndbrequire(cursor < NumSendLists);
2672 
2673   DLList<FragmentSendInfo>* fragSendLists[ NumSendLists ] =
2674     { &c_segmentedFragmentSendList,
2675       &c_linearFragmentSendList };
2676 
2677   DLList<FragmentSendInfo>* list = fragSendLists[ cursor ];
2678 
2679   list->first(fragPtr);
2680   for(; !fragPtr.isNull();){
2681     jam();
2682     Ptr<FragmentSendInfo> copyPtr = fragPtr;
2683     list->next(fragPtr);
2684     rtUnitsUsed++;
2685 
2686     NodeReceiverGroup& rg = copyPtr.p->m_nodeReceiverGroup;
2687 
2688     if (rg.m_nodes.get(failedNodeId))
2689     {
2690       jam();
2691       /* Fragmented signal is being sent to node */
2692       rg.m_nodes.clear(failedNodeId);
2693 
2694       if (rg.m_nodes.isclear())
2695       {
2696         jam();
2697         /* No other nodes in receiver group - send
2698          * is cancelled
2699          * Will be cleaned up in the usual CONTINUE_FRAGMENTED
2700          * handling code.
2701          */
2702         copyPtr.p->m_status = FragmentSendInfo::SendCancelled;
2703       }
2704       elementsCleaned++;
2705     }
2706   }
2707 
2708   /* Next time we'll do the next list */
2709   cursor++;
2710 
2711   return (cursor == NumSendLists);
2712 }
2713 
2714 
2715 Uint32
doNodeFailureCleanup(Signal * signal,Uint32 failedNodeId,Uint32 resource,Uint32 cursor,Uint32 elementsCleaned,Callback & cb)2716 SimulatedBlock::doNodeFailureCleanup(Signal* signal,
2717                                      Uint32 failedNodeId,
2718                                      Uint32 resource,
2719                                      Uint32 cursor,
2720                                      Uint32 elementsCleaned,
2721                                      Callback& cb)
2722 {
2723   jam();
2724   const bool userCallback = (cb.m_callbackFunction != 0);
2725   const Uint32 maxRtUnits = userCallback ?
2726 #ifdef VM_TRACE
2727     2 :
2728 #else
2729     16 :
2730 #endif
2731     ~0; /* Must complete all processing in this call */
2732 
2733   Uint32 rtUnitsUsed = 0;
2734 
2735   /* Loop over resources, cleaning them up */
2736   do
2737   {
2738     bool resourceDone= false;
2739     switch(resource) {
2740     case ContinueFragmented::RES_FRAGSEND:
2741     {
2742       jam();
2743       resourceDone = doCleanupFragSend(failedNodeId, cursor,
2744                                        rtUnitsUsed, elementsCleaned);
2745       break;
2746     }
2747     case ContinueFragmented::RES_FRAGINFO:
2748     {
2749       jam();
2750       resourceDone = doCleanupFragInfo(failedNodeId, cursor,
2751                                        rtUnitsUsed, elementsCleaned);
2752       break;
2753     }
2754     case ContinueFragmented::RES_LAST:
2755     {
2756       jam();
2757       /* Node failure processing complete, execute user callback if provided */
2758       if (userCallback)
2759         execute(signal, cb, elementsCleaned);
2760 
2761       return elementsCleaned;
2762     }
2763     default:
2764       ndbrequire(false);
2765     }
2766 
2767     /* Did we complete cleaning up this resource? */
2768     if (resourceDone)
2769     {
2770       resource++;
2771       cursor= 0;
2772     }
2773 
2774   } while (rtUnitsUsed <= maxRtUnits);
2775 
2776   jam();
2777 
2778   /* Not yet completed failure handling.
2779    * Must have exhausted RT units.
2780    * Update cursor and re-invoke
2781    */
2782   ndbassert(userCallback);
2783 
2784   /* Send signal to continue processing */
2785 
2786   ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
2787   sig->type = ContinueFragmented::CONTINUE_CLEANUP;
2788   sig->cleanup.failedNodeId = failedNodeId;
2789   sig->cleanup.resource = resource;
2790   sig->cleanup.cursor = cursor;
2791   sig->cleanup.elementsCleaned= elementsCleaned;
2792   Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
2793   Uint32 sigLen = ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
2794     callbackWords;
2795   ndbassert(sigLen <= 25); // Should be STATIC_ASSERT
2796   memcpy(&sig->cleanup.callbackStart, &cb, callbackWords << 2);
2797 
2798   sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, sigLen, JBB);
2799 
2800   return elementsCleaned;
2801 }
2802 
2803 Uint32
simBlockNodeFailure(Signal * signal,Uint32 failedNodeId,Callback & cb)2804 SimulatedBlock::simBlockNodeFailure(Signal* signal,
2805                                     Uint32 failedNodeId,
2806                                     Callback& cb)
2807 {
2808   jam();
2809   return doNodeFailureCleanup(signal, failedNodeId, 0, 0, 0, cb);
2810 }
2811 
2812 Uint32
debugPrintFragmentCounts()2813 SimulatedBlock::debugPrintFragmentCounts()
2814 {
2815   const char* blockName = getBlockName(theNumber);
2816   DLHashTable<FragmentInfo>::Iterator iter;
2817   Uint32 fragmentInfoCount = 0;
2818   c_fragmentInfoHash.first(iter);
2819 
2820   while(!iter.isNull())
2821   {
2822     fragmentInfoCount++;
2823     c_fragmentInfoHash.next(iter);
2824   }
2825 
2826   Ptr<FragmentSendInfo> ptr;
2827   Uint32 linSendInfoCount = 0;
2828 
2829   c_linearFragmentSendList.first(ptr);
2830 
2831   while (!ptr.isNull())
2832   {
2833     linSendInfoCount++;
2834     c_linearFragmentSendList.next(ptr);
2835   }
2836 
2837   Uint32 segSendInfoCount = 0;
2838   c_segmentedFragmentSendList.first(ptr);
2839 
2840   while (!ptr.isNull())
2841   {
2842     segSendInfoCount++;
2843     c_segmentedFragmentSendList.next(ptr);
2844   }
2845 
2846   ndbout_c("%s : Fragment assembly hash entry count : %d",
2847            blockName,
2848            fragmentInfoCount);
2849 
2850   ndbout_c("%s : Linear fragment send list size : %d",
2851            blockName,
2852            linSendInfoCount);
2853 
2854   ndbout_c("%s : Segmented fragment send list size : %d",
2855            blockName,
2856            segSendInfoCount);
2857 
2858   return fragmentInfoCount +
2859     linSendInfoCount +
2860     segSendInfoCount;
2861 }
2862 
2863 
2864 bool
sendFirstFragment(FragmentSendInfo & info,NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,bool noRelease,Uint32 messageSize)2865 SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
2866 				  NodeReceiverGroup rg,
2867 				  GlobalSignalNumber gsn,
2868 				  Signal* signal,
2869 				  Uint32 length,
2870 				  JobBufferLevel jbuf,
2871 				  SectionHandle* sections,
2872                                   bool noRelease,
2873                                   Uint32 messageSize) {
2874 
2875   Uint32 noSections = sections->m_cnt;
2876   SegmentedSectionPtr * ptr = sections->m_ptr;
2877 
2878   info.m_sectionPtr[0].m_segmented.i = RNIL;
2879   info.m_sectionPtr[1].m_segmented.i = RNIL;
2880   info.m_sectionPtr[2].m_segmented.i = RNIL;
2881 
2882   Uint32 totalSize = 0;
2883   switch(noSections){
2884   case 3:
2885     info.m_sectionPtr[2].m_segmented.i = ptr[2].i;
2886     info.m_sectionPtr[2].m_segmented.p = ptr[2].p;
2887     totalSize += ptr[2].sz;
2888   case 2:
2889     info.m_sectionPtr[1].m_segmented.i = ptr[1].i;
2890     info.m_sectionPtr[1].m_segmented.p = ptr[1].p;
2891     totalSize += ptr[1].sz;
2892   case 1:
2893     info.m_sectionPtr[0].m_segmented.i = ptr[0].i;
2894     info.m_sectionPtr[0].m_segmented.p = ptr[0].p;
2895     totalSize += ptr[0].sz;
2896   }
2897 
2898   if(totalSize <= messageSize + SectionSegment::DataLength){
2899     /**
2900      * Send signal directly
2901      */
2902     if (noRelease)
2903       sendSignalNoRelease(rg, gsn, signal, length, jbuf, sections);
2904     else
2905       sendSignal(rg, gsn, signal, length, jbuf, sections);
2906 
2907     info.m_status = FragmentSendInfo::SendComplete;
2908     return true;
2909   }
2910 
2911   /**
2912    * Setup info object
2913    */
2914   info.m_status = FragmentSendInfo::SendNotComplete;
2915   info.m_prio = (Uint8)jbuf;
2916   info.m_gsn = gsn;
2917   info.m_fragInfo = 1;
2918   info.m_flags = 0;
2919   info.m_messageSize = messageSize;
2920   info.m_fragmentId = c_fragmentIdCounter++;
2921   info.m_nodeReceiverGroup = rg;
2922   info.m_callback.m_callbackFunction = 0;
2923 
2924   if (noRelease)
2925   {
2926     /* Record info that we are not releasing segments */
2927     info.m_flags|= FragmentSendInfo::SendNoReleaseSeg;
2928   }
2929   else
2930   {
2931     /**
2932      * Clear sections in caller's handle.  Actual send
2933      * will consume them
2934      */
2935     sections->m_cnt = 0;
2936   }
2937 
2938   /* Store main signal data in a segment for sending later */
2939   Ptr<SectionSegment> tmp;
2940   if(!import(tmp, &signal->theData[0], length))
2941   {
2942     handle_out_of_longsignal_memory(0);
2943     return false;
2944   }
2945   info.m_theDataSection.p = &tmp.p->theData[0];
2946   info.m_theDataSection.sz = length;
2947   tmp.p->theData[length] = tmp.i;
2948 
2949   sendNextSegmentedFragment(signal, info);
2950 
2951   if(c_fragmentIdCounter == 0){
2952     /**
2953      * Fragment id 0 is invalid
2954      */
2955     c_fragmentIdCounter = 1;
2956   }
2957 
2958   return true;
2959 }
2960 
2961 #if 0
2962 #define lsout(x) x
2963 #else
2964 #define lsout(x)
2965 #endif
2966 
2967 void
sendNextSegmentedFragment(Signal * signal,FragmentSendInfo & info)2968 SimulatedBlock::sendNextSegmentedFragment(Signal* signal,
2969 					  FragmentSendInfo & info){
2970 
2971   if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
2972   {
2973     /* Send was cancelled - all dest. nodes have failed
2974      * since send was started
2975      */
2976     if (0 == (info.m_flags & FragmentSendInfo::SendNoReleaseSeg))
2977     {
2978       /*
2979        * Free any sections still to be sent
2980        */
2981       SectionHandle handle(this);
2982       for (Uint32 s = 0; s < 3; s++)
2983       {
2984         Uint32 sectionI = info.m_sectionPtr[s].m_segmented.i;
2985         if (sectionI != RNIL)
2986         {
2987           getSection(handle.m_ptr[handle.m_cnt], sectionI);
2988           info.m_sectionPtr[s].m_segmented.i = RNIL;
2989           info.m_sectionPtr[s].m_segmented.p = NULL;
2990           handle.m_cnt++;
2991         }
2992       }
2993 
2994       releaseSections(handle);
2995     }
2996 
2997     /* Free inline signal data storage section */
2998     Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
2999     g_sectionSegmentPool.release(SB_SP_REL_ARG inlineDataI);
3000 
3001     info.m_status = FragmentSendInfo::SendComplete;
3002     return;
3003   }
3004 
3005   /**
3006    * Setup main signal data from stored copy
3007    */
3008   const Uint32 sigLen = info.m_theDataSection.sz;
3009   memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
3010 
3011   Uint32 sz = 0;
3012   Uint32 maxSz = info.m_messageSize;
3013 
3014   Int32 secNo = 2;
3015   Uint32 secCount = 0;
3016   Uint32 * secNos = &signal->theData[sigLen];
3017 
3018   SectionHandle sections(this);
3019   SegmentedSectionPtr *ptr = sections.m_ptr;
3020 
3021   bool split= false;
3022   Uint32 splitSectionStartI= RNIL;
3023   SectionSegment* splitSectionStartP= NULL;
3024   Uint32 splitSectionLastSegment= RNIL;
3025   Uint32 splitSectionSz= 0;
3026 
3027   enum { Unknown = 0, Full = 1 } loop = Unknown;
3028   for(; secNo >= 0 && secCount < 3; secNo--){
3029     Uint32 ptrI = info.m_sectionPtr[secNo].m_segmented.i;
3030     if(ptrI == RNIL)
3031       continue;
3032 
3033     info.m_sectionPtr[secNo].m_segmented.i = RNIL;
3034 
3035     SectionSegment * ptrP = info.m_sectionPtr[secNo].m_segmented.p;
3036     const Uint32 size = ptrP->m_sz;
3037 
3038     ptr[secCount].i = ptrI;
3039     ptr[secCount].p = ptrP;
3040     ptr[secCount].sz = size;
3041     secNos[secCount] = secNo;
3042     secCount++;
3043 
3044     const Uint32 sizeLeft = maxSz - sz;
3045     if(size <= sizeLeft){
3046       /**
3047        * The section fits
3048        */
3049       sz += size;
3050       lsout(ndbout_c("section %d saved as %d", secNo, secCount-1));
3051       continue;
3052     }
3053 
3054     const Uint32 overflow = size - sizeLeft; // > 0
3055     if(overflow <= SectionSegment::DataLength){
3056       /**
3057        * Only one segment left to send
3058        *   send even if sizeLeft <= size
3059        */
3060       lsout(ndbout_c("section %d saved as %d but full over: %d",
3061 		     secNo, secCount-1, overflow));
3062       secNo--;
3063       break;
3064     }
3065 
3066     // size >= 61
3067     if(sizeLeft < SectionSegment::DataLength){
3068       /**
3069        * Less than one segment left (space)
3070        *   dont bother sending
3071        */
3072       secCount--;
3073       info.m_sectionPtr[secNo].m_segmented.i = ptrI;
3074       loop = Full;
3075       lsout(ndbout_c("section %d not saved", secNo));
3076       break;
3077     }
3078 
3079     /**
3080      * Split list
3081      * 1) Find place to split
3082      * 2) Rewrite header (the part that will be sent)
3083      * 3) Write new header (for remaining part)
3084      * 4) Store new header on FragmentSendInfo - record
3085      */
3086     // size >= 61 && sizeLeft >= 60
3087     Uint32 sum = SectionSegment::DataLength;
3088     Uint32 prevPtrI = ptrI;
3089     ptrI = ptrP->m_nextSegment;
3090     const Uint32 fill = sizeLeft - SectionSegment::DataLength;
3091     while(sum < fill){
3092       prevPtrI = ptrI;
3093       ptrP = g_sectionSegmentPool.getPtr(ptrI);
3094       ptrI = ptrP->m_nextSegment;
3095       sum += SectionSegment::DataLength;
3096     }
3097 
3098     Uint32 prev = secCount - 1;
3099     /**
3100      * Record details of the section pre-split
3101      * This allows the split to be 'healed' afterwards in the
3102      * no release case.
3103      */
3104     split= true;
3105     splitSectionStartI= ptr[prev].i;
3106     splitSectionStartP= ptr[prev].p;
3107     splitSectionLastSegment= splitSectionStartP->m_lastSegment;
3108     splitSectionSz= splitSectionStartP->m_sz;
3109 
3110     /**
3111      * Rewrite header w.r.t size and last
3112      * This is what will be sent in this fragment.
3113      */
3114     splitSectionStartP->m_lastSegment = prevPtrI;
3115     splitSectionStartP->m_sz = sum;
3116     ptr[prev].sz = sum;
3117 
3118     /**
3119      * Write "new" list header
3120      * This is what remains to be sent in this section
3121      */
3122     ptrP = g_sectionSegmentPool.getPtr(ptrI);
3123     ptrP->m_lastSegment = splitSectionLastSegment;
3124     ptrP->m_sz = size - sum;
3125 
3126     /**
3127      * And store it on info-record
3128      */
3129     info.m_sectionPtr[secNo].m_segmented.i = ptrI;
3130     info.m_sectionPtr[secNo].m_segmented.p = ptrP;
3131 
3132     loop = Full;
3133     lsout(ndbout_c("section %d split into %d", secNo, prev));
3134     break;
3135   }
3136 
3137   lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d",
3138 		 loop, secNo, secCount, sz));
3139 
3140   /**
3141    * Store fragment id
3142    */
3143   secNos[secCount] = info.m_fragmentId;
3144 
3145   Uint32 fragInfo = info.m_fragInfo;
3146   info.m_fragInfo = 2;
3147   switch(loop){
3148   case Unknown:
3149     if(secNo >= 0){
3150       lsout(ndbout_c("Unknown - Full"));
3151       /**
3152        * Not finished
3153        */
3154       break;
3155     }
3156     // Fall through
3157     lsout(ndbout_c("Unknown - Done"));
3158     info.m_status = FragmentSendInfo::SendComplete;
3159     ndbassert(fragInfo == 2);
3160     fragInfo = 3;
3161   case Full:
3162     break;
3163   }
3164 
3165   signal->header.m_fragmentInfo = fragInfo;
3166   signal->header.m_noOfSections = 0;
3167   sections.m_cnt = secCount;
3168 
3169   if (info.m_flags & FragmentSendInfo::SendNoReleaseSeg)
3170   {
3171     sendSignalNoRelease(info.m_nodeReceiverGroup,
3172                         info.m_gsn,
3173                         signal,
3174                         sigLen + secCount + 1,
3175                         (JobBufferLevel)info.m_prio,
3176                         &sections);
3177     /* NoRelease leaves SectionHandle populated, we'll
3178      * clear it here.  The actual sections themselves
3179      * remain allocated.
3180      */
3181     sections.m_cnt = 0;
3182 
3183     if (split)
3184     {
3185       /* There was a split section, which required us to modify the
3186        * segment list.
3187        * Now restore the split section's segment list back to
3188        * its previous state
3189        * (Only really required for first segment, but we do
3190        *  it for all of them, to be a good citizen)
3191        */
3192       ndbrequire( splitSectionStartI != RNIL );
3193       ndbrequire( splitSectionStartP != NULL );
3194       ndbrequire( splitSectionLastSegment != RNIL );
3195 
3196       splitSectionStartP->m_lastSegment= splitSectionLastSegment;
3197       splitSectionStartP->m_sz= splitSectionSz;
3198 
3199       /* Check our handiwork */
3200       assert(verifySection(splitSectionStartI));
3201     }
3202   }
3203   else
3204   {
3205     /* Normal, release sections case */
3206     sendSignal(info.m_nodeReceiverGroup,
3207                info.m_gsn,
3208                signal,
3209                sigLen + secCount + 1,
3210                (JobBufferLevel)info.m_prio,
3211                &sections);
3212   }
3213 
3214   if(fragInfo == 3){
3215     /**
3216      * This is the last signal
3217      * Release saved 'main signal' words segment
3218      */
3219     g_sectionSegmentPool.release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]);
3220   }
3221 }
3222 
3223 bool
sendFirstFragment(FragmentSendInfo & info,NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Uint32 messageSize)3224 SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
3225 				  NodeReceiverGroup rg,
3226 				  GlobalSignalNumber gsn,
3227 				  Signal* signal,
3228 				  Uint32 length,
3229 				  JobBufferLevel jbuf,
3230 				  LinearSectionPtr ptr[3],
3231 				  Uint32 noOfSections,
3232 				  Uint32 messageSize){
3233 
3234   check_sections(signal, signal->header.m_noOfSections, noOfSections);
3235 
3236   info.m_sectionPtr[0].m_linear.p = NULL;
3237   info.m_sectionPtr[1].m_linear.p = NULL;
3238   info.m_sectionPtr[2].m_linear.p = NULL;
3239 
3240   Uint32 totalSize = 0;
3241   switch(noOfSections){
3242   case 3:
3243     info.m_sectionPtr[2].m_linear = ptr[2];
3244     totalSize += ptr[2].sz;
3245   case 2:
3246     info.m_sectionPtr[1].m_linear = ptr[1];
3247     totalSize += ptr[1].sz;
3248   case 1:
3249     info.m_sectionPtr[0].m_linear = ptr[0];
3250     totalSize += ptr[0].sz;
3251   }
3252 
3253   if(totalSize <= messageSize + SectionSegment::DataLength){
3254     /**
3255      * Send signal directly
3256      */
3257     sendSignal(rg, gsn, signal, length, jbuf, ptr, noOfSections);
3258     info.m_status = FragmentSendInfo::SendComplete;
3259 
3260     /**
3261      * Indicate to sendLinearSignalFragment
3262      *   that we'r already done
3263      */
3264     return true;
3265   }
3266 
3267   /**
3268    * Setup info object
3269    */
3270   info.m_status = FragmentSendInfo::SendNotComplete;
3271   info.m_prio = (Uint8)jbuf;
3272   info.m_gsn = gsn;
3273   info.m_messageSize = messageSize;
3274   info.m_fragInfo = 1;
3275   info.m_flags = 0;
3276   info.m_fragmentId = c_fragmentIdCounter++;
3277   info.m_nodeReceiverGroup = rg;
3278   info.m_callback.m_callbackFunction = 0;
3279 
3280   Ptr<SectionSegment> tmp;
3281   if(unlikely(!import(tmp, &signal->theData[0], length)))
3282   {
3283     handle_out_of_longsignal_memory(0);
3284     return false;
3285   }
3286 
3287   info.m_theDataSection.p = &tmp.p->theData[0];
3288   info.m_theDataSection.sz = length;
3289   tmp.p->theData[length] = tmp.i;
3290 
3291   sendNextLinearFragment(signal, info);
3292 
3293   if(c_fragmentIdCounter == 0){
3294     /**
3295      * Fragment id 0 is invalid
3296      */
3297     c_fragmentIdCounter = 1;
3298   }
3299 
3300   return true;
3301 }
3302 
3303 void
sendNextLinearFragment(Signal * signal,FragmentSendInfo & info)3304 SimulatedBlock::sendNextLinearFragment(Signal* signal,
3305 				       FragmentSendInfo & info){
3306 
3307   if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
3308   {
3309     /* Send was cancelled - all dest. nodes have failed
3310      * since send was started
3311      */
3312     /* Free inline signal data storage section */
3313     Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
3314     g_sectionSegmentPool.release(SB_SP_REL_ARG inlineDataI);
3315 
3316     info.m_status = FragmentSendInfo::SendComplete;
3317     return;
3318   }
3319 
3320   /**
3321    * Store "theData"
3322    */
3323   const Uint32 sigLen = info.m_theDataSection.sz;
3324   memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
3325 
3326   Uint32 sz = 0;
3327   Uint32 maxSz = info.m_messageSize;
3328 
3329   Int32 secNo = 2;
3330   Uint32 secCount = 0;
3331   Uint32 * secNos = &signal->theData[sigLen];
3332   LinearSectionPtr signalPtr[3];
3333 
3334   enum { Unknown = 0, Full = 2 } loop = Unknown;
3335   for(; secNo >= 0 && secCount < 3; secNo--){
3336     Uint32 * ptrP = info.m_sectionPtr[secNo].m_linear.p;
3337     if(ptrP == NULL)
3338       continue;
3339 
3340     info.m_sectionPtr[secNo].m_linear.p = NULL;
3341     const Uint32 size = info.m_sectionPtr[secNo].m_linear.sz;
3342 
3343     signalPtr[secCount].p = ptrP;
3344     signalPtr[secCount].sz = size;
3345     secNos[secCount] = secNo;
3346     secCount++;
3347 
3348     const Uint32 sizeLeft = maxSz - sz;
3349     if(size <= sizeLeft){
3350       /**
3351        * The section fits
3352        */
3353       sz += size;
3354       lsout(ndbout_c("section %d saved as %d", secNo, secCount-1));
3355       continue;
3356     }
3357 
3358     const Uint32 overflow = size - sizeLeft; // > 0
3359     if(overflow <= SectionSegment::DataLength){
3360       /**
3361        * Only one segment left to send
3362        *   send even if sizeLeft <= size
3363        */
3364       lsout(ndbout_c("section %d saved as %d but full over: %d",
3365 		     secNo, secCount-1, overflow));
3366       secNo--;
3367       break;
3368     }
3369 
3370     // size >= 61
3371     if(sizeLeft < SectionSegment::DataLength){
3372       /**
3373        * Less than one segment left (space)
3374        *   dont bother sending
3375        */
3376       secCount--;
3377       info.m_sectionPtr[secNo].m_linear.p = ptrP;
3378       loop = Full;
3379       lsout(ndbout_c("section %d not saved", secNo));
3380       break;
3381     }
3382 
3383     /**
3384      * Split list
3385      * 1) Find place to split
3386      * 2) Rewrite header (the part that will be sent)
3387      * 3) Write new header (for remaining part)
3388      * 4) Store new header on FragmentSendInfo - record
3389      */
3390     Uint32 sum = sizeLeft;
3391     sum /= SectionSegment::DataLength;
3392     sum *= SectionSegment::DataLength;
3393 
3394     /**
3395      * Rewrite header w.r.t size
3396      */
3397     Uint32 prev = secCount - 1;
3398     signalPtr[prev].sz = sum;
3399 
3400     /**
3401      * Write/store "new" header
3402      */
3403     info.m_sectionPtr[secNo].m_linear.p = ptrP + sum;
3404     info.m_sectionPtr[secNo].m_linear.sz = size - sum;
3405 
3406     loop = Full;
3407     lsout(ndbout_c("section %d split into %d", secNo, prev));
3408     break;
3409   }
3410 
3411   lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d",
3412 		 loop, secNo, secCount, sz));
3413 
3414   /**
3415    * Store fragment id
3416    */
3417   secNos[secCount] = info.m_fragmentId;
3418 
3419   Uint32 fragInfo = info.m_fragInfo;
3420   info.m_fragInfo = 2;
3421   switch(loop){
3422   case Unknown:
3423     if(secNo >= 0){
3424       lsout(ndbout_c("Unknown - Full"));
3425       /**
3426        * Not finished
3427        */
3428       break;
3429     }
3430     // Fall through
3431     lsout(ndbout_c("Unknown - Done"));
3432     info.m_status = FragmentSendInfo::SendComplete;
3433     ndbassert(fragInfo == 2);
3434     fragInfo = 3;
3435   case Full:
3436     break;
3437   }
3438 
3439   signal->header.m_noOfSections = 0;
3440   signal->header.m_fragmentInfo = fragInfo;
3441 
3442   sendSignal(info.m_nodeReceiverGroup,
3443 	     info.m_gsn,
3444 	     signal,
3445 	     sigLen + secCount + 1,
3446 	     (JobBufferLevel)info.m_prio,
3447 	     signalPtr,
3448 	     secCount);
3449 
3450   if(fragInfo == 3){
3451     /**
3452      * This is the last signal
3453      */
3454     g_sectionSegmentPool.release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]);
3455   }
3456 }
3457 
3458 void
sendFragmentedSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,Callback & c,Uint32 messageSize)3459 SimulatedBlock::sendFragmentedSignal(BlockReference ref,
3460 				     GlobalSignalNumber gsn,
3461 				     Signal* signal,
3462 				     Uint32 length,
3463 				     JobBufferLevel jbuf,
3464 				     SectionHandle* sections,
3465 				     Callback & c,
3466 				     Uint32 messageSize){
3467   bool res = true;
3468   Ptr<FragmentSendInfo> tmp;
3469   res = c_segmentedFragmentSendList.seizeFirst(tmp);
3470   ndbrequire(res);
3471 
3472   res = sendFirstFragment(* tmp.p,
3473 			  NodeReceiverGroup(ref),
3474 			  gsn,
3475 			  signal,
3476 			  length,
3477 			  jbuf,
3478 			  sections,
3479                           false, // Release sections on send
3480 			  messageSize);
3481   ndbrequire(res);
3482 
3483   if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3484     c_segmentedFragmentSendList.release(tmp);
3485     if(c.m_callbackFunction != 0)
3486       execute(signal, c, 0);
3487     return;
3488   }
3489   tmp.p->m_callback = c;
3490 
3491   if(!c_fragSenderRunning)
3492   {
3493     SaveSignal<2> save(signal);
3494     c_fragSenderRunning = true;
3495     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3496     sig->type = ContinueFragmented::CONTINUE_SENDING;
3497     sig->line = __LINE__;
3498     sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3499   }
3500 }
3501 
3502 void
sendFragmentedSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,Callback & c,Uint32 messageSize)3503 SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
3504 				     GlobalSignalNumber gsn,
3505 				     Signal* signal,
3506 				     Uint32 length,
3507 				     JobBufferLevel jbuf,
3508 				     SectionHandle * sections,
3509 				     Callback & c,
3510 				     Uint32 messageSize){
3511   bool res = true;
3512   Ptr<FragmentSendInfo> tmp;
3513   res = c_segmentedFragmentSendList.seizeFirst(tmp);
3514   ndbrequire(res);
3515 
3516   res = sendFirstFragment(* tmp.p,
3517 			  rg,
3518 			  gsn,
3519 			  signal,
3520 			  length,
3521 			  jbuf,
3522 			  sections,
3523 			  false, // Release sections on send
3524                           messageSize);
3525   ndbrequire(res);
3526 
3527   if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3528     c_segmentedFragmentSendList.release(tmp);
3529     if(c.m_callbackFunction != 0)
3530       execute(signal, c, 0);
3531     return;
3532   }
3533   tmp.p->m_callback = c;
3534 
3535   if(!c_fragSenderRunning)
3536   {
3537     SaveSignal<2> save(signal);
3538     c_fragSenderRunning = true;
3539     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3540     sig->type = ContinueFragmented::CONTINUE_SENDING;
3541     sig->line = __LINE__;
3542     sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3543   }
3544 }
3545 
3546 SimulatedBlock::Callback SimulatedBlock::TheEmptyCallback = {0, 0};
3547 void
TheNULLCallbackFunction(class Signal *,Uint32,Uint32)3548 SimulatedBlock::TheNULLCallbackFunction(class Signal*, Uint32, Uint32)
3549 { abort(); /* should never be called */ }
3550 SimulatedBlock::Callback SimulatedBlock::TheNULLCallback =
3551 { &SimulatedBlock::TheNULLCallbackFunction, 0 };
3552 
3553 void
sendFragmentedSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Callback & c,Uint32 messageSize)3554 SimulatedBlock::sendFragmentedSignal(BlockReference ref,
3555 				     GlobalSignalNumber gsn,
3556 				     Signal* signal,
3557 				     Uint32 length,
3558 				     JobBufferLevel jbuf,
3559 				     LinearSectionPtr ptr[3],
3560 				     Uint32 noOfSections,
3561 				     Callback & c,
3562 				     Uint32 messageSize){
3563   bool res = true;
3564   Ptr<FragmentSendInfo> tmp;
3565   res = c_linearFragmentSendList.seizeFirst(tmp);
3566   ndbrequire(res);
3567 
3568   res = sendFirstFragment(* tmp.p,
3569 			  NodeReceiverGroup(ref),
3570 			  gsn,
3571 			  signal,
3572 			  length,
3573 			  jbuf,
3574 			  ptr,
3575 			  noOfSections,
3576 			  messageSize);
3577   ndbrequire(res);
3578 
3579   if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3580     c_linearFragmentSendList.release(tmp);
3581     if(c.m_callbackFunction != 0)
3582       execute(signal, c, 0);
3583     return;
3584   }
3585   tmp.p->m_callback = c;
3586 
3587   if(!c_fragSenderRunning)
3588   {
3589     SaveSignal<2> save(signal);
3590     c_fragSenderRunning = true;
3591     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3592     sig->type = ContinueFragmented::CONTINUE_SENDING;
3593     sig->line = __LINE__;
3594     sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3595   }
3596 }
3597 
3598 void
sendFragmentedSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Callback & c,Uint32 messageSize)3599 SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
3600 				     GlobalSignalNumber gsn,
3601 				     Signal* signal,
3602 				     Uint32 length,
3603 				     JobBufferLevel jbuf,
3604 				     LinearSectionPtr ptr[3],
3605 				     Uint32 noOfSections,
3606 				     Callback & c,
3607 				     Uint32 messageSize){
3608   bool res = true;
3609   Ptr<FragmentSendInfo> tmp;
3610   res = c_linearFragmentSendList.seizeFirst(tmp);
3611   ndbrequire(res);
3612 
3613   res = sendFirstFragment(* tmp.p,
3614 			  rg,
3615 			  gsn,
3616 			  signal,
3617 			  length,
3618 			  jbuf,
3619 			  ptr,
3620 			  noOfSections,
3621 			  messageSize);
3622   ndbrequire(res);
3623 
3624   if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3625     c_linearFragmentSendList.release(tmp);
3626     if(c.m_callbackFunction != 0)
3627       execute(signal, c, 0);
3628     return;
3629   }
3630   tmp.p->m_callback = c;
3631 
3632   if(!c_fragSenderRunning)
3633   {
3634     SaveSignal<2> save(signal);
3635     c_fragSenderRunning = true;
3636     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3637     sig->type = ContinueFragmented::CONTINUE_SENDING;
3638     sig->line = __LINE__;
3639     sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3640   }
3641 }
3642 
3643 NodeInfo &
setNodeInfo(NodeId nodeId)3644 SimulatedBlock::setNodeInfo(NodeId nodeId) {
3645   ndbrequire(nodeId > 0 && nodeId < MAX_NODES);
3646   return globalData.m_nodeInfo[nodeId];
3647 }
3648 
3649 bool
isMultiThreaded()3650 SimulatedBlock::isMultiThreaded()
3651 {
3652 #ifdef NDBD_MULTITHREADED
3653   return true;
3654 #else
3655   return false;
3656 #endif
3657 }
3658 
3659 
3660 void
execUTIL_CREATE_LOCK_REF(Signal * signal)3661 SimulatedBlock::execUTIL_CREATE_LOCK_REF(Signal* signal){
3662   jamEntry();
3663   c_mutexMgr.execUTIL_CREATE_LOCK_REF(signal);
3664 }
3665 
execUTIL_CREATE_LOCK_CONF(Signal * signal)3666 void SimulatedBlock::execUTIL_CREATE_LOCK_CONF(Signal* signal){
3667   jamEntry();
3668   c_mutexMgr.execUTIL_CREATE_LOCK_CONF(signal);
3669 }
3670 
execUTIL_DESTORY_LOCK_REF(Signal * signal)3671 void SimulatedBlock::execUTIL_DESTORY_LOCK_REF(Signal* signal){
3672   jamEntry();
3673   c_mutexMgr.execUTIL_DESTORY_LOCK_REF(signal);
3674 }
3675 
execUTIL_DESTORY_LOCK_CONF(Signal * signal)3676 void SimulatedBlock::execUTIL_DESTORY_LOCK_CONF(Signal* signal){
3677   jamEntry();
3678   c_mutexMgr.execUTIL_DESTORY_LOCK_CONF(signal);
3679 }
3680 
execUTIL_LOCK_REF(Signal * signal)3681 void SimulatedBlock::execUTIL_LOCK_REF(Signal* signal){
3682   jamEntry();
3683   c_mutexMgr.execUTIL_LOCK_REF(signal);
3684 }
3685 
execUTIL_LOCK_CONF(Signal * signal)3686 void SimulatedBlock::execUTIL_LOCK_CONF(Signal* signal){
3687   jamEntry();
3688   c_mutexMgr.execUTIL_LOCK_CONF(signal);
3689 }
3690 
execUTIL_UNLOCK_REF(Signal * signal)3691 void SimulatedBlock::execUTIL_UNLOCK_REF(Signal* signal){
3692   jamEntry();
3693   c_mutexMgr.execUTIL_UNLOCK_REF(signal);
3694 }
3695 
execUTIL_UNLOCK_CONF(Signal * signal)3696 void SimulatedBlock::execUTIL_UNLOCK_CONF(Signal* signal){
3697   jamEntry();
3698   c_mutexMgr.execUTIL_UNLOCK_CONF(signal);
3699 }
3700 
3701 void
ignoreMutexUnlockCallback(Signal * signal,Uint32 ptrI,Uint32 retVal)3702 SimulatedBlock::ignoreMutexUnlockCallback(Signal* signal,
3703 					  Uint32 ptrI, Uint32 retVal){
3704   c_mutexMgr.release(ptrI);
3705 }
3706 
3707 void
fsRefError(Signal * signal,Uint32 line,const char * msg)3708 SimulatedBlock::fsRefError(Signal* signal, Uint32 line, const char *msg)
3709 {
3710   const FsRef *fsRef = (FsRef*)signal->getDataPtr();
3711   Uint32 errorCode = fsRef->errorCode;
3712   Uint32 osErrorCode = fsRef->osErrorCode;
3713   char msg2[100];
3714 
3715   sprintf(msg2, "%s: %s. OS errno: %u", getBlockName(number()), msg, osErrorCode);
3716 
3717   progError(line, errorCode, msg2);
3718 }
3719 
3720 void
execFSWRITEREF(Signal * signal)3721 SimulatedBlock::execFSWRITEREF(Signal* signal)
3722 {
3723   fsRefError(signal, __LINE__, "File system write failed");
3724 }
3725 
3726 void
execFSREADREF(Signal * signal)3727 SimulatedBlock::execFSREADREF(Signal* signal)
3728 {
3729   fsRefError(signal, __LINE__, "File system read failed");
3730 }
3731 
3732 void
execFSCLOSEREF(Signal * signal)3733 SimulatedBlock::execFSCLOSEREF(Signal* signal)
3734 {
3735   fsRefError(signal, __LINE__, "File system close failed");
3736 }
3737 
3738 void
execFSOPENREF(Signal * signal)3739 SimulatedBlock::execFSOPENREF(Signal* signal)
3740 {
3741   fsRefError(signal, __LINE__, "File system open failed");
3742 }
3743 
3744 void
execFSREMOVEREF(Signal * signal)3745 SimulatedBlock::execFSREMOVEREF(Signal* signal)
3746 {
3747   fsRefError(signal, __LINE__, "File system remove failed");
3748 }
3749 
3750 void
execFSSYNCREF(Signal * signal)3751 SimulatedBlock::execFSSYNCREF(Signal* signal)
3752 {
3753   fsRefError(signal, __LINE__, "File system sync failed");
3754 }
3755 
3756 void
execFSAPPENDREF(Signal * signal)3757 SimulatedBlock::execFSAPPENDREF(Signal* signal)
3758 {
3759   fsRefError(signal, __LINE__, "File system append failed");
3760 }
3761 
3762 #ifdef VM_TRACE
3763 static Ptr<void> * m_empty_global_variables[] = { 0 };
3764 void
disable_global_variables()3765 SimulatedBlock::disable_global_variables()
3766 {
3767   m_global_variables_save = m_global_variables;
3768   m_global_variables = m_empty_global_variables;
3769 }
3770 
3771 void
enable_global_variables()3772 SimulatedBlock::enable_global_variables()
3773 {
3774   if (m_global_variables == m_empty_global_variables)
3775   {
3776     m_global_variables = m_global_variables_save;
3777   }
3778 }
3779 
3780 void
clear_global_variables()3781 SimulatedBlock::clear_global_variables(){
3782   Ptr<void> ** tmp = m_global_variables;
3783   while(* tmp != 0){
3784     (* tmp)->i = RNIL;
3785     (* tmp)->p = 0;
3786     tmp++;
3787   }
3788 }
3789 
3790 void
init_globals_list(void ** tmp,size_t cnt)3791 SimulatedBlock::init_globals_list(void ** tmp, size_t cnt){
3792   m_global_variables = new Ptr<void> * [cnt+1];
3793   for(size_t i = 0; i<cnt; i++){
3794     m_global_variables[i] = (Ptr<void>*)tmp[i];
3795   }
3796   m_global_variables[cnt] = 0;
3797 }
3798 
3799 #endif
3800 
3801 Uint32
xfrm_key(Uint32 tab,const Uint32 * src,Uint32 * dst,Uint32 dstSize,Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const3802 SimulatedBlock::xfrm_key(Uint32 tab, const Uint32* src,
3803 			 Uint32 *dst, Uint32 dstSize,
3804 			 Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const
3805 {
3806   const KeyDescriptor * desc = g_key_descriptor_pool.getPtr(tab);
3807   const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
3808 
3809   Uint32 i = 0;
3810   Uint32 srcPos = 0;
3811   Uint32 dstPos = 0;
3812   while (i < noOfKeyAttr)
3813   {
3814     const KeyDescriptor::KeyAttr& keyAttr = desc->keyAttr[i];
3815     Uint32 dstWords =
3816       xfrm_attr(keyAttr.attributeDescriptor, keyAttr.charsetInfo,
3817                 src, srcPos, dst, dstPos, dstSize);
3818     keyPartLen[i++] = dstWords;
3819     if (unlikely(dstWords == 0))
3820       return 0;
3821   }
3822 
3823   if (0)
3824   {
3825     for(Uint32 i = 0; i<dstPos; i++)
3826     {
3827       printf("%.8x ", dst[i]);
3828     }
3829     printf("\n");
3830   }
3831   return dstPos;
3832 }
3833 
3834 Uint32
xfrm_attr(Uint32 attrDesc,CHARSET_INFO * cs,const Uint32 * src,Uint32 & srcPos,Uint32 * dst,Uint32 & dstPos,Uint32 dstSize) const3835 SimulatedBlock::xfrm_attr(Uint32 attrDesc, CHARSET_INFO* cs,
3836                           const Uint32* src, Uint32 & srcPos,
3837                           Uint32* dst, Uint32 & dstPos, Uint32 dstSize) const
3838 {
3839   Uint32 array =
3840     AttributeDescriptor::getArrayType(attrDesc);
3841   Uint32 srcBytes =
3842     AttributeDescriptor::getSizeInBytes(attrDesc);
3843 
3844   Uint32 srcWords = ~0;
3845   Uint32 dstWords = ~0;
3846   uchar* dstPtr = (uchar*)&dst[dstPos];
3847   const uchar* srcPtr = (const uchar*)&src[srcPos];
3848 
3849   if (cs == NULL)
3850   {
3851     jam();
3852     Uint32 len;
3853     switch(array){
3854     case NDB_ARRAYTYPE_SHORT_VAR:
3855       len = 1 + srcPtr[0];
3856       break;
3857     case NDB_ARRAYTYPE_MEDIUM_VAR:
3858       len = 2 + srcPtr[0] + (srcPtr[1] << 8);
3859       break;
3860 #ifndef VM_TRACE
3861     default:
3862       abort();
3863 #endif
3864     case NDB_ARRAYTYPE_FIXED:
3865       len = srcBytes;
3866     }
3867     srcWords = (len + 3) >> 2;
3868     dstWords = srcWords;
3869     memcpy(dstPtr, srcPtr, dstWords << 2);
3870 
3871     if (0)
3872     {
3873       ndbout_c("srcPos: %d dstPos: %d len: %d srcWords: %d dstWords: %d",
3874                srcPos, dstPos, len, srcWords, dstWords);
3875 
3876       for(Uint32 i = 0; i<srcWords; i++)
3877         printf("%.8x ", src[srcPos + i]);
3878       printf("\n");
3879     }
3880   }
3881   else
3882   {
3883     jam();
3884     Uint32 typeId =
3885       AttributeDescriptor::getType(attrDesc);
3886     Uint32 lb, len;
3887     bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
3888     if (unlikely(!ok))
3889       return 0;
3890     Uint32 xmul = cs->strxfrm_multiply;
3891     if (xmul == 0)
3892       xmul = 1;
3893     /*
3894      * Varchar end-spaces are ignored in comparisons.  To get same hash
3895      * we blank-pad to maximum length via strnxfrm.
3896      */
3897     Uint32 dstLen = xmul * (srcBytes - lb);
3898     ndbrequire(dstLen <= ((dstSize - dstPos) << 2));
3899     int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
3900     if (unlikely(n == -1))
3901       return 0;
3902     while ((n & 3) != 0)
3903     {
3904       dstPtr[n++] = 0;
3905     }
3906     dstWords = (n >> 2);
3907     srcWords = (lb + len + 3) >> 2;
3908   }
3909 
3910   dstPos += dstWords;
3911   srcPos += srcWords;
3912   return dstWords;
3913 }
3914 
3915 Uint32
create_distr_key(Uint32 tableId,const Uint32 * src,Uint32 * dst,const Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const3916 SimulatedBlock::create_distr_key(Uint32 tableId,
3917 				 const Uint32 *src,
3918                                  Uint32* dst,
3919 				 const Uint32
3920 				 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const
3921 {
3922   const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
3923   const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
3924   Uint32 noOfDistrKeys = desc->noOfDistrKeys;
3925 
3926   Uint32 i = 0;
3927   Uint32 dstPos = 0;
3928 
3929   /* --Note that src and dst may be the same location-- */
3930 
3931   if(keyPartLen)
3932   {
3933     while (i < noOfKeyAttr && noOfDistrKeys)
3934     {
3935       Uint32 attr = desc->keyAttr[i].attributeDescriptor;
3936       Uint32 len = keyPartLen[i];
3937       if(AttributeDescriptor::getDKey(attr))
3938       {
3939 	noOfDistrKeys--;
3940 	memmove(dst+dstPos, src, len << 2);
3941 	dstPos += len;
3942       }
3943       src += len;
3944       i++;
3945     }
3946   }
3947   else
3948   {
3949     while (i < noOfKeyAttr && noOfDistrKeys)
3950     {
3951       Uint32 attr = desc->keyAttr[i].attributeDescriptor;
3952       Uint32 len = AttributeDescriptor::getSizeInWords(attr);
3953       ndbrequire(AttributeDescriptor::getArrayType(attr) == NDB_ARRAYTYPE_FIXED);
3954       if(AttributeDescriptor::getDKey(attr))
3955       {
3956 	noOfDistrKeys--;
3957 	memmove(dst+dstPos, src, len << 2);
3958 	dstPos += len;
3959       }
3960       src += len;
3961       i++;
3962     }
3963   }
3964   return dstPos;
3965 }
3966 
3967 CArray<KeyDescriptor> g_key_descriptor_pool;
3968 
3969 void
sendRoutedSignal(RoutePath path[],Uint32 pathcnt,Uint32 dst[],Uint32 dstcnt,Uint32 gsn,Signal * signal,Uint32 sigLen,JobBufferLevel prio,SectionHandle * userhandle)3970 SimulatedBlock::sendRoutedSignal(RoutePath path[], Uint32 pathcnt,
3971                                  Uint32 dst[],
3972                                  Uint32 dstcnt,
3973                                  Uint32 gsn,
3974                                  Signal * signal,
3975                                  Uint32 sigLen,
3976                                  JobBufferLevel prio,
3977                                  SectionHandle * userhandle)
3978 {
3979   ndbrequire(pathcnt > 0); // don't support (now) directly multi-cast
3980   pathcnt--; // first hop is made from here
3981 
3982 
3983   Uint32 len = LocalRouteOrd::StaticLen + (2 * pathcnt) + dstcnt;
3984   ndbrequire(len <= 25);
3985 
3986   SectionHandle handle(this, signal);
3987   if (userhandle)
3988   {
3989     jam();
3990     handle.m_cnt = userhandle->m_cnt;
3991     for (Uint32 i = 0; i<handle.m_cnt; i++)
3992       handle.m_ptr[i] = userhandle->m_ptr[i];
3993     userhandle->m_cnt = 0;
3994   }
3995 
3996   if (len + sigLen > 25)
3997   {
3998     jam();
3999 
4000     /**
4001      * we need to store theData in a section
4002      */
4003     ndbrequire(handle.m_cnt < 3);
4004     handle.m_ptr[2] = handle.m_ptr[1];
4005     handle.m_ptr[1] = handle.m_ptr[0];
4006     Ptr<SectionSegment> tmp;
4007     if (unlikely(! import(tmp, signal->theData, sigLen)))
4008     {
4009       handle_out_of_longsignal_memory(0);
4010     }
4011     handle.m_ptr[0].p = tmp.p;
4012     handle.m_ptr[0].i = tmp.i;
4013     handle.m_ptr[0].sz = sigLen;
4014     handle.m_cnt ++;
4015   }
4016   else
4017   {
4018     jam();
4019     memmove(signal->theData + len, signal->theData, 4 * sigLen);
4020     len += sigLen;
4021   }
4022 
4023   LocalRouteOrd * ord = (LocalRouteOrd*)signal->getDataPtrSend();
4024   ord->cnt = (pathcnt << 16) | (dstcnt);
4025   ord->gsn = gsn;
4026   ord->prio = Uint32(prio);
4027 
4028   Uint32 * dstptr = ord->path;
4029   for (Uint32 i = 1; i <= pathcnt; i++)
4030   {
4031     ndbrequire(refToNode(path[i].ref) == 0 ||
4032                refToNode(path[i].ref) == getOwnNodeId());
4033 
4034     * dstptr++ = path[i].ref;
4035     * dstptr++ = Uint32(path[i].prio);
4036   }
4037 
4038   for (Uint32 i = 0; i<dstcnt; i++)
4039   {
4040     ndbrequire(refToNode(dst[i]) == 0 ||
4041                refToNode(dst[i]) == getOwnNodeId());
4042 
4043     * dstptr++ = dst[i];
4044   }
4045 
4046   sendSignal(path[0].ref, GSN_LOCAL_ROUTE_ORD, signal, len,
4047              path[0].prio, &handle);
4048 }
4049 
4050 void
execLOCAL_ROUTE_ORD(Signal * signal)4051 SimulatedBlock::execLOCAL_ROUTE_ORD(Signal* signal)
4052 {
4053   jamEntry();
4054 
4055   if (!assembleFragments(signal))
4056   {
4057     jam();
4058     return;
4059   }
4060 
4061   if (ERROR_INSERTED(1001))
4062   {
4063     /**
4064      * This NDBCNTR error code 1001
4065      */
4066     jam();
4067     SectionHandle handle(this, signal);
4068     sendSignalWithDelay(reference(), GSN_LOCAL_ROUTE_ORD, signal, 200,
4069                         signal->getLength(), &handle);
4070     return;
4071   }
4072 
4073   LocalRouteOrd* ord = (LocalRouteOrd*)signal->getDataPtr();
4074   Uint32 pathcnt = ord->cnt >> 16;
4075   Uint32 dstcnt = ord->cnt & 0xFFFF;
4076   Uint32 sigLen = signal->getLength();
4077 
4078   if (pathcnt == 0)
4079   {
4080     /**
4081      * Send to final destination(s);
4082      */
4083     jam();
4084     Uint32 gsn = ord->gsn;
4085     Uint32 prio = ord->prio;
4086     memcpy(signal->theData+25, ord->path, 4*dstcnt);
4087     SectionHandle handle(this, signal);
4088     if (sigLen > LocalRouteOrd::StaticLen + dstcnt)
4089     {
4090       jam();
4091       /**
4092        * Data is at end of this...
4093        */
4094       memmove(signal->theData,
4095               signal->theData + LocalRouteOrd::StaticLen + dstcnt,
4096               4 * (sigLen - (LocalRouteOrd::StaticLen + dstcnt)));
4097       sigLen = sigLen - (LocalRouteOrd::StaticLen + dstcnt);
4098     }
4099     else
4100     {
4101       jam();
4102       /**
4103        * Put section 0 in signal->theData
4104        */
4105       sigLen = handle.m_ptr[0].sz;
4106       ndbrequire(sigLen <= 25);
4107       copy(signal->theData, handle.m_ptr[0]);
4108       release(handle.m_ptr[0]);
4109 
4110       for (Uint32 i = 0; i < handle.m_cnt - 1; i++)
4111         handle.m_ptr[i] = handle.m_ptr[i+1];
4112       handle.m_cnt--;
4113     }
4114 
4115     /*
4116      * The extra if-statement is as sendSignalNoRelease will copy sections
4117      *   which is not necessary is only sending to one destination
4118      */
4119     if (dstcnt > 1)
4120     {
4121       jam();
4122       for (Uint32 i = 0; i<dstcnt; i++)
4123       {
4124         jam();
4125         sendSignalNoRelease(signal->theData[25+i], gsn, signal, sigLen,
4126                             JobBufferLevel(prio), &handle);
4127       }
4128       releaseSections(handle);
4129     }
4130     else
4131     {
4132       jam();
4133       sendSignal(signal->theData[25+0], gsn, signal, sigLen,
4134                  JobBufferLevel(prio), &handle);
4135     }
4136   }
4137   else
4138   {
4139     /**
4140      * Reroute
4141      */
4142     jam();
4143     SectionHandle handle(this, signal);
4144     Uint32 ref = ord->path[0];
4145     Uint32 prio = ord->path[1];
4146     Uint32 len = sigLen - 2;
4147     ord->cnt = ((pathcnt - 1) << 16) | dstcnt;
4148     memmove(ord->path, ord->path+2, 4 * (len - LocalRouteOrd::StaticLen));
4149     sendSignal(ref, GSN_LOCAL_ROUTE_ORD, signal, len,
4150                JobBufferLevel(prio), &handle);
4151   }
4152 }
4153 
4154 
4155 #ifdef VM_TRACE
4156 bool
debugOutOn()4157 SimulatedBlock::debugOutOn()
4158 {
4159   SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut;
4160   return
4161     globalData.testOn &&
4162     globalSignalLoggers.logMatch(number(), mask);
4163 }
4164 
4165 const char*
debugOutTag(char * buf,int line)4166 SimulatedBlock::debugOutTag(char *buf, int line)
4167 {
4168   char blockbuf[40];
4169   char instancebuf[40];
4170   char linebuf[40];
4171   char timebuf[40];
4172   sprintf(blockbuf, "%s", getBlockName(number(), "UNKNOWN"));
4173   instancebuf[0] = 0;
4174   if (instance() != 0)
4175     sprintf(instancebuf, "/%u", instance());
4176   sprintf(linebuf, " %d", line);
4177   timebuf[0] = 0;
4178 #ifdef VM_TRACE_TIME
4179   {
4180     Uint64 t = NdbTick_CurrentMillisecond();
4181     uint s = (t / 1000) % 3600;
4182     uint ms = t % 1000;
4183     sprintf(timebuf, " - %u.%03u -", s, ms);
4184   }
4185 #endif
4186   sprintf(buf, "%s%s%s%s ", blockbuf, instancebuf, linebuf, timebuf);
4187   return buf;
4188 }
4189 #endif
4190 
4191 void
synchronize_threads_for_blocks(Signal * signal,const Uint32 blocks[],const Callback & cb,JobBufferLevel prio)4192 SimulatedBlock::synchronize_threads_for_blocks(Signal * signal,
4193                                                const Uint32 blocks[],
4194                                                const Callback & cb,
4195                                                JobBufferLevel prio)
4196 {
4197 #ifndef NDBD_MULTITHREADED
4198   Callback copy = cb;
4199   execute(signal, copy, 0);
4200 #else
4201   jam();
4202   Uint32 ref[32]; // max threads
4203   Uint32 cnt = mt_get_thread_references_for_blocks(blocks, getThreadId(),
4204                                                    ref, NDB_ARRAY_SIZE(ref));
4205   if (cnt == 0)
4206   {
4207     jam();
4208     Callback copy = cb;
4209     execute(signal, copy, 0);
4210     return;
4211   }
4212 
4213   Ptr<SyncThreadRecord> ptr;
4214   ndbrequire(c_syncThreadPool.seize(ptr));
4215   ptr.p->m_cnt = cnt;
4216   ptr.p->m_callback = cb;
4217 
4218   signal->theData[0] = reference();
4219   signal->theData[1] = ptr.i;
4220   signal->theData[2] = Uint32(prio);
4221   for (Uint32 i = 0; i<cnt; i++)
4222   {
4223     sendSignal(ref[i], GSN_SYNC_THREAD_REQ, signal, 3, prio);
4224   }
4225 #endif
4226 }
4227 
4228 void
execSYNC_THREAD_REQ(Signal * signal)4229 SimulatedBlock::execSYNC_THREAD_REQ(Signal* signal)
4230 {
4231   jamEntry();
4232   Uint32 ref = signal->theData[0];
4233   Uint32 prio = signal->theData[2];
4234   sendSignal(ref, GSN_SYNC_THREAD_CONF, signal, signal->getLength(),
4235              JobBufferLevel(prio));
4236 }
4237 
4238 void
execSYNC_THREAD_CONF(Signal * signal)4239 SimulatedBlock::execSYNC_THREAD_CONF(Signal* signal)
4240 {
4241   jamEntry();
4242   Ptr<SyncThreadRecord> ptr;
4243   c_syncThreadPool.getPtr(ptr, signal->theData[1]);
4244   if (ptr.p->m_cnt == 1)
4245   {
4246     jam();
4247     Callback copy = ptr.p->m_callback;
4248     c_syncThreadPool.release(ptr);
4249     execute(signal, copy, 0);
4250     return;
4251   }
4252   ptr.p->m_cnt --;
4253 }
4254 
4255 void
execSYNC_REQ(Signal * signal)4256 SimulatedBlock::execSYNC_REQ(Signal* signal)
4257 {
4258   jamEntry();
4259   Uint32 ref = signal->theData[0];
4260   Uint32 prio = signal->theData[2];
4261   sendSignal(ref, GSN_SYNC_CONF, signal, signal->getLength(),
4262              JobBufferLevel(prio));
4263 }
4264 
4265 void
synchronize_path(Signal * signal,const Uint32 blocks[],const Callback & cb,JobBufferLevel prio)4266 SimulatedBlock::synchronize_path(Signal * signal,
4267                                  const Uint32 blocks[],
4268                                  const Callback & cb,
4269                                  JobBufferLevel prio)
4270 {
4271   jam();
4272 
4273   // reuse SyncThreadRecord
4274   Ptr<SyncThreadRecord> ptr;
4275   ndbrequire(c_syncThreadPool.seize(ptr));
4276   ptr.p->m_cnt = 0; // with count of 0
4277   ptr.p->m_callback = cb;
4278 
4279   SyncPathReq* req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
4280   req->senderData = ptr.i;
4281   req->prio = Uint32(prio);
4282   req->count = 1;
4283   if (blocks[0] == 0)
4284   {
4285     jam();
4286     ndbrequire(false); // TODO
4287   }
4288   else
4289   {
4290     jam();
4291     Uint32 len = 0;
4292     for (; blocks[len+1] != 0; len++)
4293     {
4294       req->path[len] = blocks[len+1];
4295     }
4296     req->pathlen = 1 + len;
4297     req->path[len] = reference();
4298     sendSignal(numberToRef(blocks[0], getOwnNodeId()),
4299                GSN_SYNC_PATH_REQ, signal,
4300                SyncPathReq::SignalLength + (1 + len), prio);
4301   }
4302 }
4303 
4304 void
execSYNC_PATH_REQ(Signal * signal)4305 SimulatedBlock::execSYNC_PATH_REQ(Signal* signal)
4306 {
4307   jamEntry();
4308   SyncPathReq * req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
4309   if (req->pathlen == 1)
4310   {
4311     jam();
4312     SyncPathReq copy = *req;
4313     SyncPathConf* conf = CAST_PTR(SyncPathConf, signal->getDataPtrSend());
4314     conf->senderData = copy.senderData;
4315     conf->count = copy.count;
4316     sendSignal(copy.path[0], GSN_SYNC_PATH_CONF, signal,
4317                SyncPathConf::SignalLength, JobBufferLevel(copy.prio));
4318   }
4319   else
4320   {
4321     jam();
4322     Uint32 ref = numberToRef(req->path[0], getOwnNodeId());
4323     req->pathlen--;
4324     memmove(req->path, req->path + 1, 4 * req->pathlen);
4325     sendSignal(ref, GSN_SYNC_PATH_REQ, signal,
4326                SyncPathReq::SignalLength + (1 + req->pathlen),
4327                JobBufferLevel(req->prio));
4328   }
4329 }
4330 
4331 void
execSYNC_PATH_CONF(Signal * signal)4332 SimulatedBlock::execSYNC_PATH_CONF(Signal* signal)
4333 {
4334   jamEntry();
4335   SyncPathConf conf = * CAST_CONSTPTR(SyncPathConf, signal->getDataPtr());
4336   Ptr<SyncThreadRecord> ptr;
4337 
4338   c_syncThreadPool.getPtr(ptr, conf.senderData);
4339 
4340   if (ptr.p->m_cnt == 0)
4341   {
4342     jam();
4343     ptr.p->m_cnt = conf.count;
4344   }
4345 
4346   if (ptr.p->m_cnt == 1)
4347   {
4348     jam();
4349     Callback copy = ptr.p->m_callback;
4350     c_syncThreadPool.release(ptr);
4351     execute(signal, copy, 0);
4352     return;
4353   }
4354 
4355   ptr.p->m_cnt --;
4356 }
4357 
4358 
4359 bool
checkNodeFailSequence(Signal * signal)4360 SimulatedBlock::checkNodeFailSequence(Signal* signal)
4361 {
4362   Uint32 ref = signal->getSendersBlockRef();
4363 
4364   /**
4365    * Make sure that a signal being part of node-failure handling
4366    *   from a remote node, does not get to us before we got the NODE_FAILREP
4367    *   (this to avoid tricky state handling)
4368    *
4369    * To ensure this, we send the signal via QMGR (GSN_COMMIT_FAILREQ)
4370    *   and NDBCNTR (which sends NODE_FAILREP)
4371    *
4372    * The extra time should be negilable
4373    *
4374    * Note, make an exception for signals sent by our self
4375    *       as they are only sent as a consequence of NODE_FAILREP
4376    */
4377   if (ref == reference() ||
4378       (refToNode(ref) == getOwnNodeId() &&
4379        refToMain(ref) == NDBCNTR))
4380   {
4381     jam();
4382     return true;
4383   }
4384 
4385   RoutePath path[2];
4386   path[0].ref = QMGR_REF;
4387   path[0].prio = JBB;
4388   path[1].ref = NDBCNTR_REF;
4389   path[1].prio = JBB;
4390 
4391   Uint32 dst[1];
4392   dst[0] = reference();
4393 
4394   SectionHandle handle(this, signal);
4395   Uint32 gsn = signal->header.theVerId_signalNumber;
4396   Uint32 len = signal->getLength();
4397 
4398   sendRoutedSignal(path, 2, dst, 1, gsn, signal, len, JBB, &handle);
4399   return false;
4400 }
4401 
4402 void
setup_wakeup()4403 SimulatedBlock::setup_wakeup()
4404 {
4405 #ifdef NDBD_MULTITHREADED
4406 #else
4407   globalTransporterRegistry.setup_wakeup_socket();
4408 #endif
4409 }
4410 
4411 void
wakeup()4412 SimulatedBlock::wakeup()
4413 {
4414 #ifdef NDBD_MULTITHREADED
4415   mt_wakeup(this);
4416 #else
4417   globalTransporterRegistry.wakeup();
4418 #endif
4419 }
4420 
4421 
4422 void
ndbinfo_send_row(Signal * signal,const DbinfoScanReq & req,const Ndbinfo::Row & row,Ndbinfo::Ratelimit & rl) const4423 SimulatedBlock::ndbinfo_send_row(Signal* signal,
4424                                  const DbinfoScanReq& req,
4425                                  const Ndbinfo::Row& row,
4426                                  Ndbinfo::Ratelimit& rl) const
4427 {
4428   // Check correct number of columns against table
4429   assert(row.columns() == Ndbinfo::getTable(req.tableId).columns());
4430 
4431   TransIdAI *tidai= (TransIdAI*)signal->getDataPtrSend();
4432   tidai->connectPtr= req.resultData;
4433   tidai->transId[0]= req.transId[0];
4434   tidai->transId[1]= req.transId[1];
4435 
4436   LinearSectionPtr ptr[3];
4437   ptr[0].p = row.getDataPtr();
4438   ptr[0].sz = row.getLength();
4439 
4440   rl.rows++;
4441   rl.bytes += row.getLength();
4442 
4443   sendSignal(req.resultRef, GSN_DBINFO_TRANSID_AI,
4444              signal, TransIdAI::HeaderLength, JBB, ptr, 1);
4445 }
4446 
4447 
4448 void
ndbinfo_send_scan_break(Signal * signal,DbinfoScanReq & req,const Ndbinfo::Ratelimit & rl,Uint32 data1,Uint32 data2,Uint32 data3,Uint32 data4) const4449 SimulatedBlock::ndbinfo_send_scan_break(Signal* signal,
4450                                        DbinfoScanReq& req,
4451                                        const Ndbinfo::Ratelimit& rl,
4452                                        Uint32 data1, Uint32 data2,
4453                                        Uint32 data3, Uint32 data4) const
4454 {
4455   DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend();
4456   const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
4457   MEMCOPY_NO_WORDS(conf, &req, signal_length);
4458 
4459   conf->returnedRows = rl.rows;
4460 
4461   // Update the cursor with current item number
4462   Ndbinfo::ScanCursor* cursor =
4463     (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf);
4464 
4465   cursor->data[0] = data1;
4466   cursor->data[1] = data2;
4467   cursor->data[2] = data3;
4468   cursor->data[3] = data4;
4469 
4470   // Increase number of rows and bytes sent to far
4471   cursor->totalRows += rl.rows;
4472   cursor->totalBytes += rl.bytes;
4473 
4474   Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, true);
4475 
4476   sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal,
4477              signal_length, JBB);
4478 }
4479 
4480 void
ndbinfo_send_scan_conf(Signal * signal,DbinfoScanReq & req,const Ndbinfo::Ratelimit & rl) const4481 SimulatedBlock::ndbinfo_send_scan_conf(Signal* signal,
4482                                        DbinfoScanReq& req,
4483                                        const Ndbinfo::Ratelimit& rl) const
4484 {
4485   DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend();
4486   const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
4487   Uint32 sender_ref = req.resultRef;
4488   MEMCOPY_NO_WORDS(conf, &req, signal_length);
4489 
4490   conf->returnedRows = rl.rows;
4491 
4492   if (req.cursor_sz)
4493   {
4494     jam();
4495     // Update the cursor with current item number
4496     Ndbinfo::ScanCursor* cursor =
4497       (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf);
4498 
4499     // Reset all data holders
4500     memset(cursor->data, 0, sizeof(cursor->data));
4501 
4502     // Increase number of rows and bytes sent to far
4503     cursor->totalRows += rl.rows;
4504     cursor->totalBytes += rl.bytes;
4505 
4506     Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, false);
4507 
4508     sender_ref = cursor->senderRef;
4509 
4510   }
4511   sendSignal(sender_ref, GSN_DBINFO_SCANCONF, signal,
4512              signal_length, JBB);
4513 }
4514 
init_elapsed_time(Signal * signal,NDB_TICKS & latestTIME_SIGNAL)4515 void SimulatedBlock::init_elapsed_time(Signal *signal,
4516                                        NDB_TICKS &latestTIME_SIGNAL)
4517 {
4518   const NDB_TICKS currentTime = NdbTick_getCurrentTicks();
4519   signal->theData[0] = Uint32(currentTime.getUint64() >> 32);
4520   signal->theData[1] = Uint32(currentTime.getUint64() & 0xFFFFFFFF);
4521   latestTIME_SIGNAL = currentTime;
4522   sendSignal(reference(), GSN_TIME_SIGNAL, signal, 2, JBB);
4523 }
4524 
sendTIME_SIGNAL(Signal * signal,const NDB_TICKS currentTime,Uint32 delay)4525 void SimulatedBlock::sendTIME_SIGNAL(Signal *signal,
4526                                      const NDB_TICKS currentTime,
4527                                      Uint32 delay)
4528 {
4529   signal->theData[0] = Uint32(currentTime.getUint64() >> 32);
4530   signal->theData[1] = Uint32(currentTime.getUint64() & 0xFFFFFFFF);
4531   sendSignalWithDelay(reference(), GSN_TIME_SIGNAL, signal, delay, 2);
4532 }
4533 
4534 /*
4535   This function is used to handle TIME_SIGNAL. This signal is intended to
4536   be used sort of like a drum beat. We should execute some timer calls
4537   every so often. However the OS can easily make the delayed signals to
4538   be delayed if the OS is occupied with other things. We will never report
4539   sleeps for longer than twice the expected delay. We rely on the delayed
4540   signal scheduler to ensure that we run time a bit faster for a while
4541   after long sleeps.
4542 
4543   This function will return the elapsed time since last time we called it.
4544 */
4545 Uint64
elapsed_time(Signal * signal,const NDB_TICKS currentTime,NDB_TICKS & latestTIME_SIGNAL,Uint32 expected_delay)4546 SimulatedBlock::elapsed_time(Signal *signal,
4547                              const NDB_TICKS currentTime,
4548                              NDB_TICKS &latestTIME_SIGNAL,
4549                              Uint32 expected_delay)
4550 {
4551   const Uint64 elapsed_time =
4552     NdbTick_Elapsed(latestTIME_SIGNAL, currentTime).milliSec();
4553   latestTIME_SIGNAL = currentTime;
4554 
4555   if (elapsed_time > Uint64(2 * expected_delay))
4556   {
4557     return Uint64(2 * expected_delay);
4558   }
4559   return elapsed_time;
4560 }
4561 
4562 #ifdef VM_TRACE
4563 void
assertOwnThread()4564 SimulatedBlock::assertOwnThread()
4565 {
4566 #ifdef NDBD_MULTITHREADED
4567   mt_assert_own_thread(this);
4568 #endif
4569 }
4570 
4571 #endif
4572 
4573 Uint32
get_recv_thread_idx(NodeId nodeId)4574 SimulatedBlock::get_recv_thread_idx(NodeId nodeId)
4575 {
4576 #ifdef NDBD_MULTITHREADED
4577   return mt_get_recv_thread_idx(nodeId);
4578 #else
4579   return 0;
4580 #endif
4581 }
4582 
4583 #ifndef NDBD_MULTITHREADED
4584 /**
4585  * Add a stub for this function since we have some code in ErrorReporter.cpp
4586  * that needs this function, it's only really needed for ndbmtd, so need an
4587  * empty function in ndbd.
4588  */
4589 void
prepare_to_crash(bool first_phase,bool error_insert_crash)4590 ErrorReporter::prepare_to_crash(bool first_phase, bool error_insert_crash)
4591 {
4592   (void)first_phase;
4593   (void)error_insert_crash;
4594 }
4595 #endif
4596 
4597 /**
4598  * Implementation of SegmentUtils
4599  * Here we forward the calls, but
4600  * in ndbmtd, add our thread cache
4601  * + lock function arguments.
4602  */
4603 
4604 SectionSegment*
getSegmentPtr(Uint32 iVal)4605 SimulatedBlock::getSegmentPtr(Uint32 iVal)
4606 {
4607   return g_sectionSegmentPool.getPtr(iVal);
4608 }
4609 
4610 bool
seizeSegment(Ptr<SectionSegment> & p)4611 SimulatedBlock::seizeSegment(Ptr<SectionSegment>& p)
4612 {
4613   return g_sectionSegmentPool.seize(SB_SP_REL_ARG p);
4614 }
4615 
4616 void
releaseSegment(Uint32 iVal)4617 SimulatedBlock::releaseSegment(Uint32 iVal)
4618 {
4619   g_sectionSegmentPool.release(SB_SP_REL_ARG iVal);
4620 }
4621 
4622 void
releaseSegmentList(Uint32 firstSegmentIVal)4623 SimulatedBlock::releaseSegmentList(Uint32 firstSegmentIVal)
4624 {
4625   ::releaseSection(SB_SP_ARG firstSegmentIVal);
4626 }
4627 
4628 
4629 /**
4630  * #undef is needed since this file is included by SimulatedBlock_nonmt.cpp
4631  * and SimulatedBlock_mt.cpp
4632  */
4633 #undef JAM_FILE_ID
4634